In a previous tutorial, I showed you how to use the KafkaAdmin class of Spring for Apache Kafka to manage topics in Apache Kafka. To send messages to Apache Kafka topics in projects using Spring, you can use the KafkaTemplate class of the same Spring for Apache Kafka module. How does it work in detail? We will find out together in this tutorial!
First, I will create a new Maven project as an example, with the Spring for Apache Kafka dependency declared as follows:
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>3.3.0</version>
    </dependency>
We can declare the KafkaTemplate bean in the Spring container, here in the AppConfiguration configuration class, as follows:
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
      Map<String, Object> props = new HashMap<>();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

      return new DefaultKafkaProducerFactory<>(props);
    }
The constructor argument of KafkaTemplate is a ProducerFactory, which carries the connection information for the Apache Kafka server that the application will work with. As you can see, here it is an object of the DefaultKafkaProducerFactory class, an implementation of the ProducerFactory interface. Through this DefaultKafkaProducerFactory, we declare the address of the Apache Kafka server and the classes used to serialize the key and the value of each Kafka message.
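To make the configuration map more concrete: the ProducerConfig constants used here are plain string keys. The sketch below builds the same settings with only the JDK, assuming the raw keys `bootstrap.servers`, `key.serializer` and `value.serializer` and giving the serializer by its class name instead of a Class object. The class name `ProducerProps` and the helper `producerProps` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerProps {

  // Hypothetical helper: the same three settings as producerFactory(),
  // written with raw string keys and a configurable server address.
  static Map<String, Object> producerProps(String bootstrapServers) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

  public static void main(String[] args) {
    // Print each setting so the content of the map is visible.
    producerProps("localhost:9092").forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
```

A map like this could be passed to DefaultKafkaProducerFactory in place of the one built in producerFactory(), which makes it easy to read the server address from an external setting instead of hard-coding it.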
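The KafkaTemplate class has many overloaded send() methods that help us send a message to Apache Kafka. Besides the topic name and the data, some overloads also accept a message key, a partition, and a timestamp, and others take a prebuilt ProducerRecord or a Spring Message object. You can choose the one that fits your needs.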
For example, I will use the send() method with 2 parameters: the topic name and the data I want to send. My code is as follows:
    package com.huongdanjava.springapachekafka;

    import java.util.concurrent.CompletableFuture;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;

    public class Application {

      public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
            AppConfiguration.class);

        KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);

        // send() returns immediately; the result arrives later through the future
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("huongdanjava",
            "Hello World!");
        future.thenAccept(result -> {
          System.out.println(result);
        });

        // Keep the main thread alive long enough for the asynchronous callback to run
        Thread.sleep(10000);
      }
    }
Written more concisely with method chaining and a method reference, it looks like this:
    package com.huongdanjava.springapachekafka;

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.kafka.core.KafkaTemplate;

    public class Application {

      public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
            AppConfiguration.class);

        KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);

        kafkaTemplate.send("huongdanjava", "Hello World!").thenAccept(System.out::println);

        Thread.sleep(10000);
      }
    }
As you can see, the object returned when we call the send() method is a CompletableFuture and the result will be returned asynchronously.
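Because the result is a CompletableFuture, the callback can also cover the failure case, and the caller can wait for the result with a timeout instead of sleeping for a fixed time. Here is a minimal sketch using only the JDK. The future is a stand-in created with CompletableFuture.supplyAsync so that the code runs without a broker, and the names `AsyncResultDemo`, `fakeSend` and `describe` are hypothetical. With a real KafkaTemplate, the future would come from send() and carry a SendResult instead of a String.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncResultDemo {

  // Hypothetical stand-in for kafkaTemplate.send(...): completes asynchronously,
  // either with a result or with an exception.
  static CompletableFuture<String> fakeSend(String topic, String data, boolean fail) {
    return CompletableFuture.supplyAsync(() -> {
      if (fail) {
        throw new IllegalStateException("broker unavailable");
      }
      return "sent '" + data + "' to " + topic;
    });
  }

  // handle() receives either the result or the exception, so one callback covers both outcomes.
  static CompletableFuture<String> describe(CompletableFuture<String> future) {
    return future.handle((result, ex) -> {
      if (ex == null) {
        return "OK: " + result;
      }
      // Asynchronous failures are usually wrapped, so report the underlying cause.
      Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
      return "FAILED: " + cause.getMessage();
    });
  }

  public static void main(String[] args) throws Exception {
    // get() with a timeout waits only as long as needed, unlike Thread.sleep()
    System.out.println(describe(fakeSend("huongdanjava", "Hello World!", false)).get(10, TimeUnit.SECONDS));
    System.out.println(describe(fakeSend("huongdanjava", "Hello World!", true)).get(10, TimeUnit.SECONDS));
  }
}
```

Waiting on the future this way lets a small command-line example exit as soon as the send completes, while a long-running application would normally just register the callback and continue.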
When I run this example, the SendResult of the send operation is printed to the console, so we have successfully sent a message to Apache Kafka!