Để quản lý Apache Kafka trong các project có sử dụng Spring, các bạn có thể sử dụng class KafkaAdmin trong module Spring for Apache Kafka của Spring. Class KafkaAdmin này là một wrapper của class AdminClient trong thư viện kafka-client đó các bạn! Sử dụng đối tượng của class KafkaAdmin, các bạn có thể lấy thông tin tất cả các Topic trong Apache Kafka, thêm, xoá, sửa các Topic này. Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Maven project để làm ví dụ:
Spring for Apache Kafka được khai báo như sau:
1 2 3 4 5 |
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.3.0</version> </dependency> |
Các bạn có thể khởi tạo bean của đối tượng KafkaAdmin trong Spring container như sau:
1 2 3 4 5 6 7 |
@Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); return new KafkaAdmin(configs); } |
Tạo mới Topic
Với khai báo bean cho class KafkaAdmin ở trên thì giờ, các bạn chỉ cần khai báo bean của class NewTopic, ví dụ như sau:
1 2 3 4 |
@Bean public NewTopic topic1() { return new NewTopic("huongdanjava", 1, (short) 1); } |
thì khi chạy ứng dụng:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.springapachekafka; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); } } |
các bạn sẽ thấy một topic mới được tạo trong Apache Kafka, ví dụ như mình như sau:
Nếu các bạn muốn tạo topic manually thì có thể sử dụng phương thức createOrModifyTopics() với tham số là đối tượng NewTopic của class KafkaAdmin, ví dụ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
package com.huongdanjava.springapachekafka; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.kafka.core.KafkaAdmin; public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); KafkaAdmin kafkaAdmin = context.getBean(KafkaAdmin.class); kafkaAdmin.createOrModifyTopics(new NewTopic("huongdanjava1", 1, (short) 1)); } } |
Kết quả khi chạy lại ví dụ, các bạn cũng sẽ thấy topic “huongdanjava1” sẽ được tạo.
Xem thông tin của một hoặc nhiều Topic
Các bạn có thể sử dụng phương thức describeTopics() của class KafkaAdmin để xem thông tin của một hoặc nhiều Topic, ví dụ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
package com.huongdanjava.springapachekafka; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.kafka.core.KafkaAdmin; public class Application { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); KafkaAdmin kafkaAdmin = context.getBean(KafkaAdmin.class); System.out.println(kafkaAdmin.describeTopics("huongdanjava", "huongdanjava1")); } } |
Tham số của phương thức describeTopics() này là danh sách tên các Topic mà các bạn muốn xem thông tin nhé các bạn!
Kết quả của mình khi chạy ví dụ này như sau: