Partitioning với RabbitMQ sử dụng Spring Cloud Stream

Partitioning trong message queue là cơ chế chia nhỏ các topic hoặc queue thành nhiều partition khác nhau. Mỗi partition sẽ lưu trữ một phần của dữ liệu và các dữ liệu này sẽ được xử lý song song. Nhờ đó mà quá trình nhận và xử lý các message sẽ xảy ra nhanh hơn, tối ưu hơn đó các bạn!

Apache Kafka hỗ trợ partitioning natively, còn RabbitMQ thì không. Nhưng các bạn có thể dễ dàng implement partitioning với RabbitMQ bằng cách sử dụng Spring Cloud Stream. Spring Cloud Stream hỗ trợ cho chúng ta các giải pháp để có thể implement partitioning cho một message broker bất kỳ. Chúng ta sẽ implement partitioning trong application code và dễ dàng switch giữa các message broker khác nhau mà không cần đổi code nhiều. Spring Cloud Stream sẽ chắc chắn rằng các message liên quan, ví dụ như cùng một userId, sẽ được handle bởi cùng một consumer instance (trong trường hợp mà có nhiều consumer instance cùng subscribe vào một queue và process message đó các bạn). Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này, các bạn nhé!

Đầu tiên, mình sẽ tạo mới một Maven multiple module project để làm ví dụ:

Cả 2 project producer và consumer đều là các Spring Boot project sử dụng Spring Cloud Starter Stream Rabbit để gửi và nhận message tới RabbitMQ sử dụng Spring Cloud Stream các bạn nhé.

Cho ví dụ của bài viết này, để minh họa cho phần partitioning thì mình sẽ có một ứng dụng producer để publish message tới exchange với 3 partition khác nhau. Mình sẽ chạy 3 instance của ứng dụng consumer để consume các message từ 3 partition khác nhau này.

Thông tin RabbitMQ server, mình sẽ được định nghĩa trong 2 project như sau:

Cho phần producer, mình sẽ sủ dụng class StreamBridge để gửi message như sau:

Như các bạn thấy, mình sử dụng phương thức send() của class StreamBridge để gửi message tới RabbitMQ thông qua binding name là sendMessage. Nội dung của class Order như sau:

Cho phần binding thì trong tập tin application.yml,  ngoài việc định nghĩa binding của binding name sendMessage tới exchange của RabbitMQ thì các bạn còn cần định nghĩa thêm 2 properties như sau:

Chúng ta sẽ sử dụng property spring.cloud.stream.binding.<binding-name>.producer.partition-count để định nghĩa số lượng partition mà chúng ta muốn và property spring.cloud.stream.binding.<binding-name>.producer.partition-key-expression để định nghĩa routing key, để Spring Cloud Stream có thể route các message liên quan tới một partition nhất định.

Cho phần consumer thì mình sẽ định nghĩa bean của interface Consumer để nhận message từ RabbitMQ:

Nội dung của class Order thì như trong phần producer, các bạn nhé!
Chúng ta cũng định nghĩa thông tin binding và một số property khác liên quan đến partitioning trong tập tin application.yml như sau:

Property spring.cloud.stream.binding.<binding-name>.consumer.partitioned dùng để nói cho Spring Cloud Stream biết là chúng ta muốn enable partitioning cho phần consumer này. Property spring.cloud.stream.binding.instance-count dùng để định nghĩa số lượng partition, các bạn nhé. Ngoài ra thì cũng còn một property khác tên là spring.cloud.stream.instance-index mà chúng ta sẽ sử dụng khi chạy ứng dụng consumer để đánh dấu consumer instance number tương ứng.

Trong ví dụ của mình thì mình sẽ chạy 1 ứng dụng producer và 3 instance của ứng dụng consumer, nên để tránh bị conflict port, mình sẽ định nghĩa property server.port với giá trị 0:

để các ứng dụng này có thể chạy sử dụng random port.

Cho ứng dụng consumer, mình sẽ chạy 3 instance với property spring.cloud.stream.instance-index, ví dụ cho instance 1 như sau:

Các bạn chạy ứng dụng producer như một Spring Boot bình thường các bạn nhé!

Lúc này, sau khi đã chạy 3 instance của consumer và cuối cùng là ứng dụng producer, các bạn sẽ thấy kết quả cho từng consumer instance như sau:

Consumer 1:

Consumer 2:

Consumer 3:

Chạy ứng dụng producer nhiều lần, các bạn sẽ thấy các message liên quan đều được consumer bởi một consumer nhất định, ví dụ như sau:

Nếu các bạn kiểm tra thông tin queue trong RabbitMQ thì các bạn sẽ thấy Spring Cloud Stream đã tự động tạo 3 queue tương ứng với 3 partition mà chúng ta muốn:

Bản chất là từng consumer instance sẽ subscribe vào queue tương ứng với consumer instance number, các message liên quan sẽ được publish vào từng queue này đó các bạn!

Như vậy là trong bài viết này, mình đã hướng dẫn các bạn cách hiện thực partitioning với RabbitMQ sử dụng Spring Cloud Stream. Đối với các message broker mà Spring Cloud Stream có hỗ trợ thì cũng tương tự các bạn nhé!

Xem thêm video ở đây các bạn nhé:

Add Comment