Request/Reply pattern trong Event-Driven Architecture – Phần 2

Trong bài viết trước, mình đã giới thiệu với các bạn về Request/Reply pattern và cũng đã làm một ví dụ implement pattern này với Correlation ID. Ở bài viết này, mình sẽ làm một ví dụ khác, cũng implement Request/Reply pattern nhưng sẽ sử dụng temporary queue thay vì Correlation ID các bạn nhé!

Đầu tiên, mình cũng sẽ tạo mới một Maven project để làm ví dụ:

Mình sẽ sử dụng RabbitMQ để làm Message Broker thay vì ActiveMQ Artemis cho ví dụ này.

RabbitMQ Client dependency được khai báo như sau:

Với approach sử dụng temporary queue, chúng ta không cần tạo reply queue manually. Reply queue sẽ được tạo tự động, chỉ được sử dụng được 1 lần và nó sẽ tự động bị xoá sau khi sử dụng xong.

Bây giờ chúng ta sẽ implement chi tiết cho ví dụ này các bạn nhé!

Requester

Tương tự như khi làm việc với ActiveMQ Artemis, với RabbitMQ, điều đầu tiên chúng ta cần phải làm là lấy connection tới RabbitMQ server với factory pattern sử dụng class ConnectionFactory, như sau:

Khi làm việc với RabbitMQ, chúng ta sẽ cần phải làm việc với channel sau khi đã có connection tới RabbitMQ server. Nói thêm về channel thì nó giúp chúng ta không cần mở nhiều connection tới RabbitMQ server mà vẫn làm việc được với RabbitMQ server đó các bạn. Một connection, các bạn có thể tạo nhiều channel khác nhau!

Để tạo một temporary reply queue trong RabbitMQ server, các bạn có thể sử dụng phương thức queueDeclare() của đối tượng Channel như sau:

  • Tham số thứ nhất trong phương thức queueDelare() là queue name. Nếu các bạn để empty như mình, tên queue name sẽ được tạo random.
  • Tham số thứ hai trong phương thức queueDeclare() này là durable, giá trị là true nếu các bạn muốn queue này vẫn tồn tại sau khi RabbitMQ server restart. Ngược lại, nếu giá trị là false thì queue sẽ bị xoá.
  • Tham số thứ ba là exclusive, giá trị true nghĩa là queue này chỉ available cho connection này, cho channel này.
    Tham số thứ tư là autoDelete, giá trị true nghĩa là server sẽ tự động xoá queue nếu nó không được sử dụng.
  • Tham số thứ năm là về một số configuration khác cho queue.

Sau khi đã tạo temporary reply queue, bây giờ chúng ta sẽ gửi message sử dụng phương thức basicPublish() của đối tượng Channel như sau:

Để nhận message, chúng ta sẽ sử dụng phương thức basicConsume() của đối tượng Channel. Một trong những tham số trong phương thức này là interface DeliverCallback. Các bạn cần implement interface này để có thể nhận message được gửi tới reply queue của RabbitMQ server.

Mình sẽ implement interface này đơn giản như sau:

Phương thức basicConsume() sẽ được gọi như sau:

Toàn bộ code của Requester như sau các bạn nhé:

Receiver

Phía receiver, chúng ta cũng implement tương tự như requester, chỉ có thêm một bước là sau khi nhận message từ requester, receiver sẽ gửi lại message response tới reply queue.

Toàn bộ code của Receiver, mình viết như sau:

Như các bạn thấy, chúng ta cần phải lấy Correlation ID và reply queue từ request message.

Bây giờ nếu các bạn chạy Receiver trước:

sau đó thì chạy Requester, các bạn sẽ thấy kết quả như sau:

Kiểm tra console log của Receiver, các bạn sẽ thấy tên của reply queue như sau:

Add Comment