Consume message từ Apache Kafka sử dụng annotation @KafkaListener của Spring for Apache Kafka

Mình đã hướng dẫn các bạn cách gửi một message tới Apache Kafka server sử dụng class KafkaTemplate của Spring for Apache Kafka. Để consume các message từ các topics của Apache Kafka sử dụng Spring for Apache Kafka, các bạn có thể sử dụng annotation @KafkaListener. Cụ thể như thế nào? Hãy cùng nhau tìm hiểu trong bài viết này các bạn nhé!

Đầu tiên, mình sẽ tạo mới một Maven project để làm ví dụ:

Spring for Apache Kafka được khai báo như sau:

Để có thể consume message từ Apache Kafka server, đầu tiên, các bạn cần phải cấu hình thông tin bean của một class implement interface KafkaListenerContainerFactory. Mặc định, có 2 implementation chính cho interface KafkaListenerContainerFactory là ConcurrentKafkaListenerContainerFactory và RetryTopicListenerContainerFactoryDecorator. Thông thường chúng ta sẽ sử dụng class ConcurrentKafkaListenerContainerFactory các bạn nhé!

Class ConcurrentKafkaListenerContainerFactory sẽ sử dụng thông tin của Apache Kafka server, được cấu hình sử dụng class ConsumerFactory, để tạo listener đến các topics của Apache Kafka server. Cụ thể các bạn có thể cấu hình bean cho các class ConcurrentKafkaListenerContainerFactory và ConsumerFactory như sau:

Annotation @KafkaListener sẽ được sử dụng để annotate cho các phương thức sẽ consume các message từ Apache Kafka server.

Để Spring có thể process dựa vào annotation @KafkaListener này, các bạn cần phải khai báo thêm một annotation khác là @EnableKafka trong class cấu hình của Spring các bạn nhé. Cụ thể như sau:

Ở đây, mình cũng đã khai báo thêm annotation @ComponentScan để Spring có thể scan tất cả các class có chứa các phương thức được annotate với annotation @KafkaListener và tạo bean cho chúng.

Cho ví dụ của mình, mình có thể khai báo một class với annotation @Component có chứa phương thức listen() được annotate với annotation @KafkaListener như sau:

Bây giờ, mình chỉ cần viết class main của ứng dụng để Spring tạo các bean dựa vào annotation sử dụng class AnnotationConfigApplicationContext như sau:

thì khi chạy ứng dụng, nếu mình gửi một message “Hello World!” sử dụng ví dụ trong bài viết Gửi message tới Apache Kafka sử dụng class KafkaTemplate của Spring for Apache Kafka, các bạn sẽ thấy kết quả như sau:

Như vậy là chúng ta đã consume được message từ Apache Kafka server sử dụng Spring for Apache Kafka rồi đó các bạn!

Add Comment