Stream (luồng dữ liệu) trong Apache Kafka là một chuỗi không giới hạn các message, liên tục được gửi vào Apache Kafka topic. Nếu các bạn sử dụng thư viện Spring for Apache Kafka thông thường, các bạn có thể subscribe và nhận các message này một cách dễ dàng. Nhưng nếu các bạn muốn áp dụng nhiều thao tác khác đối với các message nhận được như filter, join, tổng hợp, transformation các message, quản lý state thì các bạn có thể sử dụng thư viện Spring for Apache Kafka cùng với thư viện Apache Kafka Streams. Thư viện Spring for Apache Kafka đóng vai trò là một wrapper của thư viện Apache Kafka Streams, giúp chúng ta không chỉ đơn thuần là nhận các message từ Apache Kafka topic, nó còn giúp chúng ta có thể filter, transform, tổng hợp, join, … các message này, hướng đến việc xây dựng các data processing pipeline có khả năng scale và chịu lỗi. Cụ thể như thế nào? Chúng ta hãy cùng nhau tìm hiểu trong bài viết này, các bạn nhé!
Đầu tiên, mình sẽ tạo mới một Maven project:

với Spring for Apache Kafka và Apache Kafka Streams dependency như sau:
|
1 2 3 4 5 6 7 8 9 10 11 |
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>4.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>4.3.0</version> </dependency> |
Để làm ví dụ, để đơn giản thì mình sẽ có một ứng dụng nhận các message từ một topic trên là orders. Nội dung của các message này đơn giản sẽ có key là order ID và value sẽ có các giá trị là “CREATED” hoặc “CANCELLED”. Ứng dụng của chúng ta sẽ sử dụng thư viện Spring for Apache Kafka Streams để filter các message có value là “CREATED” để publish vào một topic khác tên là created-order, các bạn nhé!
Đầu tiên, chúng ta sẽ cấu hình thông tin Apache Kafka server với ứng dụng và enable các cấu hình dành cho Apache Kafka Stream sử dụng annotation @EnableKafkaStreams như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package com.huongdanjava.springkafka; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; @Configuration @EnableKafkaStreams public class AppConfig { @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> props = new HashMap(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "spring-kafka-streams-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); return new KafkaStreamsConfiguration(props); } } |
Annotation @EnableKafkaStreams sẽ tự động đăng ký các bean như StreamsBuilderFactoryBean, KafkaStreams, StreamsBuilder, cần thiết để làm việc với luồng dữ liệu cấu trúc (stream topology), đó các bạn!
Chúng ta cần định nghĩa một bean của class KafkaStreamsConfiguration với tên chính xác là defaultKafkaStreamsConfig. Bean của class KafkaStreamsConfiguration sẽ định nghĩa thông tin của Apache Kafka server, application unique ID để identify application của các bạn với các application khác đang consume message trong Apache Kafka, còn các cấu hình liên quan đến Serdes thì là liên quan đến việc serialize/deserialize các message các bạn nhé! Chúng ta sẽ sử dụng String để serialize/deserialize các message này.
Tiếp theo, chúng ta sẽ định nghĩa một stream topology từ lúc ứng dụng nhận các message, filter hoặc transform các message này, cho đến việc sẽ handle kết quả như thế nào. Trong ví dụ này thì mình sẽ publish kết quả vào một Apache Kafka topic khác là created-orders như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package com.huongdanjava.springkafka; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class OrderStreamTopology { @Bean public KStream<String, String> orderStream(StreamsBuilder builder) { KStream<String, String> orders = builder.stream("orders"); KStream<String, String> createdOrders = orders .filter((key, value) -> "CREATED".equals(value)) .mapValues(value -> "PROCESSED_" + value); createdOrders.to("created-orders"); return orders; } } |
Chúng ta sẽ sử dụng đối tượng của class StreamBuilder để nhận các message từ Apache Kafka topic orders, dưới dạng stream, được lưu giữ trong đối tượng của class KStream. Sau khi nhận các message liên quan đến order thì chúng ta sẽ filter những message có value là “CREATED” để publish vào một topic có tên là created-orders, đó các bạn!
Còn rất nhiều operation khác mà các bạn có thể sử dụng, các bạn nhé:

Bây giờ thì các bạn hãy khai báo những cấu hình này với Spring như sau:
|
1 2 3 4 5 6 7 8 9 10 |
package com.huongdanjava.springkafka; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class Application { static void main(){ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class, OrderStreamTopology.class); } } |
Lúc này, nếu chạy ứng dụng và publish một số message vào topic orders với key và value như sau:
các bạn cũng sẽ thấy một số message được publish vào topic created-orders như sau:

Như vậy là chúng ta đã filter thành công các message được publish vào Apache Kafka sử dụng Spring for Apache Kafka Streams rồi đó các bạn!
Nếu ứng dụng của các bạn handle rất nhiều message từ Apache Kafka và các bạn muốn áp dụng nhiều logic để xử lý cho các message này thì các bạn có thể sử dụng thư viện Spring for Apache Kafka Streams, các bạn nhé!
Các bạn có thể xem thêm video ở đây


