Here is an implementation of a function that takes a Promise-based fetch function and a function that decides whether a response counts as successful (from a polling perspective), and polls the fetch function until that second function returns true:
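```js
import { from, interval } from 'rxjs';
import { switchMap, takeWhile } from 'rxjs/operators';

const POLLING_INTERVAL = 600; // in milliseconds

function poll(fetchFn, isSuccessFn, pollInterval = POLLING_INTERVAL) {
  // Use the pollInterval argument so callers can override the default
  return interval(pollInterval).pipe(
    // Call fetchFn on every tick and switch to the Observable of its Promise
    switchMap(() => from(fetchFn())),
    // Keep polling while the response is not yet successful; the `true`
    // (inclusive) flag, available from RxJS 6.4, also emits the successful
    // response before completing
    takeWhile((response) => !isSuccessFn(response), true)
  );
}
```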
The above uses RxJS and its `pipe` syntax (pipeable operators arrived in RxJS 5.5; the import paths shown are the RxJS 6 ones). If we walk through it:
- create an Observable that emits every 600ms (by default) using `interval`
- pipe that into `switchMap`; this operator essentially replaces the Observable’s value with another one, in this instance replacing the emitted interval count with an Observable created from the Promise returned by `fetchFn`
- `takeWhile` keeps the polling going until `isSuccessFn` returns true, at which point the Observable completes
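To try the steps above without a real endpoint, one option is a stand-in for `fetchFn`. The sketch below is an assumption for illustration, not part of the original implementation: `makeFakeFetch` and the `{ data: { status } }` response shape are hypothetical, chosen to resemble an HTTP response. It shows the contract `poll` relies on, namely that `fetchFn` is a function returning a new Promise on every call.

```javascript
// Hypothetical helper: returns a fetchFn that resolves with each status in
// turn, then keeps resolving with the last one.
function makeFakeFetch(statuses) {
  let calls = 0;
  return function fakeFetch() {
    const status = statuses[Math.min(calls, statuses.length - 1)];
    calls += 1;
    // Resolve with an object shaped like an HTTP response (assumed shape)
    return Promise.resolve({ data: { status } });
  };
}

// Decides whether a response counts as successful from a polling perspective
const isSuccess = (response) => response.data.status === 'success';

const fakeFetch = makeFakeFetch(['pending', 'pending', 'success']);

// Each call yields a fresh Promise, as it would on each polling tick
const results = Promise
  .all([fakeFetch(), fakeFetch(), fakeFetch()])
  .then((responses) => responses.map(isSuccess));

results.then((flags) => console.log(flags)); // logs [ false, false, true ]
```

These two functions can be passed to `poll` in place of a real request, for example `poll(fakeFetch, isSuccess, 100)`.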
To go into a bit more detail about `switchMap`, it’s a bit like doing this in a Promise chain:

```js
const promiseChain = Promise
  .resolve()
  .then(
    () => someFunctionThatReturnsAPromise()
  );
```

Anything that is now `.then`-ed onto `promiseChain` would now have the output of `someFunctionThatReturnsAPromise` passed in.
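To make the analogy concrete, here is a small runnable sketch. `someFunctionThatReturnsAPromise` is given a hypothetical body that resolves with a fixed string; it stands in for any Promise-returning function.

```javascript
// Hypothetical stand-in: any function that returns a Promise would do
function someFunctionThatReturnsAPromise() {
  return Promise.resolve('value from the inner Promise');
}

const promiseChain = Promise
  .resolve('initial value')
  // The initial value is ignored; the chain now carries the inner Promise's value
  .then(() => someFunctionThatReturnsAPromise());

const received = promiseChain.then((value) => {
  console.log(value); // logs "value from the inner Promise"
  return value;
});
```

The analogy only goes so far: `switchMap` also drops the previous inner Observable when a new value arrives, which a Promise chain has no equivalent for.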
Back to the poll function, this is an example of how it would be used:
```js
import axios from 'axios';

poll(
  // Pass a function so that a fresh request is made on every tick
  () => axios.get('https://changing-status.com/status'),
  (response) => response.data.status === 'success'
).subscribe((response) => {
  // response is the HTTP response
  console.log(response.data.status);
});
```
The advantage of this Observable-based approach is that we have access to each HTTP response. That’s nice if we need to display the status somewhere.
Observables are also pretty composable, so we can turn what we’ve got into something that emits just once, when the poll completes:
```js
import { skipWhile } from 'rxjs/operators';

const emitOnPollComplete = poll(
  // Pass a function so that a fresh request is made on every tick
  () => axios.get('https://changing-status.com/status'),
  (response) => response.data.status === 'success'
).pipe(
  // Drop every response until the first successful one
  skipWhile((response) => response.data.status !== 'success')
).subscribe((response) => {
  console.log('Poll complete');
});
```
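Why this emits only once can be checked with a plain-array model. The helpers below are hypothetical and are not the RxJS operators; they mimic, on arrays, an inclusive `takeWhile` (assumed here to be how `poll` stops after the first success) followed by `skipWhile`.

```javascript
// Hypothetical array analogue of takeWhile with the inclusive flag:
// keeps items while the predicate holds, plus the first item that fails it
function takeWhileInclusive(items, predicate) {
  const index = items.findIndex((item) => !predicate(item));
  return index === -1 ? items.slice() : items.slice(0, index + 1);
}

// Hypothetical array analogue of skipWhile:
// drops items while the predicate holds, keeps everything after
function skipWhile(items, predicate) {
  const index = items.findIndex((item) => !predicate(item));
  return index === -1 ? [] : items.slice(index);
}

const statuses = ['pending', 'pending', 'success', 'success'];
const notSuccess = (status) => status !== 'success';

// What the poll would emit: every response up to and including the first success
const polled = takeWhileInclusive(statuses, notSuccess);

// What is left once the non-successful responses are skipped
const emitted = skipWhile(polled, notSuccess);

console.log(polled);  // logs [ 'pending', 'pending', 'success' ]
console.log(emitted); // logs [ 'success' ]
```

In this model the subscriber would see a single value, the first successful response, which matches the "emit once when the poll completes" behaviour described above.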
Cover photo by Jp Valery