Introduction to RxJS and Observables

Kinanee Samson - Mar 13 '21 - - Dev Community

Good day guys. Over the last weekend and the past few days I have been using Observables in my code, and I have learnt a few cool things about them and how they let you handle data as if it were a stream flowing through time.

Installation

Run npm install rxjs

What are Observables?

Observables are lazy push collections of multiple values. That definition is quite abstract, so to understand what Observables are, let's first look at pull and push systems, then come back to Observables and see how much we can accomplish with them.

PULL System

In a pull system the consumer pulls the data out of the producer. The producer is the function that returns the data, for example:

// every function is a pull system
const fetchHero = function (){
 return "superman"
}

console.log(fetchHero()) // superman

As soon as the function is called, the consumer pulls the data out of it and we get the value. The consumer here is the code that calls the function, and every function is a single-value pull system.
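A plain function hands back one value per call. For comparison, a generator is a pull system that can hand back several values, with the consumer still deciding when each one is produced. Here is a minimal sketch in plain JavaScript (the heroGenerator name is made up for this illustration):

```javascript
// A generator is a multi-value pull system: nothing is produced
// until the consumer asks for the next value.
function* heroGenerator() {
  yield 'superman'
  yield 'batman'
  yield 'cyborg'
}

const heroes = heroGenerator()

// The consumer pulls each value by calling next().
const firstHero = heroes.next().value
const secondHero = heroes.next().value
console.log(firstHero)  // superman
console.log(secondHero) // batman

// The spread operator pulls whatever is left.
const remainingHeroes = [...heroes]
console.log(remainingHeroes) // [ 'cyborg' ]
```

In both cases the consumer is in charge of timing, which is exactly what changes once we move to push systems.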

How does that relate to Observables?

Observables are push systems, as we said earlier. A push system determines when it will push its data to the consumer, much like a Promise that we return and then consume later with the .then method. But a Promise only ever resolves with a single value, while an Observable can emit multiple values over time. That is why we said Observables let us treat data as if it were a stream. This is not the only way Observables differ from Promises.

Observables can deliver values both synchronously and asynchronously, whereas Promise callbacks always run asynchronously.


import { Observable } from 'rxjs'

const observable = new Observable(subscriber => {
 subscriber.next("hello")
 subscriber.next("world")
 setTimeout(()=> {
 subscriber.next("superman was with louis")
 subscriber.complete()
}, 1000)
})

console.log("before subscribe")
// subscribe takes an observer object with the callbacks we care about
observable.subscribe({
 next(val) {
  console.log(val)
 },
 complete() {
  console.log('done')
 }
})
console.log("where's supes?")
// before subscribe
// hello
// world
// where's supes?
// superman was with louis
// done

You see, we just created a basic Observable and used it to emit multiple values. Look closely and you'll see that hello and world are emitted synchronously. Next we use setTimeout to mimic an asynchronous operation and emit the final value, which is another string. We can emit other kinds of values too, even other Observables. Notice that defining the Observable did not run anything: the string before subscribe is logged first, and the values only start flowing once we subscribe. That is how you consume Observables.

Subscription

When we want to consume the data in an Observable we call its subscribe function. It accepts up to three callbacks, passed either as arguments or as methods of an observer object:

  • next, this is the first callback and the one we will almost always pass. It receives each value the Observable emits and runs once for every emitted value, as soon as that value is available.
  • error, this callback is used for custom error handling. We did not pass an error handler ourselves because there is a better way to handle errors, which we will look at later.
  • complete, this callback runs when subscriber.complete() is called. At that point the Observable stops emitting data, so we can do something immediately afterwards. Here we just log done.
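The contract behind these three callbacks can be sketched without RxJS. The makeStream helper below is a hypothetical stand-in written for this illustration, not the library's implementation. It only tries to show that nothing runs until subscribe is called, that next may fire many times, and that error or complete ends the stream.

```javascript
// Hypothetical stand-in for an Observable, for illustration only.
// `producer` is stored and only runs when subscribe() is called.
function makeStream(producer) {
  return {
    subscribe(observer) {
      let stopped = false
      producer({
        next(value) {
          if (!stopped && observer.next) observer.next(value)
        },
        error(err) {
          if (stopped) return
          stopped = true
          if (observer.error) observer.error(err)
        },
        complete() {
          if (stopped) return
          stopped = true
          if (observer.complete) observer.complete()
        }
      })
    }
  }
}

const log = []

const stream = makeStream(subscriber => {
  log.push('producer started')
  subscriber.next('hello')
  subscriber.next('world')
  subscriber.complete()
  subscriber.next('ignored') // dropped: the stream already completed
})

log.push('before subscribe')
stream.subscribe({
  next: value => log.push('next: ' + value),
  error: err => log.push('error: ' + err),
  complete: () => log.push('done')
})

console.log(log)
// before subscribe
// producer started
// next: hello
// next: world
// done
```

The producer only starts after before subscribe is recorded, and the value sent after complete never reaches the observer.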

Keep the above in mind, but we will seldom create Observables by hand like that. The RxJS library provides many creation functions and operators for creating and handling Observables.

From Promises

We can create an Observable from a Promise. This is useful when we are working with APIs that return a Promise.

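import { from } from 'rxjs'

// fetch returns a Promise that resolves with the parsed JSON body
const users = fetch('https://jsonplaceholder.typicode.com/users').then(res => res.json())

// from() turns the Promise into an Observable that emits the resolved value once
const usersObservable = from(users)
usersObservable.subscribe(data => console.log(data))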


From an Array

We can also create an Observable from an array or an iterable object. It will emit the values in the array, one by one.

  import { from } from 'rxjs'

const heroObservable = from(['superman', 'batman', 'cyborg', 'wonder woman'])

heroObservable.subscribe(v => console.log(v))
//superman, batman, cyborg, wonder woman

From an Event

import { fromEvent } from 'rxjs'

const body = document.querySelector('body')

const Click = fromEvent(body, 'click')
Click.subscribe(v => console.log(v))


The fromEvent function accepts two parameters: the first is the element we want to listen on, and the second is the name of the event. In this example we listen for the click event on the body of the document.

From Timers

We can create an Observable from a timer. We just import timer from rxjs and pass in an integer that represents the number of milliseconds we want to delay.

import { timer } from 'rxjs'
const waitTime = timer(2000) // 2000 milliseconds
waitTime.subscribe(() => console.log('2 seconds have passed'))
// 2 seconds have passed

We can just as easily create an interval, that is, an Observable that emits repeatedly, once every time the number of milliseconds we pass to it has elapsed.

import { interval } from 'rxjs'

const intervalObservable = interval(1000)

intervalObservable.subscribe(() => console.log(new Date().getSeconds()))

HTTP Request

To make an HTTP request we can import the ajax function. Let's see how we can do that.

import { ajax } from 'rxjs/ajax'

const httpReq = ajax.getJSON('url')
httpReq.subscribe(data => console.log(data))

Operators

RxJS comes with tons of operators that you can use to create Observables (creation operators) or to transform them (pipeable operators). The functions we discussed in the previous section are all creation operators, which means they create Observables out of ordinary values, as we saw above. Pipeable operators, on the other hand, create new Observables from existing Observables by transforming or combining them. Let's look at some pipeable operators and how useful they can be.
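One way to picture a pipeable operator is as a function that takes a source and returns a new source, with pipe applying those functions from left to right. The sketch below uses plain arrays as a stand-in for Observables, and the filterOp, mapOp and pipeAll helpers are invented for this illustration. It shows only the composition idea, not how RxJS handles values over time.

```javascript
// Each "operator" is a function: source in, new source out.
// Arrays stand in for Observables here.
const filterOp = predicate => source => source.filter(predicate)
const mapOp = project => source => source.map(project)

// Apply the operators left to right, feeding each result to the next.
const pipeAll = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source)

const nums = [-1, 3, -2, 5]

const doubledPositives = pipeAll(
  nums,
  filterOp(n => n > 0),
  mapOp(n => n * 2)
)

console.log(doubledPositives) // [ 6, 10 ]
console.log(nums)             // [ -1, 3, -2, 5 ], the source is left untouched
```

Because every step returns a new source instead of changing the old one, steps can be chained in any order and reused across pipelines.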

Map

The map operator subscribes to the incoming values emitted by another Observable, maps each one to a new value, and emits the result to the next operator or to a subscriber. The map operator works just like array.map().

 import { from } from 'rxjs'
 import { map } from 'rxjs/operators'

const heroes = [
 {name: 'spiderman', alias: 'peter parker'},
 {name: 'ironman', alias: 'tony stark'},
 {name: 'batman', alias: 'bruce wayne'}
]

const mappedHero = from(heroes).pipe(
 map(hero => hero.alias)
)

mappedHero.subscribe(x => console.log(x))
//peter parker, tony stark, bruce wayne

Filter

The filter operator is used to filter the incoming values emitted by a previous Observable. It works like array.filter().

 import { from } from 'rxjs'
 import { filter } from 'rxjs/operators'

const nums = [-1, 3, -2, 5, -6, 8]

const positiveNums = from(nums).pipe(
 filter(num => num > 0)
)

positiveNums.subscribe(x => console.log(x))

First

This operator emits only the first value emitted by an Observable and then completes. This is useful when we have an Observable that emits multiple values but we are only interested in the first one.

 import { from } from 'rxjs'
 import { first } from 'rxjs/operators'

const FirstObservable = from([1, 2, 3, 4]).pipe(
 first()
)

FirstObservable.subscribe(x => console.log(x))

takeUntil

This operator is useful when we want to keep receiving values emitted by an Observable until a second Observable emits.

import { interval, timer } from 'rxjs'
import { takeUntil } from 'rxjs/operators'

const seconds = interval(1000)
const timed = timer(5000)

// pipe returns a new Observable, so keep the result and subscribe to it
const limited = seconds.pipe(
 takeUntil(timed)
)

limited.subscribe(x => console.log(x))
// typically logs 0, 1, 2, 3
// then the timer fires, the stream completes and it unsubscribes from the interval

catchError

This operator is used to catch errors that result from working with Observables.

import { Observable, from } from 'rxjs'
import { catchError } from 'rxjs/operators'
const source = new Observable(subscriber => {
 subscriber.next(1)
 subscriber.next(2)
 subscriber.next(3)
 throw 'something happened'
})

// pipe returns a new Observable, so we subscribe to its result
const recovered = source.pipe(
 catchError(err => from([4, 5, 6, 7]))
)

recovered.subscribe(x => console.log(x))
// 1, 2, 3, 4, 5, 6, 7

RxJS comes with more operators than I can cover in this article. In the future I will write an article dedicated to RxJS operators. I hope you found this useful.
