Concurrency in Project Reactor with Scheduler

Usually, we write Reactive applications that will run on the main thread but there are cases where we need to use multithread. For example, after receiving an item from the Publisher, the Consumer needs to process something different from that data item. That manipulation, we can run on another thread to increase performance. In Project Reactor, to use multithread we can use the Scheduler object. How is it in details? Let’s find out in this tutorial.

First, I will create a new Maven project as an example:

Concurrency in Project Reactor with Scheduler

with Project Reactor dependency as follows:

In the example of this tutorial, I am going to create a small application with Publisher that can generate numbers, then we will use multithread to duplicate the values of these numbers, eventually going back to the user the results after processing.

But first let’s see what our code will look like in case we do not use multithread. The code will look like this:

Here, I used the Flux object to emit a number and used the flatMap() method to convert that value to double and use the other Flux object to output the value after the conversion to the user. The Consumer will subscribe to the final value after conversion for printing to the console.

The result of running the above example is as follows:

Concurrency in Project Reactor with Scheduler

As you can see, our code is currently running on the main thread only.

Now we will use the Scheduler object to run multithread.

There are two methods help we can run Reactive applications with Project Reactor using multithread: subscribeOn() and publishOn(). The difference between these two methods is that the subscribeOn() method determines whether the subscription process will take place in which thread (main thread or new thread), and the publishOn() method will determine the consumer in similar meaning.

Parameters of these two methods are the Scheduler object as I said above. This object will return the thread that we need. We can use Schedulers class to initialize the Scheduler object, with some strategies as follows:

  • Schedulers.parallel(): Use a fixed threadpool to perform parallel tasks.
  • Schedulers.elastic(): The thread pool mechanism can be expanded if the current threads is used up.
  • Schedulers.from(Executor): use custom Executor of Java.
  • Schedulers.single(): Same as Schedulers.parallel () but here only 1 thread.

OK, let’s say I now add the subscribeOn() method to our example above:

when running, the result will look like this:

Concurrency in Project Reactor with Scheduler

Obviously, as you can see, our code no longer runs on the main thread anymore.

If you now modify the subscribeOn() method to publishOn():

You will see the results as follows:

Concurrency in Project Reactor with Scheduler

As you can see, our consumer now runs at the “elastic-2” thread, not the main thread anymore.

Chia sẽ bài viết này ...Share on Facebook
0Tweet about this on Twitter
Share on LinkedIn

Add Comment