Notes: Outbox Pattern using change data capture (CDC) with Debezium
In this blog, we implement a basic outbox pattern using change data capture (CDC) with Debezium.
The outbox pattern gives us a reliable way to build event-driven systems that communicate asynchronously. In this pattern, the service has one outbox table where it records the metadata of every event it emits. A reader such as Debezium then fetches the changes from the outbox table and pushes them into a queue. The queue can have multiple topics, segregated by entity or any other method that suits the requirement. From the queue, application consumers can fetch the data, or it can be pushed further into other infrastructure elements like Elasticsearch, Redis, etc.
The advantage of this approach is that our service does not have to manually trigger each downstream task when an event occurs; it all happens asynchronously. For example, say an instant grocery delivery business has an order-service with an order table that contains a payment_status column. When payment_status becomes payment_in_progress, our warehouse partners should start packing supplies, and once the status becomes paid, delivery executives and customer support should be notified.
In the above scenario, triggering these external services from order-service itself would couple their logic together, which should be avoided. Instead, we update payment_status and insert the corresponding event into the outbox table atomically, in a single transaction (see the sketch below). Later, the respective consumers pick these changes up from the queue topic, process the data, and execute the required operation.
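To make this concrete, here is a minimal sketch of the service-side write in Go, using database/sql with the lib/pq driver. It targets the order and outbox tables created later in this post; the markOrderPaid helper, the connection string, and the payload fields are illustrative assumptions, not part of the original example.
outbox_write.go (sketch)
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"

	_ "github.com/lib/pq" // PostgreSQL driver (assumed dependency)
)

// markOrderPaid is a hypothetical helper: it updates the order's payment
// status and records the event in the outbox table in one transaction,
// so either both writes happen or neither does.
func markOrderPaid(db *sql.DB, orderID int) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once the transaction is committed

	// 1. Update the business table (the schema below models payment_status as a boolean).
	if _, err := tx.Exec(`UPDATE public."order" SET payment_status = true WHERE id = $1`, orderID); err != nil {
		return err
	}

	// 2. Record the same change as an event in the outbox table.
	payload, err := json.Marshal(map[string]interface{}{"order_id": orderID, "payment_status": "paid"})
	if err != nil {
		return err
	}
	if _, err := tx.Exec(
		`INSERT INTO public.outbox (aggregate_type, aggregate_id, type, payload)
		 VALUES ($1, $2, $3, $4)`,
		"Order", fmt.Sprint(orderID), "OrderUpdated", string(payload),
	); err != nil {
		return err
	}

	return tx.Commit()
}

func main() {
	// Connection string matches the docker-compose setup below; adjust as needed.
	db, err := sql.Open("postgres", "postgres://docker:docker@localhost:5432/order-service?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if err := markOrderPaid(db, 1); err != nil {
		panic(err)
	}
}
If the transaction commits, the outbox row is guaranteed to exist alongside the state change; if it rolls back, neither write is visible, so no phantom events reach the queue.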
We should delete rows from the outbox table once Debezium has picked them up, to avoid unnecessary growth in its size. To prevent duplicate operations, we can maintain idempotency with an inbox table in the consumer service: it keeps the list of outbox items that have already been processed, keyed by their unique id (a sketch follows below).
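As a rough idea of what that looks like on the consumer side, here is a sketch in Go. The inbox table (a single uuid primary-key column id) and the handleOnce helper are assumptions for illustration; the point is that claiming the event id and performing the side effect happen in the same transaction, and a duplicate id is silently skipped.
inbox_consumer.go (sketch)
package consumer

import "database/sql"

// handleOnce runs process at most once per event id, using a hypothetical
// inbox(id uuid PRIMARY KEY) table in the consumer's database to remember
// which outbox events have already been handled.
func handleOnce(db *sql.DB, eventID string, process func(tx *sql.Tx) error) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once committed

	// Try to claim the event id; a duplicate insert affects zero rows.
	res, err := tx.Exec(`INSERT INTO inbox (id) VALUES ($1) ON CONFLICT (id) DO NOTHING`, eventID)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return tx.Commit() // already processed earlier, skip the side effect
	}

	// First time we see this id: run the actual business operation.
	if err := process(tx); err != nil {
		return err
	}
	return tx.Commit()
}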
Let's go through one simple working implementation.
In this docker-compose file, we deploy a PostgreSQL database, pgAdmin to manage the DB through a UI, ZooKeeper + Kafka as the queue, and Debezium as the connector. Debezium will be connected to our Kafka broker.
docker-compose.yaml
version: "3.8"
services:
postgres:
container_name: postgres
image: debezium/postgres:14
ports:
- 5432:5432
environment:
- POSTGRES_USER=docker
- POSTGRES_PASSWORD=docker
- POSTGRES_DB=order-service
pgadmin:
container_name: pgadmin
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
PGADMIN_CONFIG_SERVER_MODE: 'False'
ports:
- "${PGADMIN_PORT:-5050}:80"
restart: unless-stopped
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.0.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
container_name: kafka
image: confluentinc/cp-kafka:7.0.1
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
- 29092:29092
debezium:
container_name: dabezium
image: debezium/connect:1.9
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
depends_on: [kafka]
ports:
- 8083:8083
Bring the stack up with docker-compose up -d. Once the containers are running, go to http://localhost:5050 to access pgAdmin and set up a connection with the DB (host postgres, user and password docker). Then run the SQL below in the order-service database to create the relevant tables.
migration.sql
-- Sequence used by the order id column (must exist before the table)
CREATE SEQUENCE IF NOT EXISTS order_id_seq;

-- Table: public.order
CREATE TABLE IF NOT EXISTS public."order"
(
    id integer NOT NULL DEFAULT nextval('order_id_seq'::regclass),
    client integer NOT NULL,
    payment_status boolean NOT NULL DEFAULT false,
    amount numeric NOT NULL DEFAULT 0.00,
    is_promotion boolean NOT NULL DEFAULT false,
    "timestamp" timestamp without time zone NOT NULL DEFAULT now(),
    CONSTRAINT order_pkey PRIMARY KEY (id)
)
TABLESPACE pg_default;

ALTER TABLE IF EXISTS public."order"
    OWNER to docker;

-- Table: public.outbox
CREATE TABLE IF NOT EXISTS public.outbox
(
    id uuid NOT NULL DEFAULT gen_random_uuid(),                                    -- unique id for idempotency in inbox table
    aggregate_type character varying(255) COLLATE pg_catalog."default" NOT NULL,   -- entity e.g. Order, Customer, Payment, etc.
    aggregate_id character varying(255) COLLATE pg_catalog."default" NOT NULL,     -- primary key or any unique id like order_id
    type character varying(255) COLLATE pg_catalog."default" NOT NULL,             -- type of operation, e.g. OrderCreated, OrderUpdated, etc.
    payload jsonb NOT NULL,                                                        -- JSON data of the actual event
    CONSTRAINT outbox_pkey PRIMARY KEY (id)
)
TABLESPACE pg_default;

ALTER TABLE IF EXISTS public.outbox
    OWNER to docker;
Let's go through the outbox table fields one by one:
id: unique id for each event. It can be used for idempotency and stored in the inbox table at the consumer to prevent duplicate operations.
aggregate_type: the entity, domain, or context of the event, e.g. Order, Customer, Payment, etc.
aggregate_id: a unique id that maps the change to the relevant entity, e.g. order_id, customer_id, etc.
type: the exact type of the event, e.g. OrderCreated, OrderUpdated, etc.
payload: metadata containing all the event-related data, e.g. customer_id, amount, delivery_address, etc.
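In application code, one straightforward (and purely illustrative) way to model a row of this table in Go is a small struct; the example event values below are made up:
outbox_event.go (sketch)
package outbox

import "encoding/json"

// Event mirrors one row of the public.outbox table.
type Event struct {
	ID            string          `json:"id"`             // uuid, used for idempotency
	AggregateType string          `json:"aggregate_type"` // e.g. "Order"
	AggregateID   string          `json:"aggregate_id"`   // e.g. the order id
	Type          string          `json:"type"`           // e.g. "OrderUpdated"
	Payload       json.RawMessage `json:"payload"`        // event-specific JSON data
}

// Example: the event written when order 42 is paid (illustrative values only).
var paidExample = Event{
	AggregateType: "Order",
	AggregateID:   "42",
	Type:          "OrderUpdated",
	Payload:       json.RawMessage(`{"order_id": 42, "payment_status": "paid", "amount": 99.5}`),
}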
Next, we'll have a Debezium connector config file like the one below. It configures the PostgreSQL connector with the database connection details, and table.include.list restricts capture to the outbox table.
debezium.json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "docker",
    "database.password": "docker",
    "database.dbname": "order-service",
    "database.server.name": "postgres",
    "table.include.list": "public.outbox"
  }
}
To add the config file to Debezium, run the first command below. It registers our Postgres connector with Debezium, which will then start picking up changes from the outbox table.
Then update <network_name> in the second command with the network created by docker-compose and run it; it connects to Kafka as a consumer. Now alter data in the outbox table from pgAdmin to see the output in the console. By default, the Kafka topic is named after the table: postgres.public.outbox (i.e. <database.server.name>.<schema>.<table>).
commands
# to register json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@debezium.json"
# kafka topic consumer
docker run --tty --network <network_name> confluentinc/cp-kafkacat kafkacat -b kafka:9092 -G reader postgres.public.outbox -J
Alternatively, you can run this Go program to fetch the changes from the outbox table.
main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/segmentio/kafka-go"
)

const (
	topic         = "postgres.public.outbox"
	brokerAddress = "localhost:29092"
)

func main() {
	l := log.New(os.Stdout, "kafka reader: ", 0)
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{brokerAddress},
		Topic:   topic,
		GroupID: "order-service-outbox-consumer-group",
		Logger:  l,
	})
	for {
		msg, err := r.ReadMessage(context.Background())
		if err != nil {
			panic("could not read message " + err.Error())
		}
		fmt.Println("received: ", string(msg.Value))
	}
}
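Each message printed above is a Debezium change event. With this image's default JSON converter the value is wrapped in a schema/payload envelope, and the new row state sits under payload.after with the outbox column names as keys. The following additions to main.go are a hedged sketch of unwrapping that envelope (the struct and the assumption that the jsonb column arrives as a JSON string are mine, not from the original example):
main.go additions (sketch)
// changeEvent models only the parts of the Debezium envelope an outbox
// consumer cares about. Requires "encoding/json" in the import block.
type changeEvent struct {
	Payload struct {
		Op    string `json:"op"` // c = create, u = update, d = delete
		After *struct {
			ID            string `json:"id"`
			AggregateType string `json:"aggregate_type"`
			AggregateID   string `json:"aggregate_id"`
			Type          string `json:"type"`
			Payload       string `json:"payload"` // jsonb column, typically delivered as a JSON string
		} `json:"after"`
	} `json:"payload"`
}

// decodeOutboxMessage extracts the outbox row from one Kafka message value.
// In the read loop above, call decodeOutboxMessage(msg.Value) instead of
// printing the raw value.
func decodeOutboxMessage(value []byte) error {
	var evt changeEvent
	if err := json.Unmarshal(value, &evt); err != nil {
		return err
	}
	if evt.Payload.After == nil {
		return nil // e.g. a delete event: no new row state to process
	}
	fmt.Printf("event %s: %s on %s/%s\n",
		evt.Payload.After.ID, evt.Payload.After.Type,
		evt.Payload.After.AggregateType, evt.Payload.After.AggregateID)
	return nil
}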
Debezium supports routing events to the correct Kafka topic out of the box: Topic Routing
An overview of the architecture can be understood from the following image (source: debezium.io):
I hope this blog has given you a basic understanding of the outbox pattern. Try running the above example locally, and experiment with different variations to get a feel for both the pattern and the various Debezium options.