Get Started with Reactive Programming in Spring

avatar-matt_raible.jpg Matt Raible

Reactive programming allows you to build systems that are resilient to high load. Handling lots of traffic isn’t a problem because the server is non-blocking and doesn’t block client processes to wait for responses. The client cannot directly observe, or synchronize with, the execution that occurs on the server. When an API is struggling to handle requests, it should respond in a sensible way. It shouldn’t fail to work or drop messages in an uncontrolled fashion. It should communicate that it is under stress to upstream components and get them to reduce load. This is known as "back-pressure" and is an important aspect of reactive programming.

Josh Long I collaborated on this post with Josh Long, fellow Java Champion, Spring Developer Advocate, and all around great guy at Pivotal. I’ve been a longtime user of Spring, and Josh was the one who first showed me Spring Boot, at a Devoxx Belgium many moons ago. We’ve been good friends for a while now, sharing the same passion for Java, developers, and building awesome applications.

Reactive Programming, or, I/O, I/O, It’s Off to Work We Go…​

Reactive programming is an approach to writing software that embraces asynchronous I/O. Asynchronous I/O is a small idea that portends big changes for software. The idea is simple: alleviate inefficient resource utilization by reclaiming resources that would otherwise be idle as they waited for I/O activity. Asynchronous I/O inverts the normal design of I/O processing: the clients are notified of new data instead of asking for it; this frees the client to do other things while waiting for new notifications. Let’s look at an example that compares and contrasts asynchronous I/O to synchronous I/O.

Let’s build a simple program that reads data from a source (a java.io.File reference, specifically). First up, an implementation that uses a trusty 'ol java.io.InputStream implementation:

Example 1. Read data from a file synchronously
package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.function.Consumer;

@Log4j2
class Synchronous implements Reader {

    @Override
    public void read(File file, Consumer<BytesPayload> consumer) throws IOException {
        try (FileInputStream in = new FileInputStream(file)) { (1)
            byte[] data = new byte[FileCopyUtils.BUFFER_SIZE];
            int res;
            while ((res = in.read(data, 0, data.length)) != -1) { (2)
                    consumer.accept(BytesPayload.from(data, res)); (3)
            }
        }
    }
}
1 source the file using a regular java.io.File
2 pull the results out of the source one line at a time…​
3 I’ve written this code to accept a Consumer<BytesPayload> that gets called when there’s new data

Pretty straightforward, eh? Run this and you’ll see in the log output, on the left-hand side of each line, that all activity is happening on a single thread.

We’re pulling bytes out of a source of data (in this case, a java.io.InputStream subclass, java.io.FileInputStream). What’s wrong with this example? Well, probably nothing! In this case, we’re using an InputStream that’s pointing to data on the local file system. If the file is there, and the hard drive is working, then this code will work as we expect.

What if, instead of reading data from a File, we read data from a network socket, and used a different implementation of an InputStream? Nothing to worry about! Well, nothing to worry about if the network is infinitely fast, at least. And if the network link between this node and another never fails. If those things are true, then there’s certainly nothing to worry about! This code will work just fine.

What happens if the network is slow, or down? In this case, it’d mean that the time it takes for the in.read(…​) operation to return would be prolonged. Indeed, it may never return! This is a problem if we’re trying to do something else with the thread on which we’re reading data. Sure, we can spin up another thread and read from that one instead. We could keep this up to a point, but eventually, we’ll run into a limit where adding threads doesn’t support our goal of scaling. We won’t have true concurrency beyond the number of cores on our machine. We’re stuck! We can’t handle more I/O, reads in this case, without adding threads, and our ability to scale up with more threads is, ultimately, limited.

In that example, the bulk of the work is in the reading - there’s not much else going on anywhere. We are _I/O bound. Let’s see how an asynchronous solution can help us alleviate the monopolization of our threads.

Example 2. Read data from a file asynchronously
package com.example.io;

import lombok.extern.log4j.Log4j2;
import org.springframework.util.FileCopyUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@Log4j2
class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> {

    private int bytesRead;
    private long position;
    private AsynchronousFileChannel fileChannel;
    private Consumer<BytesPayload> consumer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public void read(File file, Consumer<BytesPayload> c) throws IOException {
        this.consumer = c;
        Path path = file.toPath(); (1)
        this.fileChannel = AsynchronousFileChannel.open(path,
            Collections.singleton(StandardOpenOption.READ), this.executorService); (2)
        ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE);
        this.fileChannel.read(buffer, position, buffer, this); (3)
        while (this.bytesRead > 0) {
                this.position = this.position + this.bytesRead;
                this.fileChannel.read(buffer, this.position, buffer, this);
        }
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        (4)
        this.bytesRead = result;

        if (this.bytesRead < 0)
            return;

        buffer.flip();

        byte[] data = new byte[buffer.limit()];
        buffer.get(data);

        (5)
        consumer.accept(BytesPayload.from(data, data.length));

        buffer.clear();

        this.position = this.position + this.bytesRead;
        this.fileChannel.read(buffer, this.position, buffer, this);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        log.error(exc);
    }
}
1 this time, we adapt the java.io.File into a Java NIO java.nio.file.Path
2 when we create the Channel, we specify, among other things, a java.util.concurrent.ExecutorService, that will be used to invoke our CompletionHandler when there’s data available
3 start reading, passing in a reference to a CompletionHandler<Integer, ByteBuffer> (this)
4 in the callback, we read the bytes out of a ByteBuffer into a byte[] holder
5 just as in the Synchronous example, the byte[] data is passed to a consumer

First thing’s first: this code’s waaaay more complicated! There’s a ton of things going on here and it can seem overwhelming, but indulge me, for a moment…​ This code reads data from a Java NIO Channel and processes that data, asynchronously, on a separate thread in a callback handler. The thread on which the read was started isn’t monopolized. We return virtually instantly after we call .read(..), and when there is finally data available, our callback is invoked, and on a different thread. If there is latency between .read() calls, then we can move on and do other things with our thread. The duration of the asynchronous read, from the first byte to the last, is at best as short as the duration of the synchronous read. It’s likely a tiny bit longer. But, for that complexity, we can be more efficient with our threads. We can handle more work, multiplexing I/O across a finite thread pool.

I work for a cloud computing company. We’d love it if you solved your scale-out problems by buying more application instances! Of course, I’m being a bit tongue-in-cheek here. Asynchronous I/O does make things a bit more complicated, but hopefully this example highlights the ultimate benefit of reactive code: we can handle more requests, and do more work, using asynchronous I/O on the same hardware if our work is I/O bound. If it’s CPU-bound (e.g.: fibonacci, bitcoin mining, or cryptography) then reactive programming won’t buy us anything.

Now, most of us don’t work with Channel or InputStream implementations for their day-to-day work! They think about things in terms of higher order abstractions. Things like the arrays, or, more likely, the java.util.Collection hierarchy. A java.util.Collection maps very nicely to an InputStream: they both assume that you’ll be able to work with all the data, near instantly. You expect to be able to finish reading from most InputStreams sooner rather than later. Collection types start to become a bit awkward when you move to larger sums of data; what happens when you’re dealing with something potentially infinite - unbounded - like websockets, or server-sent events? What happens when there’s latency between records? One record arrives now and another not for another minute or hour such as with a chat, or when the network suffers a failure?

We need a better way to describe these kinds of data. We’re describing something asynchronous - something that will eventually happen. This might seem a good fit for a Future<T> or a CompletableFuture<T>, but that only describes one eventual thing. Not a whole stream of potentially unlimited things. Java hasn’t really offered an appropriate metaphor by which to describe this kind of data. Both Iterator and Java 8 Stream types can be unbounded, but they are both pull-centric; you ask for the next record instead of having the type call your code back. One assumes that if they did support push-based processing, which lets you do more with your threads, that the APIs would also expose threading and scheduling control. Iterator implementations say nothing about threading and Java 8 streams all share the same fork-join pool.

If Iterator and Stream did support push-based processing, then we’d run into another problem that really only becomes an issue in the context of I/O: we’d need some way to push back! As a consumer of data being produced asynchronously, we have no idea when or how much data might be in the pipeline. We don’t know if one byte will be produced in the next callback or a if terabyte will be produced!

When you pull data off of an InputStream, you read as much data as you’re prepared to handle, and no more. In the examples above we read into a byte[] buffer of a fixed and known length. In an asynchronous world, we need some way to communicate to the producer how much data we’re prepared to handle.

Yep. We’re definitely missing something.

Solve for the Missing Metaphor

What we want is something that maps nicely to asynchronous I/O, and that supports this push-back mechanism, or flow control, in distributed systems. In reactive programming, the ability of the client to signal how much work it can manage is called backpressure.

There are a good deal many projects - Vert.x, Akka Streams, and RxJava - that support reactive programming. The Spring team has a project called Reactor. There’s common enough ground across these different approaches extracted into a de-facto standard, the Reactive Streams initiative. The Reactive Streams initiative defines four types:

The Publisher<T> is a producer of values that may eventually arrive. A Publisher<T> produces values of type T to a Subscriber<T>.

Example 3. Reactive Streams Publisher<T>.
package org.reactivestreams;

public interface Publisher<T> {

    void subscribe(Subscriber<? super T> s);
}

The Subscriber subscribes to a Publisher<T>, receiving notifications on any new values of type T through its onNext(T) method. If there are any errors, its onError(Throwable) method is called. When processing has completed normally, the subscriber’s onComplete method is called.

Example 4. Reactive Streams Subscriber<T>.
package org.reactivestreams;

public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();
}

When a Subscriber first connects to a Publisher, it is given a Subscription in the Subscriber#onSubscribe method. The Subscription is arguably the most important part of the whole specification: it enables backpressure. The Subscriber uses the Subscription#request method to request more data or the Subscription#cancel method to halt processing.

Example 5. Reactive Streams Subscription<T>.
package org.reactivestreams;

public interface Subscription {

    public void request(long n);

    public void cancel();
}

The Reactive Streams specification provides one more useful, albeit obvious, type: A Processor<A,B> is a simple interface that extends both Subscriber<A> and a Publisher<B>.

Example 6. Reactive Streams Processor<T>.
package org.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The specification is not meant to be a prescription for the implementations, instead defining types for interoperability. The Reactive Streams types are so obviously useful that they eventually found their way into the recent Java 9 release as one-to-one semantically equivalent interfaces in the java.util.concurrent.Flow class, e.g.: java.util.concurrent.Flow.Publisher.

Meet Reactor

The Reactive Streams types are not enough; you’ll need higher order implementations to support operations like filtering and transformation. The Reactor project is a good choice here; it builds on top of the Reactive Streams specification. It provides two specializations of Publisher<T>.

The first, Flux<T>, is a Publisher that produces zero or more values. It’s unbounded. The second, Mono<T>, is a Publisher<T> that produces zero or one value. They’re both publishers and you can treat them that way, but they go much further than the Reactive Streams specification. They both provide operators, ways to process a stream of values. Reactor types compose nicely - the output of one thing can be the input to another and if a type needs to work with other streams of data, they rely upon Publisher<T> instances.

Both Mono<T> and Flux<T> implement Publisher<T>; our recommendation is that your methods accept Publisher<T> instances but return Flux<T> or Mono<T> to help the client distinguish the kind of data its being given.

Suppose you’re given a Publisher<T> and asked to render a user interface for that Publisher<T>. Should you render a detail page for one record, as you might be given a CompletableFuture<T>? Or should you render an overview page, with a list or grid displaying all the records in a paged fashion? It’s hard to know.

Flux<T> and Mono<T>, on the other hand, are very specific. You know to render an overview page if you’re given a Flux<T> and a detail page for one (or no) record when given a Mono<T>.

Reactor is an open source project started by Pivotal; it’s become very popular. Facebook uses it in their reactive RPC mechanism, RSocket, led by RxJava creator Ben Christensen. Salesforce uses it in their reactive gRPC implementation. It implements the Reactive Streams types, and so can interoperate with other technologies that support those types like Netflix’s RxJava 2, Lightbend’s Akka Streams, and the Eclipse Foundation’s Vert.x project. David Karnok, lead of RxJava 2, has worked extensively with Pivotal on Reactor, too, making it even better. And, of course, it’s been in Spring Framework in some form or another since Spring Framework 4.0.

Reactive Programming with Spring WebFlux

As useful as project Reactor is, it’s only a foundation. Our applications need to talk to data sources. They need to produce and consume HTTP, SSE and WebSocket endpoints. They will need to support authentication and authorization. Spring provides these things. If Reactor gives us the missing metaphor, Spring helps us all speak the same language.

Spring Framework 5.0 was released in September 2017. It builds on Reactor and the Reactive Streams specification. It includes a new reactive runtime and component model called Spring WebFlux.

Spring WebFlux does not depend on or require the Servlet APIs to work. It ships with adapters that allow it to work on top of a Servlet-engine if need be, but it’s not required. It also provides a net new Netty-based web runtime called Spring WebFlux. Spring Framework 5, which works with a baseline of Java 8 and Java EE 7, is now the baseline for much of the Spring ecosystem including Spring Data Kay, Spring Security 5, Spring Boot 2 and Spring Cloud Finchley.

Learn More about Reactive Programming and Spring

This post is the first in a series on reactive programming, Spring WebFlux, and handling real-time data with React. The next posts in this series can be found below.

If you’d like to learn more, check out these interesting resources on reactive programming and Spring.

Have questions? Please leave a comment below, hit up @starbuxman or @mraible on Twitter, or post a question on our Developer Forums.

Like what you learned today? Follow @oktadev, like us on Facebook, and subscribe to our YouTube channel.