In the past, when working with Reactive applications that use database, we would usually use a MongoDB database because very few databases supported the Reactive mechanism except MongoDB. But now you do not need to use MongoDB database anymore, Reactive Relational Database Connectivity (R2DBC) will help us work with many other relational database systems according to Reactive mechanism. R2DBC is a spec that defines how we will work with relational databases like MySQL, PostgreSQL, … according to Reactive mechanism. It provides us with a Service Provider Interface (SPI) that we can implement driver for each respective database system. There are many drivers that implement this SPI, for example, R2DBC PostgreSQL implements R2DBC for PostgreSQL database, similarly, we also have R2DBC MySQL support for MySQL database, … In this tutorial, I will introduce to you all about R2DBC with an example of how to use the R2DBC PostgreSQL driver to manipulate the PostgreSQL database!
You can see details of the specs that R2DBC defines here.
First, I will create a new Maven project:
with R2DBC PostgreSQL driver dependency as follows:
1 2 3 4 5 |
<dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-postgresql</artifactId> <version>0.8.13.RELEASE</version> </dependency> |
Similar to when you work with JDBC, the first thing we need to do with any database is setup a connection to it. With R2DBC, we will use the ConnectionFactory object to do this.
There are two ways we can do this in R2DBC:
- Using the connection URL
- Using directly Java code
With the connection URL, you can use the following URL for PostgreSQL database (for other database systems, the connection URL will be different a bit!):
1 |
r2dbc:postgresql://<username>:<password>@<host>:<port>/<database> |
Inside:
- host is PostgreSQL server,
- port is the running PostgreSQL port. PostgreSQL’s default port is 5432.
- database is the name of the database we will be working with.
- username is the user logged into the PostgreSQL database
- password is the password of the user
Then use the ConnectionFactories object to get the ConnectionFactory object from this connection URL:
1 2 |
ConnectionFactory connectionFactory = ConnectionFactories .get("r2dbc:postgresql://khanh:123456@localhost:5432/test"); |
If you use Java code, you can use the Builder object in the ConnectionFactoryOptions class to do this:
1 2 3 4 5 6 7 8 |
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, "postgresql") .option(ConnectionFactoryOptions.HOST, "localhost") .option(ConnectionFactoryOptions.USER, "postgre") .option(ConnectionFactoryOptions.PASSWORD, "123456") .option(ConnectionFactoryOptions.DATABASE, "test").build(); ConnectionFactory connectionFactory = ConnectionFactories.get(options); |
After we have a ConnectionFactory object, we can create a Publisher of connections to the database by calling:
1 |
Publisher<? extends Connection> publisher = connectionFactory.create(); |
Initialize the Project Reactor Flux object to subscribe to this Publisher object:
1 |
Flux<Connection> flux = Flux.from(publisher); |
Now, we can use the connection from this Flux object to manipulate the database, for example:
1 |
flux.flatMap(connection -> connection.createStatement("<sql_script_here>").execute()); |
Here, we use the createStatement() method to pass the query you want to use to the Connection execute object and return the results.
Now, for example, in my PostgreSQL database server test database, there is a table named student with the column name and address as follows:
1 2 3 4 |
CREATE TABLE student ( name VARCHAR(50), address VARCHAR(100) ) |
To insert a new record into this table with R2DBC, I will write the following code:
1 2 3 4 |
flux.flatMap(connection -> connection.createStatement("INSERT INTO student (name, address) VALUES ($1, $2)") .bind("$1", "Khanh") .bind("$2", "Ho Chi Minh") .execute()); |
As you can see, we can bind the value we want to the query using the bind() method of the Statement object and after we have bound the data, we can call the execute() method of this Statement object to run the SQL statement.
The result after executing the SQL statement will be returned in the Result object of R2DBC, we must consume this Result object in order to fully execute this SQL statement. Therefore, we need to write more code like this:
1 2 3 4 5 |
flux.flatMap(connection -> connection.createStatement("INSERT INTO student (name, address) VALUES ($1, $2)") .bind("$1", "Khanh") .bind("$2", "Ho Chi Minh") .execute()); .flatMap(Result::getRowsUpdated) |
The getRowsUpdated() method will return the number of records that are updated after the execution of the SQL statement. The Result object also contains another method called map() that allows us to get the data returned from the database, with a SELECT statement, for example.
We need to call the subscribe() method to subscribe to Publisher.
The entire code could be rewritten as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
package com.huongdanjava.r2dbcpostgresql; import io.r2dbc.spi.ConnectionFactories; import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.Result; import reactor.core.publisher.Flux; public class Application { public static void main(String[] args) throws InterruptedException { ConnectionFactory connectionFactory = ConnectionFactories .get("r2dbc:postgresql://khanh:123456@localhost:5432/test"); Flux.from(connectionFactory.create()) .flatMap(connection -> connection.createStatement("INSERT INTO student (name, address) VALUES ($1, $2)") .bind("$1", "khanh") .bind("$2", "Ho Chi Minh") .execute()) .flatMap(Result::getRowsUpdated) .doOnNext(s -> System.out.println(s)) .subscribe(); Thread.sleep(5000); } } |
Here, I have used the doOnNext() method to get information about the number of records that are updated in the database. And since we need to wait for the program to finish running and then exit, we also use Thread.sleep() for 5 seconds to do this.
The results when running the above code are as follows:
Check database: