Trong những bài viết trước Quản lý Apache Kafka sử dụng class KafkaAdmin của Spring for Apache Kafka và Gửi message tới Apache Kafka sử dụng class KafkaTemplate của Spring for Apache Kafka, mình đã hướng dẫn các bạn cách làm việc với Apache Kafka sử dụng module Spring for Apache Kafka của Spring. Spring Boot cũng đã hỗ trợ Apache Kafka bằng cách auto configuration các thông tin cần thiết để chúng ta dễ dàng làm việc với Apache Kafka sử dụng Spring. Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Spring Boot project với Spring for Apache Kafka:
để làm ví dụ.
Quản lý các topics
Như các bạn đã biết, chúng ta sẽ sử dụng class KafkaAdmin để quản lý các topic trong Apache Kafka. Với cơ chế auto-configuration của Spring Boot, các bạn chỉ cần cấu hình thông tin của Apache Kafka server sử dụng property sau:
1 2 3 |
spring: kafka: bootstrap-servers: localhost:9092 |
là có thể ingest bean của class KafkaAdmin để sử dụng rồi.
Ví dụ, nếu mình cấu hình bean của class NewTopic:
1 2 3 4 |
@Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } |
thì khi chạy ứng dụng lên, các bạn cũng sẽ thấy một topic mới được tạo.
Lấy thông tin của topic sử dụng class KafkaAdmin như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.huongdanjava.springboot.apachekafka; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaAdmin; @SpringBootApplication public class SpringBootApacheKafkaExampleApplication { @Autowired private KafkaAdmin kafkaAdmin; public static void main(String[] args) { SpringApplication.run(SpringBootApacheKafkaExampleApplication.class, args); } @Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { return args -> { System.out.println(kafkaAdmin.describeTopics("huongdanjava")); Thread.sleep(10000); }; } } |
các bạn cũng thấy kết quả tương tự như bài viết trước:
Gửi message tới topics
Với class KafkaTemplate, các bạn cũng chỉ cần cấu hình thêm một số property sau là có thể ingest bean của class KafkaTemplate để sử dụng rồi:
1 2 3 4 5 6 |
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer |
Ví dụ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package com.huongdanjava.springboot.apachekafka; import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @SpringBootApplication public class SpringBootApacheKafkaExampleApplication { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public static void main(String[] args) { SpringApplication.run(SpringBootApacheKafkaExampleApplication.class, args); } @Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { return args -> { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("huongdanjava", "Hello World!"); future.thenAccept(result -> { System.out.println(result); }); Thread.sleep(10000); }; } } |
Kết quả:
Nhận message
Spring Boot hỗ trợ một số properties sau cho phần consumer, giúp chúng ta dễ dàng consume các message của Apache Kafka server trong các ứng dụng Spring Boot. Cụ thể như sau:
1 2 3 4 5 6 7 8 9 10 |
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: huongdanjava key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
Chỉ cần khai báo những properties này, Spring Boot sẽ khởi tạo bean cho class implement interface KafkaListenerContainerFactory và bean cho class ConsumerFactory.
Các bạn giờ chỉ cần định nghĩa class chứa phương thức được annotate với annotation @KafkaListener mà thôi. Ví dụ của mình như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.springboot.apachekafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class AppListener { @KafkaListener(topics = "huongdanjava", groupId = "huongdanjava") public void listen(String message) { System.out.println("Received message: " + message); } } |
Kết quả nếu mình chạy ứng dụng, gửi một message “Hello World!” tới Apache Kafka server sử dụng ví dụ trong bài viết Gửi message tới Apache Kafka sử dụng class KafkaTemplate của Spring for Apache Kafka, như sau: