Asynchronous Processing Models in Services

Sergiy Yevtushenko - Sep 30 '19 - Dev Community

Asynchronous processing has been one of my interests for a long time. I have seen many articles dedicated to this topic, but I have never seen one that covers (at least briefly) most of the models. I decided to try to fill this gap. The description below is quite brief, but it tries to show at least the basic properties of each model.

Callbacks

This model is probably one of the oldest and most widely used. The idea is simple: register a callback function (often called a handler), and it will be called when a request arrives or some other event happens. There are many uses of this model. Probably the most famous one is earlier versions of Node.js. In the Java world, older versions of Vert.x are a great example of this model.

In code, this model usually looks similar to the following:

    ... // Prepare request
    request.send(asyncResult -> {
      if (asyncResult.succeeded()) {
        System.out.println(asyncResult.result().body().getString("joke"));       
      }
    });


The lambda passed to request.send is a callback function, which is invoked when the response to the request is received.

Although this model looks simple, it has one huge drawback: callbacks are not composable. There is no simple way, for example, to create a callback which will be invoked once two other callbacks have been triggered. As a consequence, any moderately complex processing quickly turns into callback hell.
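
To make the composability problem more concrete, here is a rough sketch of what "invoke a callback once two other callbacks have fired" takes when written by hand. All names in it (CallbackJoin, whenBoth, loadUser, loadFollowers) are made up for illustration and do not come from Node.js or Vert.x; the two "asynchronous" operations simply call back immediately.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper, written only for this illustration.
class CallbackJoin {
    // Calls onBoth exactly once, after both callback-style operations have delivered a value.
    static <A, B> void whenBoth(Consumer<Consumer<A>> first,
                                Consumer<Consumer<B>> second,
                                BiConsumer<A, B> onBoth) {
        // Shared state that has to be managed manually for every such join.
        AtomicReference<A> left = new AtomicReference<>();
        AtomicReference<B> right = new AtomicReference<>();
        AtomicInteger pending = new AtomicInteger(2);

        first.accept(a -> {
            left.set(a);
            if (pending.decrementAndGet() == 0) {
                onBoth.accept(left.get(), right.get());
            }
        });
        second.accept(b -> {
            right.set(b);
            if (pending.decrementAndGet() == 0) {
                onBoth.accept(left.get(), right.get());
            }
        });
    }

    static String demo() {
        StringBuilder out = new StringBuilder();
        // Stand-ins for asynchronous operations that take a callback.
        Consumer<Consumer<String>> loadUser = callback -> callback.accept("alice");
        Consumer<Consumer<Integer>> loadFollowers = callback -> callback.accept(42);

        whenBoth(loadUser, loadFollowers,
                 (user, followers) -> out.append(user).append(" has ").append(followers).append(" followers"));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even this small join needs its own counter and storage for intermediate results, and error handling is not covered at all. Joining three operations, or nesting such joins, requires writing similar plumbing again.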

Reactive Streams

This model is quite popular: libraries such as RxJava and Project Reactor, both of which implement the Reactive Streams specification, are widely used.

The idea behind this model is the following: every source of incoming events, requests, data, etc. is represented as a stream, and processing is assembled as a pipeline which handles elements from that stream. The pipeline can be quite complex and can be composed from different streams.

The code usually looks like this:

Observable.fromIterable(words)
 .zipWith(Observable.range(1, Integer.MAX_VALUE), 
     (string, count)->String.format("%2d. %s", count, string))
 .subscribe(System.out::println);

This model is extremely powerful and convenient. Nevertheless, complex processing streams might take some time to master and are not always simple to grasp. Another disadvantage is that every source of data (including single values) has to be represented as a stream, which often looks artificial.
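
As a small illustration of the "single value as a stream" point, the sketch below delivers exactly one value through a full publisher/subscriber pair. It uses the JDK's own java.util.concurrent.Flow interfaces and SubmissionPublisher (Java 9+) instead of RxJava or Project Reactor, so that it runs without extra dependencies; the class name SingleValueStream is made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; only the java.util.concurrent types are real JDK API.
class SingleValueStream {
    static List<String> demo() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // ask for everything the stream has
                }

                @Override
                public void onNext(String item) {
                    received.add("got " + item);
                }

                @Override
                public void onError(Throwable error) {
                    completed.countDown();
                }

                @Override
                public void onComplete() {
                    completed.countDown();
                }
            });
            // The "stream" consists of a single element.
            publisher.submit("the only value");
        } // closing the publisher signals onComplete to the subscriber

        try {
            // Wait for the asynchronous delivery to finish (demo only).
            completed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The subscription, demand signalling, and completion handling are all required even though only one value is ever produced.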

Future

A Future represents a result which might not be available yet, but will be available eventually:

Future<Result> futureResult = executor.submit(() -> new Result("success"));

Future is composable (although this requires quite a lot of ceremony), but it's far from convenient. If you need to know when the result is ready, you have no choice but to poll Future.isDone() or to convert asynchronous processing into synchronous by calling Future.get().
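
A minimal sketch of what this looks like in practice, assuming two independent tasks whose results need to be combined. The class name BlockingFuture and the values are made up for the example; plain integers are used instead of the Result type from the snippet above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class built on the standard java.util.concurrent API.
class BlockingFuture {
    static int demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> first = executor.submit(() -> 20);
            Future<Integer> second = executor.submit(() -> 22);

            // Future offers no way to register a callback, so combining the
            // two results means blocking the calling thread on each get().
            return first.get() + second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The tasks themselves run asynchronously, but the code that combines them is ordinary blocking code with checked-exception handling around it.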

Promises

At a high level, Promises can be thought of as a combination of Futures and callbacks. When a Promise gets resolved (is assigned a value), its callbacks are invoked automatically:

...
Promise<User> getUserById(UUID userId);
...

getUserById(userId)
    .then(user -> System.out.println("User: " + user));
...


The callback function passed to the .then() method will be invoked when the Promise is resolved. There is no limit on the number of callbacks which can be added to a Promise, so as the instance is passed across various methods, they can add their own callbacks as necessary. If the Promise is already resolved by the time a new callback is added, the callback is invoked immediately. Every callback is invoked only once and only in the context of one thread, so there are no synchronization issues to worry about.
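
The behaviour described above can be sketched with a deliberately tiny Promise. SimplePromise is a hypothetical class written for this illustration only; it is not the API of Vert.x or of any particular library, and it leaves out error handling and transformation methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal Promise: holds a value once resolved and notifies callbacks.
class SimplePromise<T> {
    private final List<Consumer<T>> callbacks = new ArrayList<>();
    private T value;
    private boolean resolved;

    // Registers a callback; runs it right away if the value is already there.
    SimplePromise<T> then(Consumer<T> callback) {
        T current;
        synchronized (this) {
            if (!resolved) {
                callbacks.add(callback);
                return this;
            }
            current = value;
        }
        callback.accept(current);
        return this;
    }

    // Assigns the value and invokes every registered callback exactly once.
    void resolve(T result) {
        List<Consumer<T>> toRun;
        synchronized (this) {
            if (resolved) {
                return; // only the first resolution counts
            }
            resolved = true;
            value = result;
            toRun = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        toRun.forEach(callback -> callback.accept(result));
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimplePromise<String> promise = new SimplePromise<>();

        promise.then(user -> log.add("first: " + user));
        promise.then(user -> log.add("second: " + user));
        promise.resolve("alice");
        promise.then(user -> log.add("late: " + user)); // added after resolution

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both callbacks registered before resolve() run when the value arrives, and the one added afterwards runs immediately, so callers never need to know whether the Promise has already been resolved.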

Promises are composable, and the code is quite simple to write and read. There are also no artificial representations of data.

These properties of Promises triggered a (slow) switch to them in recent versions of Vert.x, although they look somewhat foreign there: the callback-based API remains mostly unchanged, so Promises are just wrappers for callbacks.

There is another implementation of Promises, present in the JDK since Java 8 and called CompletableFuture. I must admit that this implementation is not a direct replacement for traditional Promises: it's quite heavyweight and has an inconvenient API.
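
For comparison, here is a short sketch of combining two asynchronous results with CompletableFuture. The class name and the values are made up for the example; supplyAsync, thenCombine and join are standard JDK methods.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class built on the JDK's CompletableFuture.
class CompletableFutureDemo {
    static String demo() {
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "alice");
        CompletableFuture<Integer> followers = CompletableFuture.supplyAsync(() -> 42);

        // The combining function runs once both futures have completed.
        CompletableFuture<String> dashboard =
            user.thenCombine(followers, (name, count) -> name + " has " + count + " followers");

        // join() blocks; it is used here only to get the result out of the demo.
        return dashboard.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The composition itself is callback-based and non-blocking; only the final join() in this demo waits for the result.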

Instead of an afterword

At present I'm working on a library which implements the Promise model in a more lightweight and convenient form.
To get a taste of how code using this library might look:

    public Promise<Either<? extends BaseError, UserDashboard>> userProfileHandler(final UUID userId) {
        return zipAll(userService.userProfile(userId), 
                      userService.followers(userId), 
                      articleService.articlesByUser(userId, Order.DESC), 
                      commentService.commentsByUser(userId, Order.DESC))
                .map(result -> result.mapSuccess(tuple -> tuple.map(UserDashboard::new)));
    }


The code above is just an example, and the library itself is at version 0.1.0, but it's already possible to write code this way using the library in its current state.
