Thông thường chúng ta hay viết ứng dụng Reactive chạy với main thread nhưng sẽ có những trường hợp chúng ta cần sử dụng multithread, ví dụ như sau khi nhận một item dữ liệu từ Publisher, Consumer cần process một thao tác khác với item dữ liệu đó, thao tác này chúng ta có thể chạy trên một thread khác để tăng hiệu suất. Trong Project Reactor, để sử dụng multithread chúng ta có thể sử dụng đối tượng Scheduler. Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Maven project để làm ví dụ:
với Project Reactor dependency như sau:
1 2 3 4 5 |
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.5.7</version> </dependency> |
Trong ví dụ của bài viết này, mình sẽ tạo ra một ứng dụng nhỏ với Publisher có thể phát ra các con số, sau đó chúng ta sẽ sử dụng multithread để nhân đôi giá trị của những số này, cuối cùng sẽ trả về cho người dùng kết quả cuối cùng sau khi xử lý.
Nhưng đầu tiên hãy xem xem code của chúng ta sẽ như thế nào trong trường hợp chúng ta không sử dụng multithread các bạn nhé. Code sẽ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; public class Application { public static void main(String[] args) { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)) .subscribe(System.out::println); } } |
Ở đây, mình đã sử dụng đối tượng Flux để phát ra một số number và sử dụng phương thức flatMap() để chuyển đổi giá trị đó thành gấp đôi và sử dụng đối tượng Flux khác để phát ra giá trị sau khi chuyển đổi cho người dùng. Người dùng Consumer sẽ subscribe giá trị cuối cùng sau khi chuyển đổi để in ra console.
Kết quả khi chạy ví dụ trên như sau:
Như các bạn thấy, code của chúng ta hiện tại chỉ đang chạy trên main thread mà thôi.
Bây giờ chúng ta sẽ sử dụng đối tượng Scheduler để chạy multithread xem sao các bạn nhé!
Có 2 phương thức giúp chúng ta có thể chạy ứng dụng Reactive với Project Reactor sử dụng multithread đó là subscribeOn() và publishOn(). Sự khác nhau giữa 2 phương thức này đó là: phương thức subscribeOn() sẽ quyết định quá trình subscription sẽ diễn ra trong thread nào (main thread hay là một thread mới) còn phương thức publishOn() sẽ quyết định phần consumer cũng với ý nghĩa tương tự.
Tham số của 2 phương thức này là đối tượng Scheduler như mình đã nói ở trên. Đối tượng này sẽ return về thread mà chúng ta cần. Chúng ta có thể sử dụng class Schedulers để khởi tạo cho đối tượng Scheduler, với một số strategy như sau:
- Schedulers.parallel(): sử dụng một fixed threadpool để thực thi các tác vụ song song.
- Schedulers.immediate(): sử dụng current thread để thực thi tác vụ
- Schedulers.boundedElastic(): cơ chế thread pool với số lượng thread bằng 10 nhân với số lượng CPU cores mà máy tính của bạn đang có.
- Schedulers.fromExecutor(): sử dụng custom Executor của Java.
- Schedulers.single(): tương tự như Schedulers.parallel() nhưng ở đây chỉ có 1 thread duy nhất được sử dụng để thực thi tác vụ.
OK, giả sử bây giờ mình thêm phương thức subscribeOn() vào ví dụ trên của chúng ta như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class Application { public static void main(String[] args) throws InterruptedException { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)).subscribeOn(Schedulers.boundedElastic()) .subscribe(value -> System.out.println(Thread.currentThread().getName() + " : " + value)); Thread.sleep(10000); } } |
thì khi chạy, kết quả sẽ như sau:
Rõ ràng như các bạn thấy, code của chúng ta đã không còn chạy trên thread main nữa.
Nếu bây giờ các bạn sửa method subscribeOn() thành publishOn():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class Application { public static void main(String[] args) throws InterruptedException { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)).publishOn(Schedulers.boundedElastic()) .subscribe(value -> System.out.println(Thread.currentThread().getName() + " : " + value)); Thread.sleep(10000); } } |
thì các bạn sẽ thấy kết quả như sau:
Như các bạn thấy, lúc này consumer của chúng ta đã chạy ở thread “boundedElastic-1” chứ không còn là thread main như trước nữa.