Spring Cloud Stream là một module của Spring Framework, là một high-level abstraction để xây dựng các ứng dụng event-driven microservices. Nó hỗ trợ nhiều message broker khác nhau như Apache Kafka, RabbitMQ, … bằng cách sử dụng các binder. Các bạn có thể dễ dàng thay đổi message broker vendor mà không cần phải thay đổi code nhiều. Trong bài viết này, mình sẽ hướng dẫn các bạn cách sử dụng Spring Cloud Stream cơ bản với một ví dụ sử dụng RabbitMQ, các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Spring Boot project với Cloud Stream dependency để làm ví dụ:

Để làm việc với RabbitMQ sử dụng Spring Cloud Stream, các bạn cần khai báo thêm Spring Cloud Stream Rabbit Starter dependency như sau:
|
1 2 3 4 |
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> |
Để làm việc với RabbitMQ, các bạn hãy start một RabbitMQ server lên, sau đó thì cấu hình thông tin RabbitMQ server này với ứng dụng Spring Boot của chúng ta sử dụng các property như sau:
|
1 2 3 4 5 6 |
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest |
Nếu đang trong quá trình development, các bạn có thể chạy RabbitMQ sử dụng Docker Compose, như mình đang làm. Các bạn có thể tham khảo thêm hướng dẫn ở đây.
Để gửi message tới một queue trong RabbitMQ sử dụng Spring Cloud Stream với binding, đầu tiên, các bạn có thể định nghĩa bean trong Spring container cho consumer và producer sử dụng các interface Consumer và Supplier của Java, ví dụ như sau:
|
1 2 3 4 5 6 7 8 9 10 11 |
@Bean public Supplier<String> sendSupplier() { return () -> "Hello World"; } @Bean public Consumer<String> receiveConsumer() { return message -> { System.out.println("Received: " + message); }; } |
Cho producer, chúng ta sử dụng interface Supplier để định nghĩa message mà chúng ta muốn gửi tới RabbitMQ, còn interface Consumer sẽ làm nhiệm vụ nhận message này đó các bạn!
Sau đó thì các bạn hãy khai báo sử dụng những property sau:
|
1 2 3 4 5 6 7 8 9 10 11 |
spring: cloud: function: definition: sendSupplier;receiveConsumer stream: bindings: sendSupplier-out-0: destination: message.exchange receiveConsumer-in-0: destination: message.exchange group: huongdanjava |
để Spring tự động generate logical binding name (nói nôm na thì đây là những binding name chỉ tồn tại trong Spring, chưa tồn tại trong RabbitMQ hay Apache Kafka nha các bạn) và tự động binding các logical binding name này với exchange và queue trong RabbitMQ các bạn nhé!
Spring Cloud Stream sử dụng Spring Cloud Function, nên với khai báo bean cho các interface Supplier và Consumer ở trên và property:
|
1 2 3 4 |
spring: cloud: function: definition: sendSupplier;receiveConsumer |
khi các bạn chạy ứng dụng, Spring sẽ tự động tạo các logical binding name là:
- sendSupplier-out-0
- receiveConsumer-in-0
đó các bạn!
Các property:
|
1 2 3 4 5 6 7 8 9 |
spring: cloud: stream: bindings: sendSupplier-out-0: destination: message.exchange receiveConsumer-in-0: destination: message.exchange group: huongdanjava |
sẽ tự động bind những logical binding name trên với exchange sử dụng property spring.cloud.stream.bindings.<logical-binding-name>.destination và queue sử dụng property spring.cloud.stream.bindings.<logical-binding-name>.group.
Sau khi binding xong, Spring Cloud Stream cũng sẽ tự động tạo các exchange hay queue này, nếu chúng chưa tồn tại trong RabbitMQ nha các bạn! Cho ví dụ của mình thì khi chạy, các bạn sẽ thấy các exchange sendSupplier-out-0, receiveConsumer-in-0 được tạo trong RabbitMQ như sau:

Một queue mới với tên được combine từ các properties destination và group cũng sẽ được tạo trong RabbitMQ như sau:

Bây giờ khi chạy ứng dụng ví dụ của mình, các bạn sẽ thấy liên tục, một message được public vào RabbitMQ và ứng dụng của mình sẽ nhận message đó và in ra dòng chữ như sau:

Như vậy là, chỉ với vài dòng code đơn giản, chúng ta đã có thể dễ dàng publish và nhận message từ RabbitMQ sử dụng Spring Cloud Stream đó các bạn.