Reactive Programming in JavaScript with RxJS.

Sagar - Sep 30 '18 - - Dev Community

RxJS is JavaScript library for transforming, composing and querying asynchronous streams of data. RxJS can be used both in the browser or in the server-side using Node.js.

I took a challenge to explain RxJS to developers in a simplistic way. The hardest part of the learning RxJS is “Thinking in Reactively".

Think of RxJS as “LoDash” for handling asynchronous events.

So, What Exactly, Reactive Programming is?

Reactive programming

Reactive programming is a programming paradigm for writing code, mainly concerned with asynchronous data streams. Just a different way of building software applications which will "react" to changes that happen instead of the typical way of writing software where we explicitly write code (aka "imperative" programming) to handle those changes.

Stream

stream

A stream is a sequence of ongoing events ordered in time. It can be anything like user inputs, button clicks or data structures. You can listen to a stream and react to it accordingly. You can use functions to combine, filter or map streams.

Stream emit three things during its timeline, a value, an error, and complete signal. We have to catch this asynchronous event and execute functions accordingly.

Both promise and observables are built to solve problems around async (to avoid “callback hell”).

Types of async operations in modern web applications

  • DOM Events- (mouse events, touch events, keyboard events, form events etc)
  • Animations - (CSS Transitions and Animations, requestAnimationFrame etc)
  • AJAX
  • WebSockets
  • SSE - Server-Sent Events
  • Alternative inputs (voice, joystick etc)

If you're still confused, don't worry, this normally doesn't make much sense at this point. Let's dive in step by step.

Observable

observable

  • An Observable is just a function, with a few special characteristics. It takes in an “observer” (an object with “next”, “error” and “complete” methods on it), and returns cancellation logic.
  • Observables provide support for passing messages between publishers and subscribers in your application.
  • Observables offer significant benefits over other techniques for event handling, asynchronous programming, and handling multiple values.
  • Observables are lazy. It doesn't start producing data untill you subscribe to it.
  • subscribe() returns a subscription, on which a consumer can be call unsubscribe() to cancel the subscription and tear donw the producer.
  • RxJS offers a number of functions that can be used to create new observables. These functions can simplify the process of creating observables from things such as events, timers, promises, and so on. For example:
    const button = document.querySelector("button");
    const observer = {
      next: function(value) {
        console.log(value);
      },
      error: function(err) {
        console.error(err);
      },
      complete: function() {
        console.log("Completed");
      }
    };

    // Create an Observable from event
    const observable = Rx.Observable.fromEvent(button, "click");
    // Subscribe to begin listening for async result
    observable.subscribe(observer);
Enter fullscreen mode Exit fullscreen mode

Subscription

subscription

  • An Observable instance begins publishing values only when someone subscribes to it. You subscribe by calling the subscribe() method of the instance, passing an observer object to receive the notifications.
  • A Subscription has one important method, unsubscribe(), that takes no argument and just disposes of the resource held by the subscription.
    const button = document.querySelector("button");
    const observable = Rx.Observable.fromEvent(button, "click");
    const subscription = observable.subscribe(event => console.log(event));
    // Later:
    // This cancels the ongoing Observable execution which
    // was started by calling subscribe with an Observer.
    subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

Observer

observer

  • An observer is object literal with next(), error() and complete() functions. In the above example, the observer is the object literal we pass into our .subscribe() method.
  • When an Observable produces values, it then informs the observer, by calling .next() method when a new value was successfully captured and .error() when an error occurred.
  • When we subscribe to an Observable, it will keep passing values to an observer until the complete signal.
  • Example of an observer.
    // observer is just object literal with next(), error() and complete() functions
    // Howerver, next() function is required, remaining error() and complete() functions are optional 
    const observer = {
      next: function(value) {
        console.log(value);
      },
      error: function(err) {
        console.error(err);
      },
      complete: function() {
        console.log("Completed");
      }
    };
Enter fullscreen mode Exit fullscreen mode

Operators

operator

  • Operators are functions that build on the Observables foundation to enable sophisticated manipulation of collections.
  • An Operator is essentially a pure function which takes one Observable as input and generates another Observable as output.
  • There are operators for different purposes, and they may be categorized as creation, transformation, filtering, combination, multicasting, error handling, utility etc.
  • Operators pass each value from one operator to the next before proceeding to the next value in the set. This is different from array operators (map and filter) which will process the entire array at each step.
  • For example,
    const observable = Rx.Observable.of(1, 2, 3).map(value => value * value);

    observable.subscribe(x => console.log(x));
    // Output:
    // 1
    // 4
    // 9
Enter fullscreen mode Exit fullscreen mode
  • RxJS provides many operators, but only a handful are used frequently. For a list of operators and usage samples, visit the RxJS API Documentation.

common operator list

Subject

subject

  • RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
  • A subject in RxJS is a special hybrid that can act as both an Observable and an Observer at the same time.
  • In the example below, we have two Observers attached to a Subject, and we feed some values to the Subject:
    const subject = new Rx.Subject();

    subject.subscribe({
      next: v => console.log("observerA: " + v)
    });
    subject.subscribe({
      next: v => console.log("observerB: " + v)
    });

    subject.next(1);
    subject.next(2);

    // output
    // observerA: 1
    // observerB: 1
    // observerA: 2
    // observerB: 2
Enter fullscreen mode Exit fullscreen mode

Observable vs Promise

Observable vs Promise

For better understanding, we're going to compare and contrast the ES6 Promise API to the Observable library RxJS. We will see how similar Promises and Observables are as well as how they differ and why we would want to use Observables over promises in certain situations.

Single value vs multiple values

  • If you make a request through the promise and wait for a response. You can be sure that there won’t be multiple responses to the same request. You can create a Promise, which resolves with some value.
  • Promise is always resolved with the first value passed to the resolve function and ignores further calls to it.
  • On the contrary, Observables allow you to resolve multiple values until we call observer.complete() function.
  • Example of Promise and Observable.

    // creating demoPromise using ES6 Promise API
    const demoPromise = new Promise((resolve, reject) => {
      asyncOperation((err, value) => {
        if (err) {
          reject(err); // error occured. We will catch error inside chain .catch()
        } else {
          resolve(value); // value received. we will get value inside .then() chain method
        }
      });
    });
    
    // creating a demoObservable using Rxjs.Observable API
    const demoObservable = Rx.Observable.create(observer => {
      asyncOperation((err, value) => {
        if (err) {
          observer.error(err); // instead of reject(err)
        } else {
          observer.next(value); // instead of resolve(value)
          observer.complete(); // optional. once your async task finished then call observer.complete()
        }
      });
    });
    

Eager vs lazy

  • Promises are eager by design meaning that a promise will start doing whatever task you give it as soon as the promise constructor is invoked.
  • Observables are lazy. Observable constructor gets called only when someone actually subscribes to an Observable means nothing happens until you subscribe to it.
  • Examples,
    // demoPromise started emmiting values but still we have not call .then() method on promise
    const demoPromise = new Promise((resolve, reject) => {
      setTimeout(() => {
        console.log('emmit value');
        resolve(100);
      }, 3000);
    });

    // demoObservable not started emmiting values unitll we subscribe to it.
    const demoObservable = new Observable(observer => {
      setInterval(() => {
        if (err) {
          observer.error('DemoError throw'); // instead of reject(err)
        } else {
          observer.next('value'); // instead of resolve(value)
          observer.complete(); // optional. once your async task finished then call observer.complete()
        }
      });
    });
Enter fullscreen mode Exit fullscreen mode

Not cancellable vs cancellable

  • One of the first things new promise users often wonder about is how to cancel a promise. ES6 promises do not support cancellation yet. It is, the reality of the matter is cancellation is really an important scenario in client-side programming.
  • Use a third party library like a bluebird or axios they offer promise cancellation feature.
  • Observable support cancellation of asynchronous task by calling unsubscribe() method on Observable.
  • When you subscribe to an Observable, you get back a Subscription, which represents the ongoing execution. Just call unsubscribe() to cancel the execution.
  • Example of cancellable observable
    const observable = Rx.Observable.from([10, 20, 30]);
    const subscription = observable.subscribe(x => console.log(x));
    // Later:
    subscription.unsubscribe(); // its will stop ongoing execution 
Enter fullscreen mode Exit fullscreen mode

Practical Examples

Creating observables from values

  const observable = Rx.Observable.of("foo", 98, false, ["john", "doe"], {
    age: 19,
    gender: "male"
  });

  observable.subscribe(val => console.log(val));
Enter fullscreen mode Exit fullscreen mode

Creating Observables from stream of values

  const observable = Rx.Observable.create( observer => {
    observer.next('Hello');
    observer.next('Its monday morning!!');
  });

  observable.subscribe(value => console.log(value));
  // output:
  // Hello
  // It's monday morning
Enter fullscreen mode Exit fullscreen mode

Observable from DOM Events

    const button = document.querySelector('button');
    const observable = Rx.Observable.fromEvent(button, 'click');
    observable.subscribe(event => console.log(event));
Enter fullscreen mode Exit fullscreen mode

Observable from Promise

  const promise = new Promise((resolve, reject) => {
    asyncOperation((err, value) => {
      if (err) {
        reject(err);
      } else {
        resolve(value);
      }
    });
  });

  const Observable = Rx.Observable.fromPromise(promise);

  Observable.subscribe(value => console.log(value));
Enter fullscreen mode Exit fullscreen mode

Observable from Timer method

  const timer = Rx.Observable.timer(3000);

  timer.subscribe(() => console.log("timeout!!"));
Enter fullscreen mode Exit fullscreen mode

Observable from Interval

  const interval = Rx.Observable.interval(3000);

  interval.subscribe(tick => console.log(`${tick} tick`));
Enter fullscreen mode Exit fullscreen mode

Map operator

  const observable = Rx.Observable.from(2, 4, 6, 8);

  observable.map(value => value * value).subscribe(result => console.log(result));
Enter fullscreen mode Exit fullscreen mode

Do Operator

    const dogs = Rx.Observable.of("Buddy", "Charlie", "Cooper", "Rocky");

    // do operator used for debugging purpose
    dogs
      .do(dog => console.log(dog))
      .filter(dog => dog === "Cooper")
      .do(dog => console.log(dog))
      .subscribe(dog => console.log(dog));
Enter fullscreen mode Exit fullscreen mode

Debounce and Throttle

  • Debounce - Wait X time, then give me the last value.
  • Throttle - Give me the first value, then wait X time.
    const input = document.querySelector("input");
    const observable = Rx.Observable.fromEvent(input, "keyup");

    observable.debounceTime(3000).subscribe(event => console.log(event));

    observable.throttleTime(1000).subscribe(event => console.log(event));
Enter fullscreen mode Exit fullscreen mode

bufferTime - Collects values from the past as an array, and emits those arrays periodically in time.

      const clicks = Rx.Observable.fromEvent(document, "click");
      const buffered = clicks.bufferTime(1000);
      buffered.subscribe(x => console.log(x));
Enter fullscreen mode Exit fullscreen mode

Conclusion

The promise is the best fit for AJAX operations where Observables are extremely powerful for handling asynchronous tasks. Observables provide a bunch of operators for creating, transforming, filtering and multicasting asynchronous events. Sounds great, doesn’t it? :D

Closing Note

Thanks for reading. I hope you like this article feel free to like, comment or share this article with your friends. For more depth understanding of RxJS checkout provided reference links.

References

  1. RxJS official website
  2. The introduction to Reactive Programming you've been missing
  3. LearnRxJS
  4. What is RxJS?
  5. RxJS Quick Start With 20 Practical Examples
  6. Angular official website
  7. RxJS: Observables, Observers and Operators Introduction
  8. Promises vs Observables
. . . . . . . . .