Mình đã hướng dẫn các bạn cách sử dụng class KafkaAdmin của Spring for Apache Kafka để quản lý các topic trong Apache Kafka. Để gửi hoặc xoá các message trong các Topic của Apache Kafka trong các project có sử dụng Spring, các bạn có thể sử dụng class KafkaTemplate của module Spring for Apache Kafka này. Cụ thể như thế nào? Chúng ta sẽ cùng nhau tìm hiểu trong bài viết này các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Maven project để làm ví dụ:
Spring for Apache Kafka được khai báo như sau:
1 2 3 4 5 |
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>3.3.0</version> </dependency> |
Chúng ta có thể khởi tạo bean của KafkaTemplate trong Spring Container như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@Bean KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } |
Tham số khi khởi tạo đối tượng KafkaTemplate là thông tin của Apache Kafka mà ứng dụng sẽ làm việc nha các bạn! Như các bạn thấy, đó chính là đối tượng của class DefaultKafkaProducerFactory, một implementation của interface ProducerFactory. Chúng ta sẽ khai báo thông tin của Apache Kafka server, các class để serialize key và value của Kafka message sử dụng class DefaultKafkaProducerFactory này.
Có nhiều phương thức send() overloading trong class KafkaTemplate giúp chúng ta có thể gửi một message tới Apache Kafka, như sau:
Các bạn có thể chọn phương thức mà các bạn muốn.
Để làm ví dụ, mình sẽ sử dụng phương thức send() với 2 tham số là tên topic và data mà mình muốn gửi. Code của mình như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
package com.huongdanjava.springapachekafka; import java.util.concurrent.CompletableFuture; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; public class Application { public static void main(String[] args) throws InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class); CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("huongdanjava", "Hello World!"); future.thenAccept(result -> { System.out.println(result); }); Thread.sleep(10000); } } |
Viết gọn lại với Lambda Expression sẽ như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
package com.huongdanjava.springapachekafka; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.kafka.core.KafkaTemplate; public class Application { public static void main(String[] args) throws InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( AppConfiguration.class); KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class); kafkaTemplate.send("huongdanjava", "Hello World!").thenAccept(System.out::println); Thread.sleep(10000); } } |
Như các bạn thấy, đối tượng trả về khi chúng ta gọi phương thức send() là một CompletableFuture và kết quả sẽ được trả về asynchronously.
Kết quả khi mình chạy ví dụ này như sau:
Như vậy là chúng ta đã gửi thành công message tới Apache Kafka rồi đó các bạn!