Create custom operators to sum and sort RxJS streams

Connie Leung - Dec 25 '22 - - Dev Community

Introduction

This is day 4 of Wes Bos’s JavaScript 30 challenge where I am going to create RxJS custom decorators to sum and sort streams.

In this blog post, I describe the steps to create RxJS custom decorators, sum and sort to manipulate streams. First, I apply sort() to sort RxJS streams by a comparison function and render the sorted results on browser. Then, I apply sum() to accumulate stream by a property and output the total on browser. When RxJS does not provide operator for a task, we have the ability to build our own and use them to process RxJS streams.

Create a new Angular project in workspace

ng generate application day4-array-cardio-part1
Enter fullscreen mode Exit fullscreen mode

Create RxJS custom operator to sum stream

I create custom-operators directory and add a new file, sum.operator.ts

// sum.operator.ts

import { reduce } from 'rxjs';

export function sum<T, A extends number>(sumFn: (acc: A, t: T) => A, initial: A) {
    return reduce(sumFn, initial);
}
Enter fullscreen mode Exit fullscreen mode
  • the operator accepts an initial value (initial parameter) and accumulator (sumFn)
  • call RxJS’s reduce to calculate the sum of a stream
  • emit total when stream completes.

Create RxJS custom operator to sort stream

Similarly, I add sort.operator.ts to custom-operators directory to sort stream when it completes

import { Observable, map, toArray } from 'rxjs';

export function sort<T>(sortFn: (x: T, y: T) => number) {
    return function(source: Observable<T>) {
        return source.pipe(
            toArray(),
            map(items => items.sort(sortFn))
        )
    }
}
Enter fullscreen mode Exit fullscreen mode
  • the operator accepts a comparison function (sortFn) that compares two objects and return an integer to define order
  • when source stream completes, I call toArray() to emit an T[]
  • execute map() to pass sortFn to Array.sort() and emit the sorted array

Get RxJS stream in an array

src/app
├── app.component.spec.ts
├── app.component.ts
├── app.module.ts
└── custom-operators
    ├── sort.operator.ts
    └── sum.operator.ts
Enter fullscreen mode Exit fullscreen mode

Inventors is an array of object I convert the array into a stream using from operator

inventors = [
    { first: 'Albert', last: 'Einstein', year: 1879, passed: 1955 },
    { first: 'Isaac', last: 'Newton', year: 1643, passed: 1727 },
    { first: 'Galileo', last: 'Galilei', year: 1564, passed: 1642 },
    { first: 'Marie', last: 'Curie', year: 1867, passed: 1934 },
    { first: 'Johannes', last: 'Kepler', year: 1571, passed: 1630 },
    { first: 'Nicolaus', last: 'Copernicus', year: 1473, passed: 1543 },
    { first: 'Max', last: 'Planck', year: 1858, passed: 1947 },
    { first: 'Katherine', last: 'Blodgett', year: 1898, passed: 1979 },
    { first: 'Ada', last: 'Lovelace', year: 1815, passed: 1852 },
    { first: 'Sarah E.', last: 'Goode', year: 1855, passed: 1905 },
    { first: 'Lise', last: 'Meitner', year: 1878, passed: 1968 },
    { first: 'Hanna', last: 'Hammarström', year: 1829, passed: 1909 }
  ];

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));
Enter fullscreen mode Exit fullscreen mode

shareReplay(this.inventors.length) caches all the inventors before other streams reuse inventors$ to sum or sort RxJS stream.

Define inventoryArray$ to get an inventory when inventors$ completes

inventorArray$ = this.inventors$.pipe(toArray());
Enter fullscreen mode Exit fullscreen mode

Define a ng template to output inventory array as an unordered list. Subsequently, I reuse this template to output other array examples.

// app.component.ts

@Component({
  selector: 'app-root',
  template: `
    <div class="container">
      <ng-container 
        *ngTemplateOutlet="inventors; context: { $implicit: 'Inventors', list: inventorArray$ | async }">
      </ng-container>
    </div>

    <ng-template #inventors let-title let-list="list">
      <section class="inventors">
        <h2>{{ title }}</h2>
        <ul>
          <li *ngFor="let inventory of list; trackby: inventoryTrackBy">
            Name: {{ inventory.first }} {{ inventory.last }}<br />
            {{ inventory.year }} - {{ inventory.passed }}, Age: {{ inventory.passed - inventory.year }}
          </li>
        </ul>
      </section>
    </ng-template>
  `,
  styles: [`
    :host {
      display: block;
    }
    ... omitted for brevity ...
  `]
})
export class AppComponent { ...RxJS codes ... }
Enter fullscreen mode Exit fullscreen mode

Sort inventor stream by year

ordered$ = this.inventors$.pipe(sort((a, b) => a.year > b.year ? 1 : -1));
Enter fullscreen mode Exit fullscreen mode

I simply pass a comparison function to the custom sort operator to compare the year property of two inventory elements. It is possible because toArray returns an JavaScript array that has a sort function to perform sorting.

Similarly, the template is available to render ordered$ after it is resolved

<ng-container 
     *ngTemplateOutlet="inventors; context: { $implicit: 'Ordered Inventors', list: ordered$ | async }">
</ng-container>
Enter fullscreen mode Exit fullscreen mode

Sort inventor stream by age from oldest to youngest

oldest$ = this.inventors$.pipe(sort((a, b) => { 
    const lastInventor = a.passed - a.year;
    const nextInventor = b.passed - b.year;
    return lastInventor > nextInventor ? -1 : 1;
  }));
Enter fullscreen mode Exit fullscreen mode

I pass a different comparison function to the operator to compare the age of the inventors. When the next inventor has a greater age (passed – year) than the previous inventor, the comparison function swaps the position to obtain the correct ordering.

<ng-container 
    *ngTemplateOutlet="inventors; context: { $implicit: 'Oldest Inventors', list: oldest$ | async }">
</ng-container>
Enter fullscreen mode Exit fullscreen mode

The template displays the oldest$ stream to list the inventors by their age

Demonstrate sort operator can work on any data type

The custom sort operator can work on any data type because of a powerful TypeScript concept called generic. It can sort any element of type T as long as the comparison function returns an integer to define order.

The following is an example to sort a people stream by last name.

people = ['Bernhard, Sandra', 'Bethea, Erin', 'Becker, Carl', 'Bentsen, Lloyd', 'Beckett, Samuel', 'Blake, William', 'Berger, Ric', 'Beddoes, Mick', 'Beethoven, Ludwig','Belloc, Hilaire', 'Begin, Menachem', 'Bellow, Saul', 'Benchley, Robert', 'Blair, Robert', 'Benenson, Peter', 'Benjamin, Walter', 'Berlin, Irving','Benn, Tony', 'Benson, Leana', 'Bent, Silas', 'Berle, Milton', 'Berry, Halle', 'Biko, Steve', 'Beck, Glenn', 'Bergman, Ingmar', 'Black, Elk', 'Berio, Luciano','Berne, Eric', 'Berra, Yogi', 'Berry, Wendell', 'Bevan, Aneurin', 'Ben-Gurion, David', 'Bevel, Ken', 'Biden, Joseph', 'Bennington, Chester', 'Bierce, Ambrose', 'Billings, Josh', 'Birrell, Augustine', 'Blair, Tony', 'Beecher, Henry', 'Biondo, Frank'];

people$ = from(this.people).pipe(shareReplay(this.people.length));
alpha$ = this.people$.pipe(sort((lastOne, nextOne) => {
   const [aLast] = lastOne.split(', ');
   const [bLast] = nextOne.split(', ');
   return aLast > bLast ? 1 : -1;
}));
Enter fullscreen mode Exit fullscreen mode

The data type of people in string whereas the inventors in the inventors stream are objects. Yet, sort sorts the streams in the correct order in all the examples.

Demonstrate sum operator to sum RxJS stream

The example is to add the year of all the inventors and display the total year. The sum operator calls reduce under the hood; therefore, it expects a reducer function and an initial value.

inventors$ = from(this.inventors).pipe(shareReplay(this.inventors.length));
totalYears$ = this.inventors$.pipe(sum((acc: number, y) => acc + (y.passed - y.year), 0));

<section class="inventors">
    <h2>Total Years</h2>
    <p>{{ totalYears$ | async }}</p>
</section>
Enter fullscreen mode Exit fullscreen mode

The custom operator is very simple and it can take other functions to do things such as count the number of characters in first or last.

totalFirstLength$ = this.inventors$.pipe(sum((acc: number, y) => acc + y.first.length, 0));
Enter fullscreen mode Exit fullscreen mode

Another one-liner that states the purpose of the stream clearly.

Final Thoughts

In this post, I show how to create custom RxJS operators and use them to transform streams in an Angular application. To emphasize DRY principle, I create a ng template to render the streams in a list to reduce duplicated codes in the inline template.

This is the end of the blog post and I hope you like the content and continue to follow my learning experience in Angular and other technologies.

Resources:

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .