Trong những bài viết trước Quản lý Apache Kafka sử dụng class KafkaAdmin của Spring for Apache Kafka và Gửi message tới Apache Kafka sử dụng class KafkaTemplate của Spring for Apache Kafka, mình đã hướng dẫn các bạn cách làm việc với Apache Kafka sử dụng module Spring for Apache Kafka của Spring. Spring Boot cũng đã hỗ trợ Apache Kafka bằng cách auto configuration các thông tin cần thiết để chúng ta dễ dàng làm việc với Apache Kafka sử dụng Spring. Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Spring Boot project với Spring for Apache Kafka:
để làm ví dụ.
Như các bạn đã biết, chúng ta sẽ sử dụng class KafkaAdmin để quản lý các topic trong Apache Kafka. Với cơ chế auto-configuration của Spring Boot, các bạn chỉ cần cấu hình thông tin của Apache Kafka server sử dụng property sau:
1 2 3 |
spring: kafka: bootstrap-servers: localhost:9092 |
là có thể ingest bean của class KafkaAdmin để sử dụng rồi.
Ví dụ, nếu mình cấu hình bean của class NewTopic:
1 2 3 4 |
@Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } |
thì khi chạy ứng dụng lên, các bạn cũng sẽ thấy một topic mới được tạo.
Lấy thông tin của topic sử dụng class KafkaAdmin như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.huongdanjava.springboot.apachekafka; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaAdmin; @SpringBootApplication public class SpringBootApacheKafkaExampleApplication { @Autowired private KafkaAdmin kafkaAdmin; public static void main(String[] args) { SpringApplication.run(SpringBootApacheKafkaExampleApplication.class, args); } @Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { return args -> { System.out.println(kafkaAdmin.describeTopics("huongdanjava")); Thread.sleep(10000); }; } } |
các bạn cũng thấy kết quả tương tự như bài viết trước:
Với class KafkaTemplate, các bạn cũng chỉ cần cấu hình thêm một số property sau là có thể ingest bean của class KafkaTemplate để sử dụng rồi:
1 2 3 4 5 6 |
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer |
Ví dụ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package com.huongdanjava.springboot.apachekafka; import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @SpringBootApplication public class SpringBootApacheKafkaExampleApplication { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public static void main(String[] args) { SpringApplication.run(SpringBootApacheKafkaExampleApplication.class, args); } @Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } @Bean public CommandLineRunner commandLineRunner(ApplicationContext ctx) { return args -> { CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("huongdanjava", "Hello World!"); future.thenAccept(result -> { System.out.println(result); }); Thread.sleep(10000); }; } } |
Kết quả: