Reactive Streams is a specification that defines a mechanism for processing streams asynchronously with non-blocking back pressure. Back pressure arises when more work arrives at the same time than can be handled, which would otherwise lead to overload. Non-blocking back pressure means avoiding that overload without blocking: at any moment we accept only as many tasks as we can handle, and as soon as those tasks are finished, we ask for new ones. In this tutorial, I will present the concept of Reactive Streams in Java.
The main idea of Reactive Streams is:
- We have a Publisher to broadcast the information.
- We have one or more Subscribers to receive the information that Publisher publishes.
- And a Subscription bridges the gap between Publisher and Subscribers.
The Publisher interacts with Subscribers according to the following principles:
- The Publisher registers every Subscriber that it needs to notify.
- Subscribers can receive all the information added to the Publisher.
- Subscribers request and process one or more items from the Publisher asynchronously through the Subscription object.
- When a Publisher has information to publish, this information is sent to all Subscribers who are requesting it.
Since version 9, Java has supported building Reactive Streams applications through the interfaces Flow.Publisher, Flow.Subscriber, and Flow.Subscription (there is also Flow.Processor, which this tutorial does not use), and a class named SubmissionPublisher that implements the Flow.Publisher interface. Each interface plays a different role, corresponding to the principles of Reactive Streams.
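As a rough illustration of the broadcast principle above, the following sketch subscribes two subscribers to one SubmissionPublisher and checks that each one receives every item. The names BroadcastSketch and Collector are hypothetical and exist only for this illustration; requesting Long.MAX_VALUE items is a shortcut that effectively switches back pressure off for the demo.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BroadcastSketch {

    // Hypothetical subscriber that records every item it receives.
    static class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes three items to two subscribers and returns what each one saw.
    static List<List<String>> run() {
        Collector first = new Collector();
        Collector second = new Collector();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            for (String item : List.of("a", "b", "c")) {
                publisher.submit(item);
            }
        } // leaving the try block calls close(), which signals onComplete
        try {
            // Delivery is asynchronous, so wait for both subscribers to finish.
            first.done.await(5, TimeUnit.SECONDS);
            second.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of(List.copyOf(first.received), List.copyOf(second.received));
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[a, b, c], [a, b, c]]
    }
}
```

Both subscribers are registered before anything is submitted, which is why each of them sees all three items.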
To better understand Reactive Streams in Java, I will build an example using these interfaces:
First, I will create an object that holds the information the Publisher needs to send to Subscribers.
This object is called Student with simple content as follows:

    package com.huongdanjava.reactivestreams;

    public class Student {

        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }

The next object is a class that implements the Flow.Subscriber interface.
The Subscriber interface has four methods:
- The onComplete() method: This method is called when the Publisher object completes its role.
- The onError() method: This method is called when something goes wrong in the Publisher; the error is passed to the Subscriber.
- The onNext() method: This method is called whenever the Publisher has new information to be notified to all Subscribers.
- The onSubscribe() method: This method is called when the Publisher adds the Subscriber.
The content of this class is as follows:

    package com.huongdanjava.reactivestreams;

    import java.util.concurrent.Flow;

    // Typed as Flow.Subscriber<Student> so that onNext() receives a Student, not a raw Object.
    public class Consumer implements Flow.Subscriber<Student> {

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Consumer: onSubscribe");
        }

        @Override
        public void onNext(Student item) {
            System.out.println("Consumer: onNext" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Consumer: onError");
        }

        @Override
        public void onComplete() {
            System.out.println("Consumer: onComplete");
        }
    }

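The Consumer above only logs a message in onError(). To see when that callback actually fires, here is a minimal sketch that closes a SubmissionPublisher with closeExceptionally() instead of close(). The class name ErrorSignalSketch and the error message are made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ErrorSignalSketch {

    // Closes the publisher with an error and returns which terminal callback ran.
    static String run() {
        AtomicReference<String> seen = new AtomicReference<>("none");
        CountDownLatch terminated = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // Not reached in this sketch: nothing is submitted.
            }

            @Override
            public void onError(Throwable throwable) {
                seen.set("onError: " + throwable.getMessage());
                terminated.countDown();
            }

            @Override
            public void onComplete() {
                seen.set("onComplete");
                terminated.countDown();
            }
        });

        // Terminal error signal instead of a normal close().
        publisher.closeExceptionally(new IllegalStateException("source failed"));

        try {
            // The signal is delivered asynchronously, so wait for it.
            terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints onError: source failed
    }
}
```

A subscriber receives either onError() or onComplete() as its final signal, never both.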
Now, I will create the Example class with a main() method to run the example.
This Example class will create a new Publisher using the SubmissionPublisher class, subscribe the Consumer object to it, and submit 10 Student objects at 1-second intervals.
The Publisher interface has a single method:
- The subscribe() method: This method takes the parameter as a Subscriber object. The Publisher will put this Subscriber object in the list of objects it needs to notify whenever new information is added.
The content of the Example class is as follows:

    package com.huongdanjava.reactivestreams;

    import java.util.concurrent.SubmissionPublisher;
    import java.util.concurrent.TimeUnit;

    public class Example {

        public static void main(String[] args) {
            Consumer c = new Consumer();

            SubmissionPublisher<Student> sp = new SubmissionPublisher<>();
            sp.subscribe(c);

            // Submit 10 students, pausing 1 second after each one.
            for (int i = 0; i < 10; i++) {
                Student student = new Student();
                student.setName("Student " + i);

                sp.submit(student);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // Signal completion to the subscribers.
            sp.close();
        }
    }

At this point, when you run the application, the only output is the line "Consumer: onSubscribe".
This is because the Publisher has only called the Subscriber's onSubscribe() method after adding it to the list of objects to be notified. The Subscriber has not yet requested any information from the Publisher.
Now we will use the Subscription object passed as the parameter of the Subscriber's onSubscribe() method to request information from the Publisher.
The Subscription interface has the following method:
- The request() method: This method is used by the Subscriber object to request a given number of items from the Publisher.
We will revise the onSubscribe() method of the Consumer object as follows:

    package com.huongdanjava.reactivestreams;

    import java.util.concurrent.Flow;

    public class Consumer implements Flow.Subscriber<Student> {

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Consumer: onSubscribe");

            // Ask the Publisher for at most 2 items.
            subscription.request(2);
        }

        @Override
        public void onNext(Student item) {
            System.out.println("Consumer: onNext" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Consumer: onError");
        }

        @Override
        public void onComplete() {
            System.out.println("Consumer: onComplete");
        }
    }

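Here, I request only 2 of the Student objects added to the Publisher.
Result: the Consumer prints the onSubscribe message followed by onNext for the first two students (Student 0 and Student 1). The remaining Student objects are never delivered to it.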
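The same behaviour can be checked in a compact, self-contained form. The sketch below is only an illustration under simplified assumptions: it publishes Integer values instead of Student objects, the class name BoundedDemandSketch is hypothetical, and the 200 ms pause is an arbitrary grace period in which any extra delivery would show up.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedDemandSketch {

    // Submits five items to a subscriber that requests only two; returns what arrived.
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch firstTwo = new CountDownLatch(2);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(2); // total demand: two items, never renewed
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                firstTwo.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                // Not expected in this sketch.
            }

            @Override
            public void onComplete() {
                // Not relevant for this sketch.
            }
        });

        for (int i = 0; i < 5; i++) {
            publisher.submit(i); // items 2..4 stay buffered inside the publisher
        }

        try {
            firstTwo.await(5, TimeUnit.SECONDS);
            TimeUnit.MILLISECONDS.sleep(200); // grace period for unexpected deliveries
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        List<Integer> snapshot = List.copyOf(received);
        publisher.close();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 1]
    }
}
```

The publisher never delivers more than the subscriber asked for, which is the back pressure mechanism at work.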
To receive every Student object as it is added to the Publisher, you can modify the onNext() method so that it uses the Subscription object to request the next item.
For example:

    package com.huongdanjava.reactivestreams;

    import java.util.concurrent.Flow;

    public class Consumer implements Flow.Subscriber<Student> {

        // Kept so that onNext() can ask for more items later.
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Consumer: onSubscribe");

            this.subscription = subscription;
            subscription.request(2);
        }

        @Override
        public void onNext(Student item) {
            System.out.println("Consumer: onNext" + item);

            // Request one more item for each item processed.
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Consumer: onError");
        }

        @Override
        public void onComplete() {
            System.out.println("Consumer: onComplete");
        }
    }

Then, each time the Publisher adds a new Student, the Subscriber receives that Student immediately.
When the Publisher completes its work (here, when close() is called), the onComplete() method in the Subscriber object is called.
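One practical detail: SubmissionPublisher delivers its signals asynchronously (by default typically on ForkJoinPool.commonPool(), whose threads are daemon threads), so a short program can reach the end of main() before onComplete() has run. A possible way to make the order of callbacks observable is to wait on a CountDownLatch, as in the sketch below. The names AwaitCompletionSketch and RecordingSubscriber are hypothetical, and Integer items stand in for Student objects.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class AwaitCompletionSketch {

    // Hypothetical subscriber: records each callback and requests one item at a time.
    static class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext " + item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            terminated.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            terminated.countDown();
        }
    }

    // Publishes three items, closes the publisher, and waits for the terminal signal.
    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < 3; i++) {
                publisher.submit(i);
            }
        } // close() is called here
        try {
            subscriber.terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.events);
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext 0, onNext 1, onNext 2, onComplete]
        System.out.println(run());
    }
}
```

The onComplete() signal arrives only after all buffered items have been delivered, so it is always the last event in the list.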
The Subscription object also has a cancel() method, which a Subscriber can call to stop receiving messages from the Publisher.
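As a minimal sketch of cancel(), the subscriber below takes three items one at a time and then cancels instead of requesting more. The class name CancelSketch, the limit of three, and the 200 ms grace period are assumptions made for this illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CancelSketch {

    // Submits ten items to a subscriber that cancels after three; returns what arrived.
    static List<Integer> run() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch cancelled = new CountDownLatch(1);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                if (received.size() == 3) {
                    subscription.cancel(); // stop receiving; no further demand is signalled
                    cancelled.countDown();
                } else {
                    subscription.request(1);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // Not expected in this sketch.
            }

            @Override
            public void onComplete() {
                // Not delivered to a subscription that has been cancelled.
            }
        });

        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }

        try {
            cancelled.await(5, TimeUnit.SECONDS);
            TimeUnit.MILLISECONDS.sleep(200); // grace period for unexpected deliveries
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        publisher.close();
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 1, 2]
    }
}
```

Because the subscriber requests one item at a time, there is no outstanding demand at the moment it cancels, so exactly three items arrive.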