Spring Cloud Stream hỗ trợ chức năng consumer routing, cho phép chúng ta quyết định consumer nào sẽ consume message do một producer nhất định gửi tới. Chúng ta có thể route message sử dụng message header, payload content hay header content với expression. Cụ thể như thế nào, hãy cùng mình tìm hiểu trong bài viết này, các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Spring Boot project với Cloud Stream dependency như sau:

Tương tự như bài viết trước, chúng ta cũng thêm Spring Cloud Starter Stream Rabbit dependency:
|
1 2 3 4 |
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> |
và định nghĩa thông tin RabbitMQ trong tập tin application.yml như sau:
|
1 2 3 4 5 6 |
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest |
Để có thể sử dụng consumer routing với Spring Cloud Stream, các bạn hãy enable nó bằng cách định nghĩa các properties sau:
|
1 2 3 4 5 6 |
spring: cloud: function: routing: enabled: true definition: functionRouter |
Với việc enable routing này, Spring sẽ tự động tạo một bean của interface Function tên là functionRouter và một logical binding name tên là functionRouter-in-0. Các bạn có thể sử dụng logical binding name này để bind với exchange với queue trong RabbitMQ và khi message được nhận bởi bean functionRouter này, nó sẽ route message tới consumer mà chúng ta muốn.
Mình sẽ định nghĩa binding cho phần gửi và nhận message như sau:
|
1 2 3 4 5 6 7 8 9 |
spring: cloud: stream: bindings: sendMessage: destination: message.exchange functionRouter-in-0: destination: message.exchange group: huongdanjava |
Như các bạn thấy, mình sử dụng class StreamBridge để gửi message với binding name là sendMessage và cũng định nghĩa 2 consumer có thể consume message sử dụng interface Consumer như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Bean public Consumer<String> receiveConsumer1() { return message -> { System.out.println("Received in consumer 1: " + message); }; } @Bean public Consumer<String> receiveConsumer2() { return message -> { System.out.println("Received in consumer 2: " + message); }; } |
Routing sử dụng message header
Để routing sử dụng message header, các bạn hãy sử dụng class MessageBuilder để build message như sau:
|
1 2 3 4 |
Message<String> message = MessageBuilder .withPayload("Hello World") .setHeader("spring.cloud.function.definition", "receiveConsumer1") .build(); |
Payload là message mà chúng ta muốn gửi. Trong phần header, các bạn phải truyền header name là spring.cloud.function.definition với header value là tên bean của consumer mà các bạn muốn nó nhận message. Trong ví dụ này thì mình đang config là receiveConsumer1 như các bạn thấy!
Mình cũng định nghĩa bean của interface CommandLineRunner để gửi message sử dụng class StreamBridge như sau:
|
1 2 3 4 5 6 7 8 9 10 11 |
@Bean CommandLineRunner runner() { return args -> { Message<String> message = MessageBuilder .withPayload("Hello World") .setHeader("spring.cloud.function.definition", "receiveConsumer1") .build(); streamBridge.send("sendMessage", message); }; } |
Bây giờ, nếu các bạn chạy ứng dụng, các bạn sẽ thấy kết quả như sau:

Nếu các bạn đổi header value của header name spring.cloud.function.definition thành receiveConsumer2 các bạn sẽ thấy message được nhận từ receiveConsumer2 như sau:

Routing sử dụng expression
Để có thể routing với expression, các bạn có thể sử dụng property spring.cloud.function.routing-expression, ví dụ như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
spring: cloud: function: routing: enabled: true definition: functionRouter routing-expression: "new String(payload).contains('2') ? 'receiveConsumer2' : 'receiveConsumer1'" stream: bindings: sendMessage: destination: message.exchange functionRouter-in-0: destination: message.exchange group: huongdanjava |
thì lúc này, nếu các bạn gửi message với nội dung chứa ký tự số 2:
|
1 2 3 4 5 6 7 8 9 10 |
@Bean CommandLineRunner runner() { return args -> { Message<String> message = MessageBuilder .withPayload("Hello 2) .build(); streamBridge.send("sendMessage", message); }; } |
thì các bạn sẽ thấy kết quả như sau:

Đổi message payload thành “Hello 1” thì các bạn sẽ thấy consumer 1 nhận được message các bạn nhé!
Các bạn cũng có thể định nghĩa expression với message header, ví dụ như sau:
|
1 2 3 4 |
spring: cloud: function: routing-expression: "headers['eventType']" |
khi đó, các bạn chỉ cần truyền message header eventType với giá trị là tên của consumer sẽ nhận message là được:
|
1 2 3 4 5 6 7 8 9 10 11 |
@Bean CommandLineRunner runner() { return args -> { Message<String> message = MessageBuilder .withPayload("Hello World") .setHeader("eventType", "receiveConsumer1") .build(); streamBridge.send("sendMessage", message); }; } |
Như vậy là trong bài viết này, mình đã hướng dẫn các bạn các cách để có thể routing message tới các consumer nhất định. Tuỳ nhu cầu, các bạn hãy sử dụng cho hợp lý nhé!
Xem thêm video ở đây các bạn nhé:
