Trong bài viết trước, mình đã giới thiệu với các bạn về Reactive Streams trong Java hiện thực Reactive Streams specification. Trong bài viết này, mình sẽ giới thiệu với các bạn một thư viện nữa cũng hiện thực Reactive Stream specification luôn: đó chính là Project Reactor. Cụ thể nó như thế nào? Các bạn hãy tiếp tục đọc nhé!
Đầu tiên, như mọi tutorial khác, mình cũng sẽ tạo một Maven project để làm ví dụ:
Để làm việc với Project Reactor, chúng ta cần thêm dependency cho nó:
1 2 3 4 5 |
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.5.9</version> </dependency> |
Vì là hiện thực Reactive Streams specification nên trong Project Reactor chúng ta cũng có các khái niệm Publisher, Subscriber và Subscription nhưng với tên gọi khác.
Trong Project Reactor, chúng ta có hai đối tượng đóng vai trò là Publisher:
- Một là đối tượng Flux: đối tượng này có thể publish từ 0 tới N items.
- Hai là đối tượng Mono: đối tượng này thì chỉ có thể publish 1 item hoặc là không có item nào.
Sự khác nhau giữa 2 đối tượng này nằm ở chỗ: nếu code của bạn return Mono object thì chúng ta có thể biết rằng đối tượng này chỉ có thể publish tối đa là 1 item, còn đối với đối tượng Flux thì chúng ta phải code làm sao để ứng dụng của chúng ta có thể handle đến tối đa N items. Từ đó chúng ta có cách phát triển ứng dụng của chúng ta một cách phù hợp.
Đối với Subscriber thì chúng ta có thể dùng một abstract class tên là BaseSubscriber để hiện thực Subscriber. Abstract class này có định nghĩa sẵn các phương thức tương tự như trong Java như hookOnSubscribe(), hookOnNext(), hookOnComplete(), hookOnError(), hookOnCancel() và hookFinally() giúp chúng ta hiện thực phần Subscriber trong một ứng dụng Reactive Streams.
OK, bây giờ chúng ta thử làm một ví dụ xem sao nhé các bạn!
Đầu tiên, mình sẽ định nghĩa một đối tượng extend BaseSubscriber để hiện thực phần Subscriber.
Đối tượng của mình như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
package com.huongdanjava.projectreactor; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; public class Consumer extends BaseSubscriber<String> { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("Consumer: hookOnSubscribe."); request(1); } @Override protected void hookOnNext(String value) { System.out.println("Consumer: hookOnNext " + value); request(1); } } |
Trong đối tượng Consumer này, ngay khi Publisher subscribe, nó sẽ request để lấy item đầu tiên liền. Và mỗi khi Publisher publish một dữ liệu mới thì nó cũng request để lấy dữ liệu đó.
Tiếp theo mình sẽ định nghĩa một Publisher sử dụng đối tượng Flux để publish dữ liệu.
Flux có nhiều phương thức static để tạo dữ liệu như just(), fromArray(), fromIterable(), …. Trong bài viết này, mình chỉ sử dụng đơn giản như sau:
1 |
Flux<String> flux = Flux.just("Khanh", "Quan", "Thanh"); |
Bước cuối cùng là subscribe đối tượng Consumer của chúng ta vào Publisher.
1 |
flux.subscribe(new Consumer()); |
Toàn bộ code như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
package com.huongdanjava.projectreactor; import java.util.concurrent.TimeUnit; import reactor.core.publisher.Flux; public class Example { public static void main(String[] args) throws InterruptedException { Flux<String> flux = Flux.just("Khanh", "Quan", "Thanh"); flux.subscribe(new Consumer()); TimeUnit.SECONDS.sleep(10); } } |
Kết quả: