Request/Reply pattern là một pattern mô tả cách các ứng dụng có thể gửi và nhận message asynchronously. Pattern này định nghĩa 2 queue trong Message Broker, một queue sẽ đảm nhận việc nhận message, queue còn lại sẽ đảm nhận message cho việc response.
Service A sẽ gửi request tới Service B thông qua Request Queue. Service B khi nhận message từ Request Queue sẽ process request message và gửi trả lại kết quả cho Service A thông qua Reply Queue.
Có 2 cách để implement Request/Reply pattern này:
- Tạo 2 queue trong Message Broker dành cho việc gửi và nhận message, request queue và reply queue. Request queue để nhận và reply queue để response. Application muốn gửi request tới một ứng dụng khác, có thể gửi request message tới request queue. Ứng dụng khác đó hoặc một service nào đó, sẽ subscribe để nhận message từ request queue, process message và để trả về kết quả, sẽ publish một message tới reply queue. Application sẽ subscribe vào reply queue để nhận message. Để identify response message là dành cho request message nào, thông thường chúng ta sẽ gửi thêm thông tin về Correlation ID cùng với nội dung message. Khi request message được process, message response sẽ cũng được publish cùng với Correlation ID này để application có thể process cho đúng các message request và response tương ứng.
- Một cách khác để implement Request/Reply pattern đó là chỉ có một request queue được định nghĩa cố định trong Message Broker. Message Broker sẽ tạo mới một reply queue tạm thời cho từng request message. Message Broker sẽ trả về thông tin của reply queue tạm thời này sau khi nhận message của application. Application khi nhận thông tin về reply queue tạm thời sẽ subscribe vào queue đó để nhận reply message.
Các bạn có thể sử dụng ActiveMQ hoặc RabbitMQ để làm message broker cho phần implementation của pattern này!
Trong các phần tiếp theo, mình sẽ làm các ví dụ nhỏ để xem Request/Reply pattern này hoạt động như thế nào các bạn nhé!
Request/Reply pattern với Correlation ID
Mình sẽ tạo một Maven project để làm ví dụ và sẽ sử dụng ActiveMQ Artemis làm Message Broker cho ví dụ này.
Chúng ta sẽ khai báo dependency của ActiveMQ Artemis như sau:
1 2 3 4 5 |
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-client-all</artifactId> <version>2.41.0</version> </dependency> |
Các steps để gửi và nhận message với Request/Reply pattern sử dụng Correlation ID cho ActiveMQ sẽ như sau nha các bạn:
- Client hay còn gọi là producer sẽ gửi message tới request queue
- Client sẽ khai báo reply queue với setting JMSReplyTo và gán JMSCorrelationId trong request message
- Server hay còn gọi là consumer sẽ nhận message từ request queue, xử lý message này rồi gửi lại kết quả tới JMSReplyTo queue. Message gửi tới JMSReplyTo queue sẽ có JMSCorrelationId giống với lúc client gửi lên.
Requester
Mình sẽ tạo mới class main để làm ví dụ:
1 2 3 4 5 6 7 8 |
package com.huongdanjava.eda.requestreply; public class Requester { public static void main(String[] args) { } } |
Đầu tiên, mình sẽ tạo mới connection tới ActiveMQ Artemis bằng cách sử dụng class ActiveMQConnectionFactory của ActiveMQ Artemis như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
package com.huongdanjava.eda.requestreply; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public class Requester { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try (Connection connection = connectionFactory.createConnection("artemis", "artemis")) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination requestQueue = session.createQueue("eda.request.queue"); Destination replyQueue = session.createQueue("eda.reply.queue"); } } } |
Class ActiveMQConnectionFactory được hiện thực sử dụng Factory Pattern. Như các bạn thấy ở trên, tham số để khởi tạo đối tượng của class ActiveMQConnectionFactory là brokerURL, là thông tin của ActiveMQ Artemis server đó các bạn! Sau khi khởi tạo đối tượng factory xong, các bạn có thể tạo mới đối tượng Connection với thông tin credentials, mặc định như mình để ở trên là “artemis/artemis”.
Sau khi đã có connection thì mình cũng đã tạo mới session với cơ chế auto acknowledge. Điều này có nghĩa message khi đã được gửi tới queue, session sẽ tự động gửi thông báo kết quả cho producer. Chúng ta không cần làm việc này manually.
Ở đây, mình cũng đã thêm code để ứng dụng tự động tạo mới 2 queue là request queue “eda.request.queue” và reply queue “eda.reply.queue” cho chúng ta. Các bạn cũng không cần làm việc này manually luôn! Khi code chạy, các bạn sẽ thấy trong ActiveMQ Artemis, 2 address mới được tạo. Và trong mỗi address mới này, một queue cùng tên cũng được tạo.
Sau khi đã tạo các address và queue, các bạn có thể tạo mới các đối tượng MessageProducer và MessageConsumer để gửi và nhận message từ ActiveMQ Artemis:
1 2 |
MessageProducer producer = session.createProducer(requestQueue); MessageConsumer consumer = session.createConsumer(replyQueue); |
Có nhiều loại message mà các bạn có thể gửi tới queue của ActiveMQ Artemis như text message, byte message, object message, … Trong ví dụ này mình sẽ gửi một text message bằng cách sử dụng class TextMessage như sau:
1 2 3 4 |
TextMessage requestMessage = session.createTextMessage("Huong Dan Java"); String correlationId = UUID.randomUUID().toString(); requestMessage.setJMSReplyTo(replyQueue); requestMessage.setJMSCorrelationID(correlationId); |
Sau khi đã có đối tượng TextMessage, giờ thì các bạn có thể sử dụng đối tượng producer để gửi message và đối tượng consumer để nhận message như sau:
1 2 3 4 5 6 7 8 9 |
producer.send(requestMessage); System.out.println("Sent request with correlation ID: " + correlationId); Message reply = consumer.receive(5000); if (reply instanceof TextMessage) { System.out.println("Received reply: " + ((TextMessage) reply).getText()); } session.close(); |
Như vậy là chúng ta đã viết xong code của đối tượng Requester rồi đó các bạn! Toàn bộ code của class Requester sẽ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package com.huongdanjava.eda.requestreply; import java.util.UUID; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public class Requester { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try (Connection connection = connectionFactory.createConnection("artemis", "artemis")) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination requestQueue = session.createQueue("eda.request.queue"); Destination replyQueue = session.createQueue("eda.reply.queue"); MessageProducer producer = session.createProducer(requestQueue); MessageConsumer consumer = session.createConsumer(replyQueue); TextMessage requestMessage = session.createTextMessage("Huong Dan Java"); String correlationId = UUID.randomUUID().toString(); requestMessage.setJMSReplyTo(replyQueue); requestMessage.setJMSCorrelationID(correlationId); producer.send(requestMessage); System.out.println("Sent request with correlation ID: " + correlationId); Message reply = consumer.receive(5000); if (reply instanceof TextMessage) { System.out.println("Received reply: " + ((TextMessage) reply).getText()); } session.close(); } } } |
Bây giờ, chúng ta sẽ viết code subscribe vào request queue “eda.request.queue” để nhận message, xử lý message và trả về kết quả cho reply queue các bạn nhé!
Receiver
Tương tự như Requester, chúng ta cũng sẽ lấy connection từ factory, tạo mới session, tạo mới consumer để nhận message từ request queue như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
package com.huongdanjava.eda.requestreply; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try (Connection connection = factory.createConnection("artemis", "artemis")) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination requestQueue = session.createQueue("eda.request.queue"); MessageConsumer consumer = session.createConsumer(requestQueue); // Keep the server alive System.out.println("Server is listening..."); Thread.sleep(60000); } } } |
Sau khi đã có đối tượng consumer, các bạn có thể sử dụng phương thức setMessageListener() để nhận các message từ queue và xử lý các message đó. Ví dụ của mình như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
consumer.setMessageListener(message -> { try { if (message instanceof TextMessage) { String requestText = ((TextMessage) message).getText(); System.out.println("Server received request: " + requestText); Destination replyTo = message.getJMSReplyTo(); if (replyTo != null) { MessageProducer replyProducer = session.createProducer(replyTo); TextMessage replyMessage = session.createTextMessage("Hello: " + requestText); replyMessage.setJMSCorrelationID(message.getJMSCorrelationID()); replyProducer.send(replyMessage); replyProducer.close(); } } } catch (JMSException e) { e.printStackTrace(); } }); |
Các bạn cần phải set lại Correlation ID đã nhận từ Requester trước khi gửi kết quả tới reply queue các bạn nhé!
Toàn bộ code của class Receiver sẽ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package com.huongdanjava.eda.requestreply; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); try (Connection connection = factory.createConnection("artemis", "artemis")) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination requestQueue = session.createQueue("eda.request.queue"); MessageConsumer consumer = session.createConsumer(requestQueue); consumer.setMessageListener(message -> { try { if (message instanceof TextMessage) { String requestText = ((TextMessage) message).getText(); System.out.println("Server received request: " + requestText); Destination replyTo = message.getJMSReplyTo(); if (replyTo != null) { MessageProducer replyProducer = session.createProducer(replyTo); TextMessage replyMessage = session.createTextMessage("Hello, " + requestText); replyMessage.setJMSCorrelationID(message.getJMSCorrelationID()); replyProducer.send(replyMessage); replyProducer.close(); } } } catch (JMSException e) { e.printStackTrace(); } }); // Keep the server alive System.out.println("Server is listening..."); Thread.sleep(60000); } } } |
Như vậy là mình đã hoàn thành một ứng dụng nhỏ sử dụng Request/Reply pattern với Correlation ID rồi đó các bạn! Để kiểm tra kết quả, mình sẽ chạy class Receiver trước:
sau đó thì chạy class Requester, kết quả của mình như sau:
Trong phần tiếp theo, mình sẽ hướng dẫn các bạn cách hiện thực Request/Reply pattern sử dụng temporary queue các bạn nhé!