kshitij kumar
btree.dev

Notes: Outbox Pattern using change data capture (CDC) with Debezium


In this blog, we implement a basic outbox pattern using change data capture (CDC) with Debezium.

Jun 18, 2022


The outbox pattern gives us a reliable way to build asynchronous, event-driven systems. In this pattern, the service writes the metadata of every event it produces to an outbox table. A reader such as Debezium then captures the changes to the outbox table and pushes them into a queue. The queue can have multiple topics, split by entity or by whatever other scheme suits the requirements. From the queue, application consumers can read the events, or the events can be pushed further into other infrastructure such as Elasticsearch, Redis, etc.

The advantage of this approach is that the service doesn't have to trigger each downstream task manually when an event happens; those tasks run asynchronously. For example, in an instant grocery delivery business, suppose order-service has an order table with a payment_status column. When payment_status becomes payment_in_progress, our warehouse partners should start packing the supplies. Once the status becomes paid, the delivery executives and customer support should be notified.
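To make the decoupling concrete, here is a small Go sketch of the consumer side of this scenario. Each downstream service declares which event types it reacts to, and order-service knows nothing about them. The service names and the event types PaymentInProgress and Paid are hypothetical labels for the statuses above, not names from a real system.

```go
package main

import "fmt"

// Hypothetical event types for the two payment statuses in the example.
const (
	PaymentInProgress = "PaymentInProgress"
	Paid              = "Paid"
)

// consumer is a hypothetical downstream service reading the outbox topic.
// It opts in to the event types it handles; order-service never calls it.
type consumer struct {
	name    string
	handles map[string]bool
}

var consumers = []consumer{
	{name: "warehouse-service", handles: map[string]bool{PaymentInProgress: true}},
	{name: "delivery-service", handles: map[string]bool{Paid: true}},
	{name: "customer-support-service", handles: map[string]bool{Paid: true}},
}

// reactingTo lists the consumers that act on an event of the given type.
// Every consumer sees every event on the topic and skips the ones it does
// not handle; unknown event types return nil.
func reactingTo(eventType string) []string {
	var names []string
	for _, c := range consumers {
		if c.handles[eventType] {
			names = append(names, c.name)
		}
	}
	return names
}

func main() {
	for _, ev := range []string{PaymentInProgress, Paid} {
		fmt.Println(ev, "->", reactingTo(ev))
	}
}
```

Adding a new reaction (say, a loyalty-points service on Paid) means adding a consumer, with no change to order-service.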

In this scenario, triggering these external services from order-service itself would couple their logic to it, which should be avoided. Instead, we update payment_status and insert the corresponding event into the outbox table atomically, in a single transaction. The respective consumer services then pick up these changes from the queue topic, process the data, and execute the required operations.
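A minimal Go sketch of that single transaction, assuming database/sql with a PostgreSQL driver registered elsewhere (no driver is imported here, so the code compiles but needs one to run against the database). It follows the migration below, where payment_status is a boolean rather than a string status, and the event name OrderPaymentStatusUpdated is a hypothetical choice.

```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"strconv"
)

// outboxEvent mirrors the columns of public.outbox; id is left to the
// column default gen_random_uuid().
type outboxEvent struct {
	AggregateType string
	AggregateID   string
	Type          string
	Payload       string // JSON text, stored in the jsonb column
}

// newPaymentStatusEvent builds the outbox row for a payment_status change.
func newPaymentStatusEvent(orderID int, paid bool) (outboxEvent, error) {
	payload, err := json.Marshal(map[string]any{"order_id": orderID, "payment_status": paid})
	if err != nil {
		return outboxEvent{}, err
	}
	return outboxEvent{
		AggregateType: "Order",
		AggregateID:   strconv.Itoa(orderID),
		Type:          "OrderPaymentStatusUpdated", // hypothetical event name
		Payload:       string(payload),
	}, nil
}

// setPaymentStatus updates the order and writes the outbox row in one
// transaction: either both changes are committed or neither is.
func setPaymentStatus(ctx context.Context, db *sql.DB, orderID int, paid bool) (err error) {
	ev, err := newPaymentStatusEvent(orderID, paid)
	if err != nil {
		return err
	}
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Roll back if any later step, including Commit, returned an error.
	defer func() {
		if err != nil {
			_ = tx.Rollback()
		}
	}()
	if _, err = tx.ExecContext(ctx,
		`UPDATE public."order" SET payment_status = $1 WHERE id = $2`, paid, orderID); err != nil {
		return err
	}
	if _, err = tx.ExecContext(ctx,
		`INSERT INTO public.outbox (aggregate_type, aggregate_id, type, payload) VALUES ($1, $2, $3, $4)`,
		ev.AggregateType, ev.AggregateID, ev.Type, ev.Payload); err != nil {
		return err
	}
	return tx.Commit()
}

func main() {
	ev, err := newPaymentStatusEvent(42, true)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", ev)
	// With a real *sql.DB (opened via a Postgres driver of your choice) you
	// would call setPaymentStatus(ctx, db, 42, true) instead.
}
```

Because the outbox row commits or rolls back together with the order update, Debezium can never publish an event for a change that didn't happen, or miss one that did.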

We should delete rows from the outbox table once Debezium has picked them up, so the table doesn't grow unnecessarily. Because an event can be delivered more than once (for example, after a connector or consumer restart), consumers should be idempotent. We can achieve this by keeping an inbox table in each consumer service, which stores the unique ids of the outbox events that have already been processed.

Let's go through one simple working implementation.

In this docker-compose file, we deploy a PostgreSQL database, pgAdmin to manage the database through a UI, ZooKeeper + Kafka as the queue, and Debezium as the connector. Debezium is connected to our Kafka broker.

docker-compose.yaml

version: "3.8"
services:
  postgres:
    container_name: postgres
    image: debezium/postgres:14
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=order-service

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
      PGADMIN_CONFIG_SERVER_MODE: 'False'
    ports:
      - "${PGADMIN_PORT:-5050}:80"
    restart: unless-stopped

  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:7.0.1
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
      - 29092:29092

  debezium:
    container_name: debezium
    image: debezium/connect:1.9
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
    depends_on: [kafka]
    ports:
      - 8083:8083

Once the containers are up, go to http://localhost:5050 to open pgAdmin and set up a connection to the database (host postgres, port 5432, user and password docker, as defined in the compose file). Then run the SQL below in the order-service database to create the tables.

migration.sql

-- Table: public.order
CREATE TABLE IF NOT EXISTS public."order"
(
    id serial NOT NULL, -- serial creates and owns the order_id_seq sequence
    client integer NOT NULL,
    payment_status boolean NOT NULL DEFAULT false,
    amount numeric NOT NULL DEFAULT 0.00,
    is_promotion boolean NOT NULL DEFAULT false,
    "timestamp" timestamp without time zone NOT NULL DEFAULT now(),
    CONSTRAINT order_pkey PRIMARY KEY (id)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS public."order"
    OWNER to docker;

-- Table: public.outbox
CREATE TABLE IF NOT EXISTS public.outbox
(
    id uuid NOT NULL DEFAULT gen_random_uuid(), -- unique id for idempotency in inbox table
    aggregate_type character varying(255) COLLATE pg_catalog."default" NOT NULL, -- entity e.g. Order, Customer, Payment, etc.
    aggregate_id character varying(255) COLLATE pg_catalog."default" NOT NULL, -- primary key or any unique id like order_id
    type character varying(255) COLLATE pg_catalog."default" NOT NULL, -- type of operation, CRUD - OrderCreated, OrderUpdated, etc.
    payload jsonb NOT NULL, -- JSON data of actual event
    CONSTRAINT outbox_pkey PRIMARY KEY (id)
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS public.outbox
    OWNER to docker;

Let's go through the outbox table's fields one by one:

id: a unique id for each event. It makes operations idempotent: consumers store it in their inbox table to prevent duplicate processing.

aggregate_type: the entity, domain, or context of the event, e.g. Order, Customer, Payment.

aggregate_id: the unique id of the entity the event relates to, e.g. order_id, customer_id.

type: the exact type of the event, e.g. OrderCreated, OrderUpdated.

payload: the event data itself, e.g. customer_id, amount, delivery_address.

Next, we create a Debezium config file like the one below. It defines a PostgreSQL connector with the database connection details, and table.include.list restricts change capture to the outbox table.

debezium.json

{
    "name": "outbox-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "plugin.name": "pgoutput",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "docker",
      "database.password": "docker",
      "database.dbname": "order-service",
      "database.server.name": "postgres",
      "table.include.list": "public.outbox"
    }
}

To register this config with Debezium, run the first command below. It adds our Postgres connector to Debezium, which then starts capturing changes from the outbox table.

Next, replace <network_name> in the second command with the network Docker Compose created for this setup (typically <project-directory>_default; docker network ls lists it) and run it. This starts kafkacat as a consumer of the topic. Now change some data in the outbox table from pgAdmin and watch the output in the console. By default, the Kafka topic name is <database.server.name>.<schema>.<table>, which here is postgres.public.outbox.

commands

# to register json
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@debezium.json"

# kafka topic consumer
docker run --tty --network <network_name> confluentinc/cp-kafkacat kafkacat -b kafka:9092 -G reader postgres.public.outbox -J

Alternatively, you can run this Go program to read the changes from the outbox topic.

main.go

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "github.com/segmentio/kafka-go"
)

const (
    topic         = "postgres.public.outbox"
    brokerAddress = "localhost:29092"
)

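func main() {
    l := log.New(os.Stdout, "kafka reader: ", 0)

    // Consumer-group reader: committed offsets let a restarted reader
    // resume where the group left off.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{brokerAddress},
        Topic:   topic,
        GroupID: "order-service-outbox-consumer-group",
        Logger:  l,
    })
    defer r.Close()

    for {
        // ReadMessage blocks until a message arrives and, because GroupID
        // is set, commits its offset automatically.
        msg, err := r.ReadMessage(context.Background())
        if err != nil {
            panic("could not read message " + err.Error())
        }

        // msg.Value is Debezium's JSON change event for one outbox row
        // (empty for the tombstone that follows a delete).
        fmt.Println("received: ", string(msg.Value))
    }
}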
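The reader above only prints each raw message. A consumer would normally decode it first. The Go sketch below assumes the JSON converter with schemas enabled, so each value is an object whose "payload" field holds Debezium's change envelope (op, before, after, ...), and the uuid id and jsonb payload columns arrive as JSON strings. It keeps only inserts (op "c") and skips deletes, snapshot reads, and the empty tombstones. The names outboxRow and decodeOutboxInsert are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// outboxRow matches public.outbox as Debezium serializes it: uuid and jsonb
// columns both arrive as JSON strings.
type outboxRow struct {
	ID            string `json:"id"`
	AggregateType string `json:"aggregate_type"`
	AggregateID   string `json:"aggregate_id"`
	Type          string `json:"type"`
	Payload       string `json:"payload"`
}

// changeEvent keeps only the parts of the envelope this sketch needs.
type changeEvent struct {
	Payload struct {
		After *outboxRow `json:"after"`
		Op    string     `json:"op"`
	} `json:"payload"`
}

// decodeOutboxInsert returns the new outbox row for a create event ("c").
// ok is false for tombstones (empty value), deletes and other operations.
func decodeOutboxInsert(value []byte) (outboxRow, bool, error) {
	if len(value) == 0 {
		return outboxRow{}, false, nil // tombstone emitted after a delete
	}
	var ev changeEvent
	if err := json.Unmarshal(value, &ev); err != nil {
		return outboxRow{}, false, err
	}
	if ev.Payload.Op != "c" || ev.Payload.After == nil {
		return outboxRow{}, false, nil
	}
	return *ev.Payload.After, true, nil
}

func main() {
	// Trimmed, hand-written sample; real values also carry "schema",
	// "before", "source" and "ts_ms".
	sample := []byte(`{"payload":{"op":"c","after":{"id":"3f2b9c1e-8d4a-4e7b-9a61-2c5d7e8f9a10","aggregate_type":"Order","aggregate_id":"42","type":"OrderPaymentStatusUpdated","payload":"{\"order_id\": 42}"}}}`)
	row, ok, err := decodeOutboxInsert(sample)
	if err != nil {
		panic(err)
	}
	if ok {
		fmt.Printf("%s for %s %s: %s\n", row.Type, row.AggregateType, row.AggregateID, row.Payload)
	}
}
```

In the reader loop, msg.Value would be passed to decodeOutboxInsert, and row.ID is the id to check against the consumer's inbox.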

Debezium supports routing events to the correct Kafka topic out of the box: Topic Routing

The following image (source: debezium.io) gives an overview of the architecture:

outbox_pattern.png

I hope this blog has given you a basic understanding of the outbox pattern. Try running the example above locally, and experiment with different variations to understand both the pattern and the various Debezium options.
