Usually, we write Reactive applications that will run on the main thread but there are cases where we need to use multithread. For example, after receiving an item from the Publisher, the Consumer needs to process something different from that data item. With that manipulation, we can run on another thread to increase performance. In Project Reactor, to use multithread we can use the Scheduler object. How is it in detail? Let’s find out in this tutorial.
First, I will create a new Maven project as an example:
with Project Reactor dependency as follows:
1 2 3 4 5 |
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.5.7</version> </dependency> |
In the example of this tutorial, I am going to create a small application with Publisher that can generate numbers, then we will use multithread to duplicate the values of these numbers, eventually going back to the user the results after processing.
But first, let’s see what our code will look like in case we do not use multithread. The code will look like this:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; public class Application { public static void main(String[] args) { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)) .subscribe(System.out::println); } } |
Here, I used the Flux object to emit a number and used the flatMap() method to convert that value to double, and use the other Flux object to output the value after the conversion to the user. The Consumer will subscribe to the final value after conversion for printing to the console.
The result of running the above example is as follows:
As you can see, our code is currently running on the main thread only.
Now we will use the Scheduler object to run multithread.
There are two methods that help we can run Reactive applications with Project Reactor using multithread: subscribeOn() and publishOn(). The difference between these two methods is that the subscribeOn() method determines whether the subscription process will take place in which thread (main thread or new thread), and the publishOn() method will determine the consumer in a similar meaning.
The parameters of these two methods are the Scheduler object as I said above. This object will return the thread that we need. We can use the Schedulers class to initialize the Scheduler object, with some strategies as follows:
- Schedulers.parallel(): Use a fixed thread pool to perform parallel tasks.
- Schedulers.immediate(): Use the current thread to execute a task
- Schedulers.boundedElastic(): thread pool mechanism with the number of threads equal to 10 times the number of CPU cores your computer has.
- Schedulers.fromExecutor(): use custom Executor of Java.
- Schedulers.single(): Same as Schedulers.parallel() but here only 1 thread to execute tasks.
OK, let’s say I now add the subscribeOn() method to our example above:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class Application { public static void main(String[] args) throws InterruptedException { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)).subscribeOn(Schedulers.boundedElastic()) .subscribe(value -> System.out.println(Thread.currentThread().getName() + " : " + value)); Thread.sleep(10000); } } |
when running, the result will look like this:
Obviously, as you can see, our code no longer runs on the main thread anymore.
If you now modify the subscribeOn() method to publishOn():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package com.huongdanjava.projectreactor; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; public class Application { public static void main(String[] args) throws InterruptedException { Flux.just(5, 10, 23, 25).log() .flatMap(n -> Flux.just(n * 2)).publishOn(Schedulers.boundedElastic()) .subscribe(value -> System.out.println(Thread.currentThread().getName() + " : " + value)); Thread.sleep(10000); } } |
You will see the results as follows:
As you can see, our consumer now runs at the “boundedElastic-1” thread, not the main thread anymore.