Để làm ví dụ về việc hiện thực Outbox Pattern với Debezium, mình sẽ có một hệ thống với 2 service 1 và 2. Service 1 sẽ expose một API để thêm mới thông tin sinh viên vào database. Thông tin sinh viên mới được thêm này sẽ được capture bởi Debezium và được publish tới Service 2 thông qua Apache Kafka. Chúng ta sẽ sử dụng Outbox Pattern để đảm bảo tính nhất quán của dữ liệu.
High level architecture của hệ thống mình sẽ như sau:
Service 1 là một Spring Boot application sử dụng Web Starter, Data JPA Starter, PostgreSQL driver và Docker Compose Support như sau:
PostgreSQL database server mình sẽ sử dụng tính năng của Spring Boot hỗ trợ cho Docker Compose, với nội dung của tập tin Docker Compose như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
services: postgres: image: 'postgres:latest' volumes: - ./src/main/resources/db.sql:/docker-entrypoint-initdb.d/db.sql environment: - POSTGRES_DB=outbox_pattern_example - POSTGRES_PASSWORD=123456 - POSTGRES_USER=khanh command: - postgres - -c - wal_level=logical - -c - shared_preload_libraries=pgoutput ports: - '5430:5432' |
Trong tập tin db.sql ở thư mục src/main/resources, table student được định nghĩa như sau:
1 2 3 4 |
CREATE TABLE student ( id UUID PRIMARY KEY, name VARCHAR(100) ); |
Entity của class Student và class Repository của nó có nội dung như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package com.huongdanjava.systemdesign; import jakarta.persistence.Entity; import jakarta.persistence.Id; import java.util.UUID; @Entity public class Student { @Id private UUID id; private String name; public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } } |
và:
1 2 3 4 5 6 7 8 9 10 |
package com.huongdanjava.systemdesign; import java.util.UUID; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface StudentRepository extends JpaRepository<Student, UUID> { } |
Cũng trong tập tin db.sql, nội dung của outbox table như sau:
1 2 3 4 5 6 7 |
CREATE TABLE outbox ( id UUID PRIMARY KEY, aggregate_id UUID NOT NULL, type VARCHAR NOT NULL, event_type VARCHAR NOT NULL, payload VARCHAR NOT NULL ); |
- Cột id dùng làm unique id cho mỗi message sẽ được gửi tới Apache Kafka.
- aggregate_id là id của domain object liên quan đến event. Trong ví dụ của mình thì là id của sinh viên đó các bạn.
- type là domain object model liên quan đến event, cho ví dụ của mình là student đó các bạn.
- event_type là loại event liên quan đến domain object model. Type có thể là thêm mới, update, xoá, …
- payload là nội dung dưới dạng JSON của event.
Mình cũng thêm entity và class Repository cho bảng outbox này như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
package com.huongdanjava.systemdesign; import jakarta.persistence.Entity; import jakarta.persistence.Id; import java.util.UUID; @Entity public class Outbox { @Id private UUID id; private UUID aggregateId; private String type; private String eventType; private String payload; public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public UUID getAggregateId() { return aggregateId; } public void setAggregateId(UUID aggregateId) { this.aggregateId = aggregateId; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public String getPayload() { return payload; } public void setPayload(String payload) { this.payload = payload; } } |
và:
1 2 3 4 5 6 7 8 9 10 |
package com.huongdanjava.systemdesign; import java.util.UUID; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; @Repository public interface OutboxRepository extends JpaRepository<Outbox, UUID> { } |
StudentController sẽ expose API thêm mới sinh viên có nội dung như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
package com.huongdanjava.systemdesign; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.transaction.Transactional; import java.util.UUID; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class StudentController { private StudentRepository studentRepository; private OutboxRepository outboxRepository; private ObjectMapper om = new ObjectMapper(); public StudentController(StudentRepository studentRepository) { this.studentRepository = studentRepository; this.outboxRepository = outboxRepository; } @PostMapping("/students") @Transactional ResponseEntity<String> addNewStudent(@RequestParam String name) throws JsonProcessingException { Student student = new Student(); student.setName(name); student.setId(UUID.randomUUID()); Student s = studentRepository.save(student); Outbox outbox = new Outbox(); outbox.setId(UUID.randomUUID()); outbox.setAggregateId(s.getId()); outbox.setEventType("StudentCreated"); outbox.setType("Student"); outbox.setPayload(om.writeValueAsString(student)); outboxRepository.save(outbox); return ResponseEntity.ok("Added new student"); } } |
Cùng với việc insert mới một record vào table student, chúng ta cũng sẽ insert 1 record vào table outbox. Table outbox này sẽ được monitor bởi Debezium để mỗi khi có record mới, Debezium sẽ pick up và gửi một message tới Apache Kafka.
Tập tin cấu hình của Debezium cho ví dụ này của mình sẽ có nội dung như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.database.hostname=localhost debezium.source.database.port=5430 debezium.source.database.user=khanh debezium.source.database.password=123456 debezium.source.database.dbname=outbox_pattern_example debezium.source.topic.prefix=outbox_pattern debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory debezium.source.schema.history.internal.file.filename=data/schema-history.dat debezium.source.plugin.name=pgoutput debezium.sink.type=kafka debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092 debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer |
Với cấu hình trên, một topic có tên là outbox_pattern.public.outbox sẽ được tạo trong Apache Server đó các bạn! Tất cả những thay đổi của table outbox sẽ được capture và public lên topic này.
Các bạn xem thêm bài viết Change Data Capture với Debezium để hiểu rõ hơn về những thông tin cấu hình này nhé!
Cho Service 2, mình cũng tạo một Spring Boot project sử dụng Spring for Apache Kafka để subscribe vào topic của Apache Kafka, cái topic mà Debezium sẽ public data change của Service 1. Sau đó thì sẽ in dữ liệu nhận được ra console.
Service 2 của mình như sau:
Tập tin application.yaml có nội dung như sau:
1 2 3 4 5 6 7 |
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: huongdanjava key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer |
Class listen topic outbox_pattern.public.outbox mình sẽ viết code đơn giản như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
package com.huongdanjava.springboot; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class AppListener { @KafkaListener(topics = "huongdanjava", groupId = "huongdanjava") public void listen(String message) { System.out.println("Received message: " + message); } } |
Bây giờ, nếu chạy tất các các ứng dụng của hệ thống, sau đó thì request tới địa chỉ http://localhost:8081/students?name=Khanh với phương thức POST, các bạn sẽ thấy kết quả như sau:
Table student và table outbox, các bạn cũng sẽ thấy một record mới như sau:
và:
Một message mới cũng được public tới topic outbox_pattern.public.outbox với nội dung như sau:
Như vậy là chúng ta đã hiện thực thành công Outbox Pattern rồi đó các bạn!