Java 9's other new enhancements, Part 6: Concurrency

A publish-subscribe framework for reactive streams, CompletableFuture, and more

Java 9 concurrency enhancements
Garry Knight (CC BY 2.0)

JEP 266: More Concurrency Updates defines an interoperable publish-subscribe framework for reactive streams, enhancements to the java.util.concurrent.CompletableFuture class, and various other improvements. This post wraps up my series on Java 9's other new enhancements by acquainting you with these concurrency enhancements.

Publish-subscribe framework for reactive streams

Java 9 includes a publish-subscribe framework for reactive streams. In this section, I first introduce the concept of reactive streams and then present this publish-subscribe framework.

Introducing reactive streams

Data processing has evolved from batch architectures, which collect data and process it only after some threshold has been reached, to stream-oriented architectures, which help to turn data into knowledge as quickly as possible. Stream-oriented architectures capture and process live data and adjust systems based on the processed results very quickly (typically in seconds or less). In contrast, a batch-processing system might take hours, days, or weeks to respond.

Handling streams of data (especially "live" data whose volume isn't predetermined) requires special care in an asynchronous system. The main issue is that resource consumption needs to be controlled so that a fast data source doesn't overwhelm the stream destination. Asynchrony is needed to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine, which can greatly speed up data processing.

The Reactive Streams initiative provides a standard for asynchronous stream processing with nonblocking back pressure. A reactive stream provides a way to signal its source to ease production of data when the stream's destination becomes overwhelmed with that data. This signaling capability is like a valve on a water pipe. Closing this valve increases back pressure (the pressure back at the source) while easing the burden on the destination.

The initiative's objective is to govern the exchange of stream data across an asynchronous boundary (such as passing data to another thread) while ensuring that the destination isn't forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues that mediate between threads to be bounded. Note that the communication of back pressure is handled asynchronously.

Reactive Streams focuses on finding a minimal set of interfaces, methods, and protocols for describing the operations and entities needed to achieve the objective: asynchronous streams of data with nonblocking back pressure.

Exploring the publish-subscribe framework

Java 9 supports the Reactive Streams initiative by providing a publish-subscribe framework (also known as the Flow API) that consists of the java.util.concurrent.Flow and java.util.concurrent.SubmissionPublisher classes.

Flow is a repository for four nested static interfaces whose methods establish flow-controlled components in which publishers produce data items that are consumed by one or more subscribers:

  • Publisher: A producer of data items that are received by subscribers.
  • Subscriber: A receiver of data items.
  • Subscription: Linkage between a Publisher and a Subscriber.
  • Processor: A combination of Publisher and Subscriber for specifying a data-transformation function.

A publisher publishes a stream of data items to registered subscribers and implements Flow.Publisher. This interface declares a single method, which is invoked to register a subscriber with a publisher:

void subscribe(Flow.Subscriber<? super T> subscriber)

Invoking this method registers subscriber with the publisher. However, if subscriber is already registered or the registration fails due to some policy (or other) violation, this method invokes subscriber's onError() method with an IllegalStateException object. Otherwise, subscriber's onSubscribe() method is invoked with a new Flow.Subscription object. subscribe() throws NullPointerException when null is passed to subscriber.

A subscriber subscribes to a publisher for callbacks of data items and implements Flow.Subscriber<T>. This interface declares onSubscribe() and three additional methods:

void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)

onSubscribe() is invoked to confirm registration. It receives a subscription argument whose methods allow requests for new data items to be made to the publisher or to request that the publisher send no more data items.

onComplete() is invoked when it's known that no additional Subscriber method invocations will occur for a Subscription that's not already terminated by error. No other Subscriber methods are called after this method.

onError(Throwable throwable) is invoked with the specified throwable upon an unrecoverable error that's encountered by the publisher or subscription. No other Subscriber methods are called after this method.

onNext() is invoked with a subscription's next item. If this method throws an exception, the resulting behavior isn't guaranteed, but the subscription may be cancelled.

A subscription provides a link between a publisher and a subscriber, letting subscribers receive data items only upon request and cancel at any time. Subscriptions implement the Flow.Subscription interface, which declares two methods:

void request(long n)
void cancel()

request() adds n data items to the current unfulfilled demand for this subscription. If n is less than or equal to 0, the subscriber's onError() method is called with an IllegalArgumentException argument. Otherwise, the subscriber receives up to n additional onNext() invocations (or fewer when terminated). Passing Long.MAX_VALUE to n indicates an unbounded number of invocations.

cancel() causes the subscriber to eventually stop receiving data items. A best-effort attempt is made; additional data items may be received after cancel() is called.
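The interplay between request() and cancel() can be sketched as follows. This is a minimal illustration rather than a canonical pattern: BatchSubscriber, DemandDemo, and the run() helper are hypothetical names, and the publisher is a SubmissionPublisher (a class that is introduced later in this section). The subscriber asks for items in fixed-size batches and cancels once it has seen enough of them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DemandDemo
{
   // Hypothetical subscriber: requests items in fixed-size batches and
   // cancels the subscription once it has received "limit" items.
   static class BatchSubscriber implements Flow.Subscriber<Integer>
   {
      final List<Integer> received = new CopyOnWriteArrayList<>();
      final CountDownLatch finished = new CountDownLatch(1);

      private final int batchSize;
      private final int limit;
      private Flow.Subscription subscription;
      private int outstanding; // items requested but not yet delivered
      private boolean cancelled;

      BatchSubscriber(int batchSize, int limit)
      {
         this.batchSize = batchSize;
         this.limit = limit;
      }

      @Override
      public void onSubscribe(Flow.Subscription subscription)
      {
         this.subscription = subscription;
         outstanding = batchSize;
         subscription.request(batchSize);
      }

      @Override
      public void onNext(Integer item)
      {
         if (cancelled)
            return; // cancel() is best effort, so ignore any stragglers
         received.add(item);
         outstanding--;
         if (received.size() >= limit)
         {
            cancelled = true;
            subscription.cancel();
            finished.countDown();
         }
         else if (outstanding == 0)
         {
            // The current batch is used up, so signal demand for another.
            outstanding = batchSize;
            subscription.request(batchSize);
         }
      }

      @Override
      public void onError(Throwable throwable)
      {
         finished.countDown();
      }

      @Override
      public void onComplete()
      {
         finished.countDown();
      }
   }

   // Publishes 1..itemCount and returns what the subscriber accepted.
   static List<Integer> run(int batchSize, int limit, int itemCount)
   {
      BatchSubscriber subscriber = new BatchSubscriber(batchSize, limit);
      try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>())
      {
         publisher.subscribe(subscriber);
         for (int i = 1; i <= itemCount; i++)
            publisher.submit(i);
      }
      try
      {
         subscriber.finished.await(5, TimeUnit.SECONDS);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return subscriber.received;
   }

   public static void main(String[] args)
   {
      System.out.println(run(2, 5, 10));
   }
}
```

With a batch size of 2 and a limit of 5, the subscriber should end up with the first five items even though ten were submitted. The cancelled flag is there because cancellation is only best effort.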

Finally, a processor is a data-transformation function that operates on a stream without having to change the publisher or subscriber. One or more chained processors can be placed between publisher and subscriber to transform one data stream into another. The publisher and subscriber aren't dependent on the transformation(s) taking place. The JDK doesn't provide any concrete processors, so you must create your own by implementing the Processor interface. Processor declares no methods of its own, but it inherits subscribe() from Publisher and the four callback methods from Subscriber.
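One way to build such a processor is to extend SubmissionPublisher (described next) so that the publishing half comes for free. The sketch below follows that approach. MapProcessor, ProcessorDemo, and the toUpperCase() helper are hypothetical names chosen for illustration, and the five-second timeout is an arbitrary safety net.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical processor: applies a function to each item it receives and
// republishes the result to its own subscribers.
class MapProcessor<T, R> extends SubmissionPublisher<R>
   implements Flow.Processor<T, R>
{
   private final Function<? super T, ? extends R> function;
   private Flow.Subscription subscription;

   MapProcessor(Function<? super T, ? extends R> function)
   {
      this.function = function;
   }

   @Override
   public void onSubscribe(Flow.Subscription subscription)
   {
      this.subscription = subscription;
      subscription.request(1);
   }

   @Override
   public void onNext(T item)
   {
      submit(function.apply(item)); // may block if downstream is slow
      subscription.request(1);
   }

   @Override
   public void onError(Throwable throwable)
   {
      closeExceptionally(throwable);
   }

   @Override
   public void onComplete()
   {
      close();
   }
}

class ProcessorDemo
{
   // Pushes the input through a source -> processor -> consumer pipeline.
   static List<String> toUpperCase(List<String> input)
   {
      List<String> results = new CopyOnWriteArrayList<>();
      SubmissionPublisher<String> source = new SubmissionPublisher<>();
      MapProcessor<String, String> upper =
         new MapProcessor<String, String>(String::toUpperCase);

      source.subscribe(upper);
      CompletableFuture<Void> done = upper.consume(results::add);

      input.forEach(source::submit);
      source.close();

      // Wait for onComplete() to travel through the whole pipeline.
      done.orTimeout(5, TimeUnit.SECONDS).join();
      return results;
   }

   public static void main(String[] args)
   {
      System.out.println(toUpperCase(List.of("jan", "feb", "mar")));
   }
}
```

Neither the source publisher nor the final consumer knows that a transformation sits between them, which is the point of the Processor abstraction.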

SubmissionPublisher implements Flow.Publisher, asynchronously issuing submitted (nonnull) data items to current subscribers until it's closed. Each current subscriber receives newly submitted data items in the same order unless drops or exceptions are encountered. SubmissionPublisher allows data item generators to act as compliant reactive streams publishers that rely on drop handling and/or blocking for flow control.

SubmissionPublisher provides three constructors for initializing a submission publisher. The simplest (no-argument) constructor constructs a submission publisher that relies on the ForkJoinPool.commonPool() method to provide the asynchrony needed for delivering data items to subscribers (unless it doesn't support a parallelism level of at least two, in which case a new Thread object is created to run each task).
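When the common pool isn't appropriate, the two-argument constructor accepts an executor and a maximum buffer capacity. The following sketch assumes a single-threaded executor whose thread is named "flow-worker" (a hypothetical name) and records which threads deliver the items. ExecutorPublisherDemo and deliveryThreads() are illustrative names as well.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ExecutorPublisherDemo
{
   // Publishes itemCount items and returns the names of the threads on
   // which the consumer was invoked.
   static Set<String> deliveryThreads(int itemCount)
   {
      ExecutorService executor =
         Executors.newSingleThreadExecutor(r -> new Thread(r, "flow-worker"));
      Set<String> names = ConcurrentHashMap.newKeySet();
      try
      {
         // Supply the executor explicitly; keep the default buffer size.
         SubmissionPublisher<Integer> publisher =
            new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
         CompletableFuture<Void> done = publisher.consume(
            item -> names.add(Thread.currentThread().getName()));
         for (int i = 0; i < itemCount; i++)
            publisher.submit(i);
         publisher.close();
         done.orTimeout(5, TimeUnit.SECONDS).join();
      }
      finally
      {
         executor.shutdown();
      }
      return names;
   }

   public static void main(String[] args)
   {
      System.out.println(deliveryThreads(5));
   }
}
```

Because every subscriber callback runs as a task on the supplied executor, the returned set should contain only the worker thread's name.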

Listing 1 presents the source code to a FlowDemo application that demonstrates SubmissionPublisher and other aspects of Java 9's publish-subscribe framework for reactive streams.

Listing 1. FlowDemo.java

import java.util.Arrays;

import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo
{
   public static void main(String[] args)
   {
      // Create a publisher.

      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      // Create a subscriber and register it with the publisher.

      MySubscriber<String> subscriber = new MySubscriber<>();
      publisher.subscribe(subscriber);

      // Publish several data items and then close the publisher.

      System.out.println("Publishing data items...");
      String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
                         "jul", "aug", "sep", "oct", "nov", "dec" };
      Arrays.stream(items).forEach(publisher::submit);
      publisher.close();

      // Wait until the subscriber has been told that the stream is over.

      try
      {
         subscriber.awaitTermination();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
   }
}

class MySubscriber<T> implements Subscriber<T>
{
   // A private lock object avoids synchronizing on a shared string literal.

   private final Object lock = new Object();
   private boolean terminated; // guarded by lock

   private Subscription subscription;

   @Override
   public void onSubscribe(Subscription subscription)
   {
      this.subscription = subscription;
      subscription.request(1);
   }

   @Override
   public void onNext(T item)
   {
      System.out.println("Received: " + item);
      subscription.request(1);
   }

   @Override
   public void onError(Throwable t)
   {
      t.printStackTrace();
      signalTermination();
   }

   @Override
   public void onComplete()
   {
      System.out.println("Done");
      signalTermination();
   }

   // Block the caller until onComplete() or onError() has been invoked.
   // The flag guards against missed notifications and spurious wakeups.

   void awaitTermination() throws InterruptedException
   {
      synchronized(lock)
      {
         while (!terminated)
            lock.wait();
      }
   }

   private void signalTermination()
   {
      synchronized(lock)
      {
         terminated = true;
         lock.notifyAll();
      }
   }
}

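I use Object's wait() and notifyAll() methods to make the main thread (which runs the main() method) wait until the subscriber's onComplete() or onError() method has been called. Data items are delivered asynchronously, typically on daemon threads from the common pool, so without this wait the application would probably exit before you could observe any subscriber output.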

Compile Listing 1 as follows:

javac FlowDemo.java

Run the application as follows:

java FlowDemo

You should observe the following output:

Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done

CompletableFuture enhancements

Java 8 introduced the CompletableFuture<T> class, which is a java.util.concurrent.Future<T> that may be explicitly completed (setting its value and status), and may be used as a java.util.concurrent.CompletionStage, supporting dependent functions and actions that are triggered upon the future's completion. Java 9 introduces several enhancements to CompletableFuture:

  • support for delays and timeouts
  • improved support for subclassing
  • new factory methods

Support for delays and timeouts

Java 9 extends CompletableFuture with time-based methods that enable a future to complete with a value or exceptionally after a certain duration:

  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit): Completes this CompletableFuture with the given value if it hasn't otherwise completed before the given timeout (expressed in java.util.concurrent.TimeUnit units, such as MILLISECONDS). Returns this CompletableFuture.
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit): Exceptionally completes this CompletableFuture with a java.util.concurrent.TimeoutException when not otherwise completed before the given timeout. Returns this CompletableFuture.

Suppose we create a CompletableFuture for fetching theatre recommendations from some recommendation service. We introduce a way to load static recommendations when the service cannot provide the expected result in a timely manner:

Supplier<List<Theatre>> invokeRecommendationService = ...
CompletableFuture.supplyAsync(invokeRecommendationService)
                 .completeOnTimeout(Collections.singletonList(cats), 1, TimeUnit.SECONDS)
                 .thenAccept(showRecommendationsToUser);

If the recommendation service responds within one second, we show its recommendations to the user. Otherwise, completeOnTimeout() completes the future with the static list, and we recommend that the user check out the Cats production (assuming that it's playing).
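The fragment above leaves the service call elided, so here is a self-contained sketch of the same idea. Everything service-related is an assumption made for illustration: the recommend() helper simulates the service with a sleep, and the strings "live recommendation" and "static fallback" stand in for real results.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutFallbackDemo
{
   // Simulates a service that answers after serviceDelayMillis and falls
   // back to a static value if no answer arrives within timeoutMillis.
   static String recommend(long serviceDelayMillis, long timeoutMillis)
   {
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->
      {
         sleep(serviceDelayMillis);
         return "live recommendation";
      });
      return future
         .completeOnTimeout("static fallback", timeoutMillis,
                            TimeUnit.MILLISECONDS)
         .join();
   }

   private static void sleep(long millis)
   {
      try
      {
         Thread.sleep(millis);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(ie);
      }
   }

   public static void main(String[] args)
   {
      System.out.println(recommend(0, 2000));   // service answers in time
      System.out.println(recommend(2000, 100)); // service is too slow
   }
}
```

Note that the timeout completes the future but doesn't interrupt the slow task, which keeps running in the background until it finishes.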

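Perhaps you don't want to offer a static recommendation. In this case, you can use the orTimeout(long timeout, TimeUnit unit) method to complete the future exceptionally with a TimeoutException. The thenAccept() action is then skipped, and the stage that it returns also completes exceptionally: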

CompletableFuture.supplyAsync(invokeRecommendationService)
                 .orTimeout(1, TimeUnit.SECONDS)
                 .thenAccept(showRecommendationsToUser);
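To see how a caller might react to the timeout, consider the following sketch. As before, the service is simulated with a sleep, and the fetch() helper and its result strings are hypothetical. It uses handle() to turn the exceptional completion into an ordinary value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutErrorDemo
{
   // Returns the service's answer, or "timed out" when orTimeout() fires.
   static String fetch(long serviceDelayMillis, long timeoutMillis)
   {
      return CompletableFuture.supplyAsync(() ->
         {
            sleep(serviceDelayMillis);
            return "live recommendation";
         })
         .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
         .handle((value, error) ->
         {
            if (error == null)
               return value;
            // Dependent stages may wrap the cause in CompletionException.
            Throwable cause = error;
            if (error instanceof CompletionException
                && error.getCause() != null)
               cause = error.getCause();
            return (cause instanceof TimeoutException) ? "timed out"
                                                       : "failed";
         })
         .join();
   }

   private static void sleep(long millis)
   {
      try
      {
         Thread.sleep(millis);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(ie);
      }
   }

   public static void main(String[] args)
   {
      System.out.println(fetch(0, 2000));
      System.out.println(fetch(2000, 100));
   }
}
```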

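Additionally, a pair of static delayedExecutor() methods has been added. Each method returns a java.util.concurrent.Executor that submits a task to an underlying executor after the given delay. The two-argument form uses the default executor, and the three-argument form uses the executor that you supply: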

Executor delayedExecutor(long delay, TimeUnit unit)
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
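A delayed executor can be passed to any of the async methods that accept an Executor. The sketch below (DelayedExecutorDemo and runAfter() are hypothetical names) passes one to supplyAsync() and measures how long the result takes to arrive.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class DelayedExecutorDemo
{
   // Runs a trivial task after delayMillis and returns the elapsed time.
   static long runAfter(long delayMillis)
   {
      long start = System.nanoTime();
      Executor delayed =
         CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
      CompletableFuture.supplyAsync(() -> "done", delayed).join();
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
   }

   public static void main(String[] args)
   {
      System.out.println("Elapsed ms: " + runAfter(300));
   }
}
```

The reported time should be at least the requested delay. The exact figure depends on scheduling.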

Improved support for subclassing

Various enhancements have been made to CompletableFuture to make it easier to extend this class. For example, you might want to override the new Executor defaultExecutor() method to support an alternative default executor.
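As a rough sketch of what such a subclass might look like, the hypothetical PoolBoundFuture class below overrides defaultExecutor() and also overrides newIncompleteFuture(), another method added in Java 9, so that dependent stages inherit the same executor. The "custom-worker" thread name and the CustomExecutorDemo class are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical subclass whose async methods default to a given executor.
class PoolBoundFuture<T> extends CompletableFuture<T>
{
   private final Executor executor;

   PoolBoundFuture(Executor executor)
   {
      this.executor = executor;
   }

   @Override
   public Executor defaultExecutor()
   {
      return executor;
   }

   // Dependent stages are created through this method, so they share the
   // same default executor.
   @Override
   public <U> CompletableFuture<U> newIncompleteFuture()
   {
      return new PoolBoundFuture<>(executor);
   }
}

class CustomExecutorDemo
{
   // Returns the name of the thread that ran an async dependent action.
   static String runOnCustomExecutor()
   {
      ExecutorService pool =
         Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-worker"));
      try
      {
         return new PoolBoundFuture<String>(pool)
            .completeAsync(() -> "start")
            .thenApplyAsync(s -> Thread.currentThread().getName())
            .join();
      }
      finally
      {
         pool.shutdown();
      }
   }

   public static void main(String[] args)
   {
      System.out.println(runOnCustomExecutor());
   }
}
```

No executor is passed to completeAsync() or thenApplyAsync() here. Both fall back to defaultExecutor(), which is why the action should run on the custom thread rather than in the common pool.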
