JEP 266: More Concurrency Updates defines an interoperable publish-subscribe framework for reactive streams, enhancements to the java.util.concurrent.CompletableFuture class, and various other improvements. This post wraps up my series on Java 9's other new enhancements by acquainting you with these concurrency updates.
Publish-subscribe framework for reactive streams
Java 9 includes a publish-subscribe framework for reactive streams. In this section, I first introduce the concept of reactive streams and then present this publish-subscribe framework.
Introducing reactive streams
Data processing has evolved from batch architectures that collect data and subsequently process the data after some threshold has been reached, to stream-oriented architectures that help to turn data into knowledge as quickly as possible. Stream-oriented architectures capture and process live data, and modify systems based on the processed results very quickly (typically in seconds or less). In contrast, a batch-processing system might take hours, days, or weeks to respond.
Handling streams of data (especially "live" data whose volume isn't predetermined) requires special care in an asynchronous system. The main issue is that resource consumption needs to be controlled so that a fast data source doesn't overwhelm the stream destination. Asynchrony is needed to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine, which can greatly speed up data processing.
The Reactive Streams initiative provides a standard for asynchronous stream processing with nonblocking back pressure. A reactive stream provides a way to signal its source to ease production of data when the stream's destination becomes overwhelmed with that data. This signaling capability is like a valve on a water pipe. Closing this valve increases back pressure (the pressure back at the source) while easing the burden on the destination.
The initiative's objective is to govern the exchange of stream data across an asynchronous boundary (such as passing data to another thread) while ensuring that the destination isn't forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues that mediate between threads to be bounded. Note that the communication of back pressure is handled asynchronously.
Reactive Streams focuses on finding a minimal set of interfaces, methods, and protocols for describing the operations and entities needed to achieve the objective: asynchronous streams of data with nonblocking back pressure.
Exploring the publish-subscribe framework
Java 9 supports the Reactive Streams initiative by providing a publish-subscribe framework (also known as the Flow API) that consists of the java.util.concurrent.Flow and java.util.concurrent.SubmissionPublisher classes.
Flow is a repository for four nested static interfaces whose methods establish flow-controlled components in which publishers produce data items that are consumed by one or more subscribers:
- Publisher: A producer of data items that are received by subscribers.
- Subscriber: A receiver of data items.
- Subscription: Linkage between a Publisher and a Subscriber.
- Processor: A combination of Publisher and Subscriber for specifying a data-transformation function.
A publisher publishes a stream of data items to registered subscribers and implements Flow.Publisher. This interface declares a single method, which is invoked to register a subscriber with a publisher:
void subscribe(Flow.Subscriber<? super T> subscriber)
Invoking this method registers subscriber with the publisher. However, if subscriber is already registered or the registration fails due to some policy (or other) violation, this method invokes subscriber's onError() method with an IllegalStateException object. Otherwise, subscriber's onSubscribe() method is invoked with a new Flow.Subscription object. subscribe() throws NullPointerException when null is passed to subscriber.
A subscriber subscribes to a publisher for callbacks of data items and implements Flow.Subscriber<T>. This interface declares onSubscribe() and three additional methods:
void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
- onSubscribe() is invoked to confirm registration. It receives a subscription argument whose methods allow requests for new data items to be made to the publisher or to request that the publisher send no more data items.
- onComplete() is invoked when it's known that no additional Subscriber method invocations will occur for a Subscription that's not already terminated by error. No other Subscriber methods are called after this method.
- onError(Throwable throwable) is invoked with the specified throwable upon an unrecoverable error that's encountered by the publisher or subscription. No other Subscriber methods are called after this method.
- onNext() is invoked with a subscription's next item. If this method throws an exception, the resulting behavior isn't guaranteed, but may cause the subscription to be cancelled.
A subscription provides a link between a publisher and a subscriber, letting subscribers receive data items only upon request and cancel at any time. Subscriptions implement the Flow.Subscription interface, which declares two methods:
void request(long n)
void cancel()
- request() adds n data items to the current unfulfilled demand for this subscription. If n is less than or equal to 0, the subscriber's onError() method is called with an IllegalArgumentException argument. Otherwise, the subscriber receives up to n additional onNext() invocations (or fewer when terminated). Passing Long.MAX_VALUE to n indicates an unbounded number of invocations.
- cancel() causes the subscriber to eventually stop receiving data items. A best-effort attempt is made; additional data items may be received after cancel() is called.
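To make the request()/cancel() contract more concrete, here's a minimal sketch of a subscriber that asks for items in small batches and cancels once it has seen enough. BatchDemandDemo, BatchSubscriber, and run() are names I've invented for illustration; only the Flow and SubmissionPublisher calls come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BatchDemandDemo
{
   // Hypothetical subscriber: requests items in batches and cancels at a limit.
   static class BatchSubscriber<T> implements Flow.Subscriber<T>
   {
      private final int batchSize;
      private final int limit;
      private final List<T> received = new CopyOnWriteArrayList<>();
      private final CountDownLatch finished = new CountDownLatch(1);
      private Flow.Subscription subscription;
      private int remainingInBatch;

      BatchSubscriber(int batchSize, int limit)
      {
         this.batchSize = batchSize;
         this.limit = limit;
      }

      @Override
      public void onSubscribe(Flow.Subscription subscription)
      {
         this.subscription = subscription;
         remainingInBatch = batchSize;
         subscription.request(batchSize); // initial demand
      }

      @Override
      public void onNext(T item)
      {
         if (received.size() >= limit)
            return; // cancel() is best-effort, so ignore any stragglers
         received.add(item);
         if (received.size() == limit)
         {
            subscription.cancel(); // no onComplete() follows a cancel
            finished.countDown();
         }
         else if (--remainingInBatch == 0)
         {
            remainingInBatch = batchSize;
            subscription.request(batchSize); // top up demand for the next batch
         }
      }

      @Override
      public void onError(Throwable t)
      {
         finished.countDown();
      }

      @Override
      public void onComplete()
      {
         finished.countDown();
      }

      List<T> awaitItems()
      {
         try
         {
            finished.await();
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
         }
         return new ArrayList<>(received);
      }
   }

   // Publishes 1..total and returns the items that the subscriber accepted.
   static List<Integer> run(int batchSize, int limit, int total)
   {
      BatchSubscriber<Integer> subscriber = new BatchSubscriber<>(batchSize, limit);
      try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>())
      {
         publisher.subscribe(subscriber);
         for (int i = 1; i <= total; i++)
            publisher.submit(i);
      } // close() leads to onComplete() unless the subscription was cancelled
      return subscriber.awaitItems();
   }

   public static void main(String[] args)
   {
      System.out.println(run(2, 5, 10)); // prints [1, 2, 3, 4, 5]
   }
}
```

Requesting in batches rather than passing Long.MAX_VALUE keeps the unfulfilled demand bounded, which is the point of back pressure; the batch size of 2 is arbitrary.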
Finally, a processor is a data-transformation function that operates on a stream without having to change the publisher or subscriber. One or more chained processors can be placed between publisher and subscriber to transform one data stream into another. The publisher and subscriber aren't dependent on the transformation(s) taking place. The JDK doesn't provide any concrete processors, so you must create your own by implementing the Processor interface, which declares no methods of its own but inherits those of Subscriber and Publisher.
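One common way to build a processor is to extend SubmissionPublisher (for the publishing half) and implement Flow.Processor (for the subscribing half). The following is only a sketch under that assumption: TransformProcessor, ProcessorDemo, and upperCaseAll() are hypothetical names, and the one-item-at-a-time demand is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class ProcessorDemo
{
   // Hypothetical processor: applies a function to each item and republishes it.
   static class TransformProcessor<S, T> extends SubmissionPublisher<T>
      implements Flow.Processor<S, T>
   {
      private final Function<? super S, ? extends T> function;
      private Flow.Subscription subscription;

      TransformProcessor(Function<? super S, ? extends T> function)
      {
         this.function = function;
      }

      @Override
      public void onSubscribe(Flow.Subscription subscription)
      {
         this.subscription = subscription;
         subscription.request(1);
      }

      @Override
      public void onNext(S item)
      {
         submit(function.apply(item)); // pass the transformed item downstream
         subscription.request(1);
      }

      @Override
      public void onError(Throwable t)
      {
         closeExceptionally(t); // propagate the failure downstream
      }

      @Override
      public void onComplete()
      {
         close(); // propagate completion downstream
      }
   }

   // Wires source -> processor -> collecting consumer and returns the results.
   static List<String> upperCaseAll(List<String> items)
   {
      List<String> results = new CopyOnWriteArrayList<>();
      try (SubmissionPublisher<String> source = new SubmissionPublisher<>();
           TransformProcessor<String, String> upper =
              new TransformProcessor<String, String>(s -> s.toUpperCase()))
      {
         // consume() subscribes a consumer; its future completes on onComplete().
         CompletableFuture<Void> done = upper.consume(results::add);
         source.subscribe(upper);
         items.forEach(source::submit);
         source.close();
         done.join();
      }
      return new ArrayList<>(results);
   }

   public static void main(String[] args)
   {
      System.out.println(upperCaseAll(Arrays.asList("jan", "feb", "mar")));
      // prints [JAN, FEB, MAR]
   }
}
```

Neither the source publisher nor the final consumer knows that a transformation sits between them, which is what lets processors be chained.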
SubmissionPublisher implements Flow.Publisher, asynchronously issuing submitted (nonnull) data items to current subscribers until it's closed. Each current subscriber receives newly submitted data items in the same order unless drops or exceptions are encountered. SubmissionPublisher allows data item generators to act as compliant reactive streams publishers that rely on drop handling and/or blocking for flow control.
SubmissionPublisher provides three constructors for initializing a submission publisher. The simplest (no-argument) constructor constructs a submission publisher that relies on the ForkJoinPool.commonPool() method to provide the asynchrony needed for delivering data items to subscribers (unless the common pool doesn't support a parallelism level of at least two, in which case a new Thread object is created to run each task).
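When you'd rather not deliver items on the common pool, the two-argument constructor accepts your own Executor and a maximum buffer capacity per subscriber. Here's a small sketch of that constructor; the class name, the publishOn() helper, and the capacity of 8 are my own choices for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SubmissionPublisher;

public class CustomExecutorPublisherDemo
{
   // Publishes items via a caller-supplied executor and returns what was delivered.
   static List<String> publishOn(ExecutorService executor, List<String> items)
   {
      List<String> seen = new CopyOnWriteArrayList<>();

      // 8 is the maximum buffer capacity per subscriber; the JDK may round
      // the requested value up to a power of two.
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, 8);

      // consume() registers a consumer that requests all items; the returned
      // future completes once the publisher has been closed and drained.
      CompletableFuture<Void> done = publisher.consume(seen::add);
      items.forEach(publisher::submit);
      publisher.close();
      done.join();
      return new ArrayList<>(seen);
   }

   public static void main(String[] args)
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      try
      {
         System.out.println(publishOn(executor, Arrays.asList("a", "b", "c")));
      }
      finally
      {
         executor.shutdown(); // the publisher doesn't shut down your executor
      }
   }
}
```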
Listing 1 presents the source code to a FlowDemo application that demonstrates SubmissionPublisher and other aspects of Java 9's publish-subscribe framework for reactive streams.
Listing 1. FlowDemo.java
import java.util.Arrays;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo
{
   public static void main(String[] args)
   {
      // Create a publisher.
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      // Create a subscriber and register it with the publisher.
      MySubscriber<String> subscriber = new MySubscriber<>();
      publisher.subscribe(subscriber);

      // Publish several data items and then close the publisher.
      System.out.println("Publishing data items...");
      String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
                         "jul", "aug", "sep", "oct", "nov", "dec" };
      Arrays.stream(items).forEach(publisher::submit);
      publisher.close();

      // Block the main thread until the subscriber reports termination.
      try
      {
         subscriber.awaitTermination();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
   }
}

class MySubscriber<T> implements Subscriber<T>
{
   private Subscription subscription;
   private boolean terminated; // guarded by this

   @Override
   public void onSubscribe(Subscription subscription)
   {
      this.subscription = subscription;
      subscription.request(1);
   }

   @Override
   public void onNext(T item)
   {
      System.out.println("Received: " + item);
      subscription.request(1);
   }

   @Override
   public void onError(Throwable t)
   {
      t.printStackTrace();
      signalTermination();
   }

   @Override
   public void onComplete()
   {
      System.out.println("Done");
      signalTermination();
   }

   // Waits until onComplete() or onError() has run. The flag covers the case
   // where the notification arrives before the caller starts waiting, and
   // the loop guards against spurious wakeups.
   synchronized void awaitTermination() throws InterruptedException
   {
      while (!terminated)
         wait();
   }

   private synchronized void signalTermination()
   {
      terminated = true;
      notifyAll();
   }
}
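I use Object's wait() and notifyAll() methods to cause the main thread (which runs the main() method) to wait until onComplete() has finished. Otherwise, you'll probably not observe any subscriber output: by default, data items are delivered on ForkJoinPool.commonPool() worker threads, which are daemon threads and so don't keep the JVM running after main() returns.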
Compile Listing 1 as follows:
javac FlowDemo.java
Run the application as follows:
java FlowDemo
You should observe the following output:
Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done
CompletableFuture enhancements
Java 8 introduced the CompletableFuture<T> class, which is a java.util.concurrent.Future<T> that may be explicitly completed (setting its value and status), and may be used as a java.util.concurrent.CompletionStage, supporting dependent functions and actions that are triggered upon the future's completion. Java 9 introduces several enhancements to CompletableFuture:
- support for delays and timeouts
- improved support for subclassing
- new factory methods
Support for delays and timeouts
Java 9 extends CompletableFuture with time-based methods that enable a future to complete with a value or exceptionally after a certain duration:
- CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit): Completes this CompletableFuture with the given value when not otherwise completed before the given timeout (expressed in unit java.util.concurrent.TimeUnit units, such as MILLISECONDS). Returns this CompletableFuture.
- CompletableFuture<T> orTimeout(long timeout, TimeUnit unit): Exceptionally completes this CompletableFuture with a java.util.concurrent.TimeoutException when not otherwise completed before the given timeout. Returns this CompletableFuture.
Suppose we create a CompletableFuture for fetching theatre recommendations from some recommendation service. We introduce a way to load static recommendations when the service cannot provide the expected result in a timely manner:
Supplier<List<Theatre>> invokeRecommendationService = ...
CompletableFuture.supplyAsync(invokeRecommendationService)
.completeOnTimeout(Collections.singletonList(cats), 1, TimeUnit.SECONDS)
.thenAccept(showRecommendationsToUser);
If the recommendation service gives us recommendations in less than a second, we show these recommendations to the user. Otherwise, we recommend that the user check out the Cats production (assuming that it's playing). The new completeOnTimeout(T value, long timeout, TimeUnit unit) method completes the CompletableFuture with the given value when it's not completed before the given timeout.
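If you'd like to try the fallback behavior yourself, here's a self-contained sketch. The service is simulated with Thread.sleep(), and the class name, the helper methods, and the show titles other than Cats are placeholders that I've made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class CompleteOnTimeoutDemo
{
   // Hypothetical stand-in for a remote recommendation service that needs
   // the given number of milliseconds to answer.
   static Supplier<List<String>> serviceTaking(long millis)
   {
      return () ->
      {
         try
         {
            Thread.sleep(millis);
         }
         catch (InterruptedException ie)
         {
            Thread.currentThread().interrupt();
         }
         return Arrays.asList("Hamilton", "Wicked");
      };
   }

   // Returns the service's answer, or the static fallback on timeout.
   static List<String> recommend(long serviceMillis, long timeoutMillis)
   {
      return CompletableFuture.supplyAsync(serviceTaking(serviceMillis))
         .completeOnTimeout(Collections.singletonList("Cats"),
                            timeoutMillis, TimeUnit.MILLISECONDS)
         .join();
   }

   public static void main(String[] args)
   {
      System.out.println(recommend(0, 2000));   // service answers in time
      System.out.println(recommend(1500, 100)); // timeout: fallback is used
   }
}
```

Note that the timeout completes the future but doesn't interrupt the supplier; the simulated service keeps running in the background until its sleep ends.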
Perhaps you don't want to offer a static recommendation. In this case, you can raise a TimeoutException via the orTimeout(long timeout, TimeUnit unit) method:
CompletableFuture.supplyAsync(invokeRecommendationService)
.orTimeout(1, TimeUnit.SECONDS)
.thenAccept(showRecommendationsToUser);
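In the fragment above, thenAccept() never runs when the timeout fires, because the future has completed exceptionally. A runnable sketch that also handles the failure might look like the following; OrTimeoutDemo, slowAnswer(), and fetch() are illustrative names, and the service is again simulated with Thread.sleep().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

public class OrTimeoutDemo
{
   // Hypothetical slow operation standing in for the recommendation service.
   static String slowAnswer(long millis)
   {
      try
      {
         Thread.sleep(millis);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return "answer";
   }

   // Unwraps CompletionException so that the original failure is reported.
   static Throwable rootCause(Throwable t)
   {
      return (t instanceof CompletionException && t.getCause() != null)
             ? t.getCause() : t;
   }

   // Returns the answer, or a short failure description on timeout.
   static String fetch(long serviceMillis, long timeoutMillis)
   {
      return CompletableFuture.supplyAsync(() -> slowAnswer(serviceMillis))
         .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
         .exceptionally(t -> "failed: " + rootCause(t).getClass().getSimpleName())
         .join();
   }

   public static void main(String[] args)
   {
      System.out.println(fetch(0, 2000));   // prints answer
      System.out.println(fetch(1500, 100)); // prints failed: TimeoutException
   }
}
```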
Additionally, a pair of delayedExecutor() static methods have been added. Each method returns a java.util.concurrent.Executor that allows a task to execute after a certain duration:
Executor delayedExecutor(long delay, TimeUnit unit)
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
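A delayed executor is typically passed to one of the ...Async() methods. The sketch below, which uses names of my own choosing, measures roughly how long a task submitted through a delayed executor takes to produce its value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

public class DelayedExecutorDemo
{
   // Runs a trivial task through a delayed executor and returns its result.
   static String runDelayed(long delayMillis)
   {
      Executor delayed =
         CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
      // The supplier isn't handed to the default executor until the delay
      // has elapsed.
      return CompletableFuture.supplyAsync(() -> "ready", delayed).join();
   }

   // Returns how many milliseconds passed before the delayed task finished.
   static long elapsedMillis(long delayMillis)
   {
      long start = System.nanoTime();
      runDelayed(delayMillis);
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
   }

   public static void main(String[] args)
   {
      System.out.println(runDelayed(200));
      System.out.println("Elapsed ms: " + elapsedMillis(200));
   }
}
```

The three-argument overload works the same way, except that the task is handed to the executor you supply once the delay has elapsed.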
Improved support for subclassing
Various enhancements have been made to CompletableFuture to make it easier to extend this class. For example, you might want to override the new Executor defaultExecutor() method to support an alternative default executor.
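As a rough illustration, the following sketch subclasses CompletableFuture so that async methods without an explicit executor run on a dedicated thread. WorkerFuture and the thread name are hypothetical; I also override newIncompleteFuture(), another Java 9 addition, so that dependent stages remain instances of the subclass.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class CustomFutureDemo
{
   // Hypothetical subclass whose default executor is one dedicated daemon thread.
   static class WorkerFuture<T> extends CompletableFuture<T>
   {
      private static final Executor WORKER = Executors.newSingleThreadExecutor(r ->
      {
         Thread t = new Thread(r, "worker-future-thread");
         t.setDaemon(true); // don't keep the JVM alive for this demo
         return t;
      });

      @Override
      public Executor defaultExecutor()
      {
         return WORKER;
      }

      @Override
      public <U> CompletableFuture<U> newIncompleteFuture()
      {
         return new WorkerFuture<>(); // dependent stages stay WorkerFutures
      }
   }

   // completeAsync() without an executor argument uses defaultExecutor().
   static String threadUsedByCompleteAsync()
   {
      return new WorkerFuture<String>()
         .completeAsync(() -> Thread.currentThread().getName())
         .join();
   }

   // Stages derived from a WorkerFuture are created via newIncompleteFuture().
   static boolean dependentStageKeepsSubclass()
   {
      CompletableFuture<Integer> stage = new WorkerFuture<String>()
         .completeAsync(() -> "abc")
         .thenApplyAsync(String::length);
      stage.join();
      return stage instanceof WorkerFuture;
   }

   public static void main(String[] args)
   {
      System.out.println(threadUsedByCompleteAsync());   // worker-future-thread
      System.out.println(dependentStageKeepsSubclass()); // true
   }
}
```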