Để làm ví dụ về việc hiện thực Outbox Pattern với Debezium, mình sẽ có một hệ thống với 2 service là student-service và other-service:
- Student service sẽ expose một API để thêm mới thông tin sinh viên vào database.
- Thông tin sinh viên mới được thêm này cần được inform cho Other Service để xử lý thêm một số logic khác.
Chúng ta sẽ sử dụng Outbox Pattern để đảm bảo tính nhất quán của dữ liệu, các bạn nhé! Thông tin sinh viên mới thêm sẽ được capture bởi Debezium và được publish tới Other service thông qua Apache Kafka.
High level architecture của hệ thống mình sẽ như sau:

Để làm ví dụ, mình sẽ tạo mới một Maven multiple project với 2 project là student-service và other-service, như sau:

Student Service
Student Service là một Spring Boot application sử dụng Web Starter, Data JPA Starter, PostgreSQL driver, Lombok và Docker Compose Support.
Cho PostgreSQL database server, mình sẽ sử dụng tính năng của Spring Boot hỗ trợ cho Docker Compose để có thể chạy một PostgreSQL database khi chạy ứng dụng lên. PostgreSQL database này sẽ enable sẵn các cấu hình liên quan đến change data capture, các bạn nhé! Nội dung của tập tin Docker Compose lúc này như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
services: postgres: image: 'postgres:18.4' volumes: - ./src/main/resources/db.sql:/docker-entrypoint-initdb.d/db.sql - ./postgresql_data:/var/lib/postgresql environment: - POSTGRES_DB=outbox_pattern_example - POSTGRES_PASSWORD=123456 - POSTGRES_USER=khanh command: - postgres - -c - wal_level=logical - -c - shared_preload_libraries=pgoutput ports: - '5430:5432' |
Tập tin compose.yaml này sẽ nằm ở thư mục root của project nha các bạn.
Trong tập tin db.sql ở thư mục src/main/resources, table student sẽ được mình định nghĩa như sau:
|
1 2 3 4 |
CREATE TABLE student ( id UUID PRIMARY KEY, name VARCHAR(100) ); |
Entity của class Student và class Repository của nó, có nội dung như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.huongdanjava.systemdesign.outboxpattern.repository.entity; import jakarta.persistence.Entity; import jakarta.persistence.Id; import java.util.UUID; import lombok.Getter; import lombok.Setter; @Entity @Getter @Setter public class Student { @Id private UUID id; private String name; } |
và:
|
1 2 3 4 5 6 7 8 9 10 11 |
package com.huongdanjava.systemdesign.outboxpattern.repository; import com.huongdanjava.systemdesign.outboxpattern.repository.entity.Student; import java.util.UUID; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface StudentRepository extends JpaRepository<Student, UUID> { } |
Cũng trong tập tin db.sql này, mình cũng định nghĩa nội dung của outbox table như sau:
|
1 2 3 4 5 6 7 |
CREATE TABLE outbox ( id UUID PRIMARY KEY, aggregate_id UUID NOT NULL, type VARCHAR NOT NULL, event_type VARCHAR NOT NULL, payload VARCHAR NOT NULL ); |
- Cột id dùng làm unique id cho mỗi message sẽ được gửi tới Apache Kafka.
- aggregate_id là id của domain object liên quan đến event. Trong ví dụ của mình thì là id của sinh viên đó các bạn.
- type là domain object model liên quan đến event, cho ví dụ của mình là student, đó các bạn.
- event_type là loại event liên quan đến domain object model. Type có thể là thêm mới, update, xoá, …
- payload là nội dung dưới dạng JSON của event.
Mình cũng thêm entity và class Repository cho outbox table này như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
package com.huongdanjava.systemdesign.outboxpattern.repository.entity; import jakarta.persistence.Entity; import jakarta.persistence.Id; import java.util.UUID; import lombok.Getter; import lombok.Setter; @Entity @Getter @Setter public class Outbox { @Id private UUID id; private UUID aggregateId; private String type; private String eventType; private String payload; } |
và:
|
1 2 3 4 5 6 7 8 9 10 11 |
package com.huongdanjava.systemdesign.outboxpattern.repository; import com.huongdanjava.systemdesign.outboxpattern.repository.entity.Student; import java.util.UUID; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface StudentRepository extends JpaRepository<Student, UUID> { } |
StudentController sẽ expose API thêm mới sinh viên có nội dung như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
package com.huongdanjava.systemdesign.outboxpattern.controller; import com.huongdanjava.systemdesign.outboxpattern.controller.dto.AddNewStudentRequest; import com.huongdanjava.systemdesign.outboxpattern.controller.dto.AddNewStudentResponse; import com.huongdanjava.systemdesign.outboxpattern.repository.OutboxRepository; import com.huongdanjava.systemdesign.outboxpattern.repository.StudentRepository; import com.huongdanjava.systemdesign.outboxpattern.repository.entity.Outbox; import com.huongdanjava.systemdesign.outboxpattern.repository.entity.Student; import jakarta.transaction.Transactional; import java.util.UUID; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import tools.jackson.databind.ObjectMapper; @RestController public class StudentController { private final StudentRepository studentRepository; private final OutboxRepository outboxRepository; private final ObjectMapper om = new ObjectMapper(); public StudentController(StudentRepository studentRepository, OutboxRepository outboxRepository) { this.studentRepository = studentRepository; this.outboxRepository = outboxRepository; } @PostMapping("/students") @Transactional ResponseEntity<AddNewStudentResponse> addNewStudent(@RequestBody AddNewStudentRequest request) { Student student = new Student(); student.setName(request.getName()); student.setId(UUID.randomUUID()); Student s = studentRepository.save(student); Outbox outbox = new Outbox(); outbox.setId(UUID.randomUUID()); outbox.setAggregateId(s.getId()); outbox.setEventType("StudentCreated"); outbox.setType("Student"); outbox.setPayload(om.writeValueAsString(student)); outboxRepository.save(outbox); return ResponseEntity.ok(new AddNewStudentResponse(true, "Added new student")); } } |
Cùng với việc insert mới một record vào table student, chúng ta cũng sẽ insert 1 record vào outbox table. Table outbox này sẽ được monitor bởi Debezium để mỗi khi có record mới, Debezium sẽ pick up và gửi một message tới Apache Kafka.
Nội dung của class AddNewStudentRequest và AddNewStudentResponse như sau:
|
1 2 3 4 5 6 7 8 9 10 |
package com.huongdanjava.systemdesign.outboxpattern.controller.dto; import lombok.Data; @Data public class AddNewStudentRequest { private String name; } |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.huongdanjava.systemdesign.outboxpattern.controller.dto; import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class AddNewStudentResponse { private boolean success; private String message; } |
Other Service
Cho Other Service, mình cũng tạo một Spring Boot project sử dụng Spring Boot Starter Kafka.
Other Service sẽ subscribe vào một topic của Apache Kafka, cái topic mà Debezium sẽ public data change của Student Service, đó các bạn! Sau đó thì sẽ in dữ liệu nhận được ra console. Mình sẽ viết code đơn giản như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.systemdesign.outboxpattern; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class AppListener { @KafkaListener(topics = "outboxpattern.public.outbox", groupId = "huongdanjava") public void listen(String message) { System.out.println("Received message: " + message); } } |
outboxpattern.public.outbox sẽ là tên của topic mà Debezium publish message data change nhé các bạn.
Tập tin application.yaml của service này, có nội dung như sau:
|
1 2 3 4 5 6 7 |
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: huongdanjava key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
Mình sẽ định nghĩa một service để chạy Apache Kafka trong tập tin Docker Compose ở trên như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
kafka: image: confluentinc/cp-kafka:8.2.1 environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' volumes: - ./kafka_data:/var/lib/kafka/data healthcheck: test: [ "CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list" ] interval: 5s retries: 10 ports: - 9092:9092 |
Debezium Server
Cho Debezium Server, mình sẽ định nghĩa một service mới trong tập Docker Compose ở trên, để chạy nó, như sau:
|
1 2 3 4 5 6 7 8 9 10 |
debezium-server: image: quay.io/debezium/server:3.5.2.Final depends_on: - postgres - kafka volumes: - ./debezium_data:/debezium/data - ./src/main/resources/conf:/debezium/config ports: - 8080:8080 |
Tập tin cấu hình của Debezium trong thư mục src/main/resource/conf của root project sẽ có nội dung như sau:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
debezium.source.database.hostname=postgres debezium.source.database.port=5432 debezium.source.database.user=khanh debezium.source.database.password=123456 debezium.source.database.dbname=outbox_pattern_example debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.topic.prefix=outboxpattern debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory debezium.source.schema.history.internal.file.filename=data/schema-history.dat debezium.source.plugin.name=pgoutput debezium.sink.type=kafka debezium.sink.kafka.producer.bootstrap.servers=kafka:29092 debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer |
Với cấu hình trên, một topic có tên là outboxpattern.public.outbox sẽ được tạo trong Apache Kafka đó các bạn! Tất cả những thay đổi của table outbox sẽ được capture và public lên topic này.
Các bạn xem thêm bài viết Change Data Capture với Debezium để hiểu rõ hơn về những thông tin cấu hình này nhé!
Bây giờ, nếu chạy tất cả các ứng dụng của hệ thống, sau đó thì request tới Student Service http://localhost:8081/students với phương thức POST, các bạn sẽ thấy kết quả như sau:

Console log của Other Service, các bạn sẽ thấy một message được nhận, như sau:
Table student và table outbox, các bạn cũng sẽ thấy một record mới như sau:

và:

Như vậy là chúng ta đã hiện thực thành công Outbox Pattern với Debezium rồi đó các bạn!
Các bạn có thể xem thêm video ở đây:

