Reactive Streams là một khái niệm định nghĩa cơ chế để xử lý stream một cách bất đồng bộ (asynchronous) với non-blocking back pressure. Back pressure ở đây chúng ta có thể hiểu nôm na có quá nhiều công việc phải xử lý cùng một lúc nên dẫn tới quá tải. Non-blocking back pressure có nghĩa là để tránh cái việc mà cùng một lúc quá nhiều công việc ập tới, trong 1 thời điểm chúng ta chỉ xử lý một số công việc nào đó thôi, khi nào xử lý xong những công việc đó thì mới nhận tiếp những công việc mới…Trong bài viết này, mình sẽ trình bày về khái niệm Reactive Streams trong Java các bạn nhé!
Ý tưởng chính của Reactive Streams là:
- Chúng ta có một Publisher để phát ra các thông tin.
- Chúng ta có một hoặc nhiều Subscribers tiếp nhận các thông tin mà Publisher phát ra.
- Và một Subscription làm cầu nối giữa Publisher và Subscribers.
Giữa Publisher và Subscribers kết nối với nhau theo nguyên tắc:
- Publisher sẽ add tất cả Subscribers mà nó cần notify.
- Subscribers sẽ nhận tất cả các thông tin được thêm vào Publisher.
- Subscribers sẽ request để yêu cầu và xử lý một hoặc nhiều thông tin từ Publisher theo kiểu bất đồng bộ thông qua đối tượng Subscription.
- Khi một Publisher có một thông tin để publish, thông tin này sẽ được gửi tới tất cả các Subscribers đang yêu cầu.
Từ phiên bản 9, Java hỗ trợ chúng ta tạo ra các ứng dụng Reactive Streams bằng cách giới thiệu 3 interfaces Flow.Publisher, Flow.Subscriber, Flow.Subscription và 1 class tên là SubmissionPublisher hiện thực interface Flow.Publisher. Mỗi interface sẽ đóng vai trò khác nhau, tương ứng với những nguyên tắc của Reactive Streams.
Để các bạn hiểu rõ hơn về Reactive Streams trong Java, mình sẽ làm 1 ví dụ sử dụng các interface trên như sau:
Đầu tiên, mình sẽ tạo ra một đối tượng chứa các thông tin mà Publisher cần thông báo tới cho Subscriber.
Đối tượng này có tên là Student với nội dung đơn giản như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.huongdanjava.reactivestreams; public class Student { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Student{" + "name='" + name + '\'' + '}'; } } |
Đối tượng tiếp theo sẽ là một class implement interface Subscriber.
Interface Subscriber có bốn phương thức:
- Phương thức onComplete(): Phương thức này được gọi khi đối tượng Publisher hoàn thành vai trò của mình.
- Phương thức onError(): Phương thức này được gọi khi một lỗi gì đó đã xảy ra ở Publisher và được thông báo tới Subscriber.
- Phương thức onNext(): Phương thức này được gọi bất cứ khi nào Publisher có thông tin mới cần được thông báo tới tất cả Subscriber.
- Phương thức onSubscribe(): Phương thức này được gọi khi Publisher thêm Subscriber.
Nội dung của đối tượng này tạm thời như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package com.huongdanjava.reactivestreams; import java.util.concurrent.Flow; public class Consumer implements Flow.Subscriber { public void onSubscribe(Flow.Subscription subscription) { System.out.println("Consumer: onSubscribe"); } public void onNext(Object item) { System.out.println("Consumer: onNext" + item); } public void onError(Throwable throwable) { System.out.println("Consumer: onError"); } public void onComplete() { System.out.println("Consumer: onComplete"); } } |
Bây giờ, mình sẽ tạo main() class Example để chạy ví dụ.
Class Example này sẽ tạo mới một Publisher sử dụng class SubmissionPublisher, subscribe đối tượng Consumer ở trên và submit 10 đối tượng Student với khoảng thời gian giữa các lần là 1s.
Interface Publisher có một phương thức như sau:
- Phương thức subscribe(): Phương thức này có tham số là một đối tượng Subscriber. Publisher sẽ put đối tượng Subscriber này vào trong danh sách các đối tượng mà nó cần thông báo mỗi khi có thông tin mới được thêm vào.
Nội dung của class Example như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package com.huongdanjava.reactivestreams; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; public class Example { public static void main(String[] args) { Consumer c = new Consumer(); SubmissionPublisher<Student> sp = new SubmissionPublisher<>(); sp.subscribe(c); for (int i = 0; i < 10; i++) { Student student = new Student(); student.setName("Student " + i); sp.submit(student); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } sp.close(); } } |
Lúc này, khi chạy ứng dụng các bạn chỉ thấy kết quả như sau;
Đó là bởi vì Publisher chỉ gọi đến phương thức onSubscribe() của Subscriber để thêm nó vào danh sách các đối tượng cần thông báo. Subscriber chưa được khai báo để request thông tin từ Publisher.
Bây giờ chúng ta sẽ sử dụng đối tượng Subscription trong tham số của phương thức onSubscribe của đối tương Subscriber để request tới Publisher để lấy thông tin.
Interface Subscription có một phương thức như sau:
- Phương thức request(): Phương thức này được sử dụng bởi đối tượng Subscriber để request thông tin từ Publisher.
Giờ mình sẽ sửa lại phương thức onSubscribe() của đối tượng Consumer như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
package com.huongdanjava.reactivestreams; import java.util.concurrent.Flow; public class Consumer implements Flow.Subscriber { public void onSubscribe(Flow.Subscription subscription) { System.out.println("Consumer: onSubscribe"); subscription.request(2); } public void onNext(Object item) { System.out.println("Consumer: onNext" + item); } public void onError(Throwable throwable) { System.out.println("Consumer: onError"); } public void onComplete() { System.out.println("Consumer: onComplete"); } } |
Ở đây, mình chỉ request 2 đối tượng Student được thêm vào Publisher khi subscribe Consumer vào Publisher.
Kết quả:
Để request tất cả các đối tượng Student mỗi khi chúng được add vào Publisher, các bạn có thể modify phương thức onNext() sử dụng đối tượng Subscription để request.
Ví dụ như:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package com.huongdanjava.reactivestreams; import java.util.concurrent.Flow; public class Consumer implements Flow.Subscriber { private Flow.Subscription subscription; public void onSubscribe(Flow.Subscription subscription) { System.out.println("Consumer: onSubscribe"); this.subscription = subscription; subscription.request(2); } public void onNext(Object item) { System.out.println("Consumer: onNext" + item); subscription.request(1); } public void onError(Throwable throwable) { System.out.println("Consumer: onError"); } public void onComplete() { System.out.println("Consumer: onComplete"); } } |
Khi đó, các bạn sẽ thấy mỗi khi Publisher cập nhập mới một Student, Subscriber có thể nhận được Student đó liền:
Như các bạn thấy, khi Publisher hoàn thành công việc của mình thì phương thức onComplete() trong đối tượng Subscriber sẽ được gọi.
Ngoài ra, trong đối tượng Subscription, chúng ta còn có phương thức cancel() để Subscriber có thể huỷ việc nhận message từ Publisher.