Reactive Streams in Java

Reactive Streams are a concept that defines the mechanism for handling streams asynchronously with non-blocking back pressure. Back pressure here, we can understand that there is too much work to do at the same time, then it should lead to overload. Non-blocking back pressure means to avoid the fact that at the same time too much work is involved, at a time we only handle some tasks. As soon as these task finished, we can receive new other tasks … In this tutorial, I will present the concept of Reactive Streams in Java for you all!

The main idea of ​​Reactive Streams is:

  • We have a Publisher to broadcast the information.
  • We have one or more Subscribers to receive the information that Publisher publishes.
  • And a Subscription bridges the gap between Publisher and Subscribers.

Publisher connect with Subscribers in principle:

  • Publisher will add all Subscribers that it needs to notify.
  • Subscribers will receive all the information added to Publisher.
  • Subscribers will request and process one or more information from Publisher asynchronously through the Subscription object.
  • When a Publisher has an information to publish, this information is sent to all Subscribers who are requesting it.

From version 9, Java supports us to create Reactive Streams applications by introducing three interfaces Flow.Publisher, Flow.Subscriber, Flow.Subscription, and a class named SubmissionPublisher that implements the Flow.Publisher interface. Each interface will play a different role, corresponding to the principles of Reactive Streams.

To better understand Reactive Streams in Java, I will make an example using these interfaces:

Reactive Streams in Java

  • First, I will create an object that contains the information that Publisher needs inform to Subscriber.

This object is called Student with simple content as follows:

  • The next object would be a class implement interface Subscriber.

Interface Subscriber has four methods:

  • The onComplete() method: This method is called when the Publisher object completes its role.
  • The onError() method: This method is called when something goes wrong in Publisher and is notified to the Subscriber.
  • The onNext() method: This method is called whenever Publisher has new information to be notified to all Subscribers.
  • The onSubscribe() method: This method is called when Publisher adds Subscriber.
  • Content of this object is as follows:

  • Now, I will create the main() Example class to run the example.

This Example class will create a new Publisher using the SubmissionPublisher class, subscribe to the Consumer object, and submit 10 Student objects with a time interval of 1s.

Interface Publisher has a method as follows:

  • The subscribe() method: This method takes the parameter as a Subscriber object. Publisher will put this Subscriber object in the list of objects it needs to notify whenever new information is added.
  • The content of the Example class is as follows:

At this time, when running the application you only see the results as follows;

Reactive Streams in Java

This is because Publisher only calls the Subscriber’s onSubscribe () method to add it to the list of objects to be notified. Subscriber not yet declared to request information from Publisher.

Now we will use the Subscription object in the onSubscribe() method parameter in the Subscriber object to request the Publisher to retrieve the information.

Interface Subscription has a method as follows:

  • The request() method: This method is used by the Subscriber object to request information from the Publisher.
  • We will revise the onSubscribe() method of the Consumer object as follows:

Here, I only request 2 Student objects added to Publisher.

Result:

Reactive Streams in Java

To request all Student objects each time they are added to Publisher, you can modify the onNext () method using the Subscription object to request.

For example:

Then, you will see when the Publisher add a new Student, Subscriber can get that Student immediately:

Reactive Streams in Java

As you can see, when Publisher completes its work, the onComplete () method in the Subscriber object will be called.

Also, in the Subscription object, we also have a cancel() method so that Subscribers can discard the message from the Publisher if they want.


Add Comment