NestJS : from Promises to Observables

Meidi Airouche - Mar 28 - - Dev Community

NestJS, is a progressive Node.js framework based on Angular core for backend applications. It is built with Typescript. In this article, we'll see how Observables can be a useful alternative to Promises in specific use cases.

Understanding Promises in NestJS

In Javascript/Typescript, Promises are a standard for executing asynchronous tasks such as database queries, file operations, HTTP requests...

// Example of Promise in NestJS

@Injectable()
export class UserService {
  async findById(id: string): Promise<User> {
    return this.userModel.findById(id).exec();
  }
}
Enter fullscreen mode Exit fullscreen mode

The main features of Promises are :

  • Simple Asynchronous Operations: When dealing with straightforward asynchronous tasks that return a single value or a one-time operation, Promises are often sufficient.
  • Compatibility: Promises are natively supported in JavaScript and are the goto choice when integrating with libraries or APIs that utilize Promise-based APIs.
  • Error Handling: Promises provide built-in error handling through .catch() or try-catch blocks, making them suitable for scenarios where error handling is crucial.

What are Observables ?

Observables comes in RxJS library and represent a stream of values over time. They offer powerful capabilities for handling asynchronous operations, such as transformation, combination, and cancellation.

// Example of Observable in NestJS

@Injectable()
export class UserService {
  findAll(): Observable<User[]> {
    return from(this.userModel.find().exec());
  }
}
Enter fullscreen mode Exit fullscreen mode

With Observables, you can write code that reacts to changes in data and events, allowing you to build more reactive applications.

The main features of Observables are :

  • Complex Asynchronous Workflows: Observables excel in scenarios involving complex asynchronous workflows, such as event streams, real-time data updates, or continuous data transformations.
  • Data Streams: When dealing with data streams or sequences of events, Observables provide a more expressive and flexible way to handle asynchronous operations.
  • Operators and Pipelines: Observables offer a rich set of operators that enable powerful transformations, filtering, and combination of data streams, making them ideal for scenarios requiring advanced data manipulation. Some common operators include map, filter, reduce, merge, and debounce, among many others.

Why would I need Observables ?

One of the key benefits of using Observables is that they support composition. You can combine multiple observables, apply operators to them, and create new observables as a result. This makes it easy to build complex asynchronous workflows and handle data dependencies.

Also, Observables support handling errors and completion. Observables can emit error notifications when something goes wrong during data processing, allowing you to handle and recover from errors with compensation process. They also emit a notification when the stream of data has ended, indicating the end of the streaming.

So, Observables and RxJS helps you to write code that is more reactive, declarative, and efficient when dealing with complex asynchronous scenarios.

What are some common use cases in NestJS ?

Using RxJS with a NestJS application can enhance its capabilities for handling asynchronous operations and creating reactive pipelines. Here are some common RxJS use cases in NestJS.

Asynchronous

As any application, you may want to perform asynchronous time-consuming operations. Observables can be used to represent asynchronous data streams and apply operators like from to convert Promise or callback functions into Observables and handle data processing.

import { Injectable } from '@nestjs/common';
import { Observable, from } from 'rxjs';
import { AxiosResponse } from 'axios';

@Injectable()
export class DataService {
  fetchData(): Observable<any> {
    return from(getExternalDataFromAPI());
  }
}

async function getExternalDataFromAPI(): Promise<AxiosResponse> {
  // Simulate fetching data from an external API
  return axios.get('https://myapi.com/data');
}
Enter fullscreen mode Exit fullscreen mode

Reactive Controllers

You can create reactive endpoints that respond to changes in data or events thanks to Observables representing data streams and then returning the result as a response.

import { Controller, Get } from '@nestjs/common';
import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';

@Controller('data')
export class DataReactiveController {
  @Get('stream')
  streamData(): Observable<number> {
    // Simulate streaming data with an interval
    return interval(1000).pipe(
      map(() => Math.random()) // Transform the interval data
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Inter Process Communication

In a microservices architecture, NestJS applications may need to communicate with other services. RxJS can facilitate this communication by using Observables as a means of streaming data between services. You can use operators like switchMapor mergeMap to handle data dependencies and make multiple service calls in a reactive manner.

import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';

@Controller('data')
export class DataController {
  private readonly client: ClientProxy;

  constructor() {
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: 'localhost',
        port: 8888,
      },
    });
  }

  @Get()
  fetchData(): Observable<YourType> {
    return this.client.send<YourType>({ cmd: 'fetchData' }).pipe(
      switchMap(response => {
        // Perform additional operations with the received data
        // For example, make another service call based on the initial response
        return this.client.send<YourType>({ cmd: 'processData', data: response });
      })
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Sometimes, in complex communication flows, having a graphical representation of them can be helpful. This is why you may want to use Marbles.

In RxJS, marbles are a visual representation used to illustrate the behavior of observable sequences, operators, and time-based events. These diagrams consist of characters, such as -, |, ^, and #, representing different aspects of observable streams, including values emitted over time, completion, errors, and subscription points. You can use tools like ThinkRx to visualize your flows.

Middleware and Pipes

NestJS provides middleware and pipes for intercepting and modifying incoming requests and outgoing responses. You can use RxJS operators to handle asynchronous operations within middleware or pipes. For example, you can use the map operator to transform data or catchErroroperator to handle errors.

import { Injectable, NestMiddleware } from '@nestjs/common';
import { Request, Response } from 'express';
import { Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

@Injectable()
export class LoggingMiddleware implements NestMiddleware {
  use(req: Request, res: Response, next: () => void) {
    console.log('Logging middleware executing...');
    // Simulate an asynchronous operation
    this.asyncOperation().pipe(
      map(data => {
        // Transform data if needed
        return data.toUpperCase();
      }),
      catchError(error => {
        // Handle errors if any
        console.error('Error occurred in logging middleware:', error);
        return of('Error occurred in logging middleware');
      })
    ).subscribe(
      transformedData => {
        console.log('Transformed data:', transformedData);
        next();
      }
    );
  }

  asyncOperation(): Observable<string> {
    return new Observable<string>(observer => {
      setTimeout(() => {
        observer.next('Data from async operation');
        observer.complete();
      }, 1000);
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

Event-driven Programming

NestJS applications can benefit from event-driven programming, where components react to events and trigger actions accordingly. RxJS provides a rich set of operators to handle event streams. You can use subjects or event emitters as Observables to represent events and use operators like filter or debounceTime to handle event stream transformations. Let's illustrate it with a real-time notification system :

import { Injectable } from '@nestjs/common';
import { Subject, Observable } from 'rxjs';
import { filter, debounceTime } from 'rxjs/operators';

@Injectable()
export class EventService {
  private eventSubject = new Subject<string>();

  emitEvent(event: string): void {
    this.eventSubject.next(event);
  }

  getFilteredEvents(keyword: string): Observable<string> {
    return this.eventSubject.pipe(
      filter(event => event.includes(keyword))
    );
  }

  getDebouncedEvents(time: number): Observable<string> {
    return this.eventSubject.pipe(
      debounceTime(time)
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Testing

You also have testing utilities that can be used to write tests for NestJS applications in RxJS. You can use operators like toArray or toPromise to convert Observables into arrays or promises to assert the emitted values during testing.

Let's imagine a data service like :

import { Injectable } from '@nestjs/common';
import { HttpClient } from '@nestjs/common';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Injectable()
export class DataService {
  constructor(private readonly http: HttpClient) {}

  fetchData(): Observable<YourType[]> {
    return this.http.get<YourType[]>('https://myapi.com/data').pipe(
      map(response => response.map(item => ({ id: item.id, name: item.name })))
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

Here is the NestJS test we could build thanks to RxJS:

import { Test, TestingModule } from '@nestjs/testing';
import { DataService } from './data.service';
import { HttpClientTestingModule, HttpTestingController } from '@nestjs/common/testing';
import { of } from 'rxjs';

describe('DataService', () => {
  let service: DataService;
  let httpTestingController: HttpTestingController;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      imports: [HttpClientTestingModule],
      providers: [DataService],
    }).compile();

    service = module.get<DataService>(DataService);
    httpTestingController = module.get<HttpTestingController>(HttpTestingController);
  });

  afterEach(() => {
    httpTestingController.verify();
  });

  it('should be defined', () => {
    expect(service).toBeDefined();
  });

  it('should fetch data from the API and transform it', (done) => {
    const testData = [{ id: 1, name: 'Item 1' }, { id: 2, name: 'Item 2' }];
    const transformedData = [{ id: 1, name: 'Item 1' }, { id: 2, name: 'Item 2' }];

    service.fetchData().subscribe((data) => {
      expect(data).toEqual(transformedData);
      done();
    });

    const req = httpTestingController.expectOne('https://myapi.com/data');
    expect(req.request.method).toEqual('GET');

    req.flush(testData);
  });

  it('should handle errors', (done) => {
    const errorResponse = { status: 404, message: 'Not Found' };

    service.fetchData().subscribe(
      () => {},
      (error) => {
        expect(error).toEqual(errorResponse);
        done();
      }
    );

    const req = httpTestingController.expectOne('https://myapi.com/data');
    req.error(new ErrorEvent('Error'));
  });
});

Enter fullscreen mode Exit fullscreen mode

We've created a DataService that fetches data from an external API using the HttpClient from @nestjs/common. The fetchData method transforms the data using the map operator before returning it as an Observable.

In the tests, we use Test.createTestingModule from @nestjs/testing to set up a testing module. We import HttpClientTestingModule from @nestjs/common/testing to mock the HttpClient. We then test the behavior of the fetchData method by subscribing to the Observable and asserting the emitted values. We also test error handling by simulating an error response from the API.

By using RxJS testing utilities like of, toPromise, and HttpTestingController from @nestjs/common/testing, we can easily write tests for NestJS applications that use Observables, ensuring our services behave as expected and handle errors gracefully.

Conclusion

When needed, transitioning from Promises to Observables & RxJS operators in NestJS opens up new possibilities for handling complex asynchronous workflows. Whether you're fetching data from external APIs in a pipe, handling real-time updates, or managing streams of events, it offers a great tools for sustainable asynchronous programming in NestJS.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .