Capture data change in PostgreSQL with Debezium and NATS JetStream

Change Data Capture (CDC) is a design pattern that aims to solve the problem of tracking data changes in a source/master system and distributing those changes to other systems. Debezium is an open-source CDC platform that streams database changes to a messaging channel, most commonly Kafka.

In this post, I will walk you through how to set up Debezium on a local PostgreSQL instance and stream the changes occurring in the database using NATS streams instead of Kafka. Furthermore, to go beyond a simple “hello world” example, I will also show you how to configure Debezium to monitor changes only in a filtered set of tables located in two different schemas.

To capture changes in the PostgreSQL instance, Debezium relies on the Postgres logical decoding output plug-in.

At the moment, there are two available plugins:

  • pgoutput: the standard PostgreSQL decoding output plug-in, available on each PostgreSQL instance since version 10.
  • decoderbufs: a plugin maintained by the Debezium community and based on Protobuf protocol. This plugin requires manual installation on the target Postgres instance.

More on the differences and limitations of each of these plugins can be found in the official Debezium documentation.

To set up Debezium and PostgreSQL, this time I will be using the built-in pgoutput decoding plugin.

For a hands-on experience, you need the following prerequisites:

  • Installed and configured local PostgreSQL instance (version 14+)
  • Access to credentials of a user with non-admin privileges
  • Access to credentials of a user with admin (superuser) privileges (for example, postgres user)
  • Installed docker-engine

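Configure and set up the Postgres database

Debezium relies on the internal Postgres replication mechanism to function. Logical decoding requires wal_level = logical in postgresql.conf (the default is replica), so set that first if it is not already configured. Then modify the pg_hba.conf file of your Postgres instance by adding the following line: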

host replication debezium 0.0.0.0/0 trust
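
To make the columns of that entry explicit, here is a small Python sketch that splits it into connection type, database, user, address, and authentication method. The `HbaEntry` type and the `parse_hba_line` helper are illustrative names, not part of PostgreSQL, and the parser only handles the simple five-field form used in this post.

```python
from typing import NamedTuple


class HbaEntry(NamedTuple):
    conn_type: str
    database: str
    user: str
    address: str
    method: str


def parse_hba_line(line: str) -> HbaEntry:
    """Split a simple five-field pg_hba.conf record into named fields."""
    fields = line.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {line!r}")
    return HbaEntry(*fields)


entry = parse_hba_line("host replication debezium 0.0.0.0/0 trust")
print(entry)
```

In this entry, replication is a keyword rather than a database name, 0.0.0.0/0 accepts any IPv4 client address, and trust means the connection is accepted without a password.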

Now, for the changes to take effect, restart the postgres instance. Depending on your OS, this command can vary. For Ubuntu based systems execute the following:

sudo /etc/init.d/postgresql restart

For this exercise, we will create a new database named “test” with two schemas and several tables prefilled with test data. Connect to the default Postgres database (postgres) as a user with administrative privileges and execute the following command:

CREATE DATABASE test;

Grant access to this database to an existing non-admin user (in this example, the user is named domagoj) by running this statement:

GRANT ALL PRIVILEGES ON DATABASE test TO domagoj;

This grants the CONNECT, CREATE, and TEMPORARY privileges on the test database to this user.

Now disconnect from the database and connect to the test database as a non-admin user (domagoj), and execute the following:

psql -d test -U domagoj

create table organization (id serial primary key, name text, code text);

insert into organization (name, code) values ('Notch Software solutions', 'NSS');
insert into organization (name, code) values ('Notch Consulting Group', 'NCG');

Listing all tables in the public schema should now give the following result:

\dt public.*

 Schema |     Name     | Type  |  Owner
--------+--------------+-------+---------
 public | organization | table | domagoj
(1 row)

Set up a replication role for Debezium

Since we use pgoutput as the logical decoding plugin, Debezium must operate on the database as a user with a specific set of privileges. Namely, it should have a REPLICATION role, with the CONNECT and CREATE privileges granted for the “test” database. Thus, the next step is to create a user that Debezium will use to access the database (we will name that user debezium), which conforms with the above requirements. 

To do so, connect to the test database as a user with administrative privileges and execute the following set of commands:

-- setup REPLICATION role ---
CREATE ROLE debezium REPLICATION LOGIN password 'debezium';

-- Grant CREATE, CONNECT privileges on database to replication role
GRANT ALL PRIVILEGES ON DATABASE test TO debezium;

-- GRANT Replication role access to existing tables in existing schemas
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO debezium;

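With this, we have covered access for the role “debezium” to all existing tables in the public schema of the “test” database. However, for the “debezium” user to access tables created in the future, we must also alter the default privileges. Note that without a FOR ROLE clause, ALTER DEFAULT PRIVILEGES only affects objects later created by the role that runs the statement, so run it as the role that will create the tables (or add FOR ROLE). We can do that with the following query:

ALTER DEFAULT PRIVILEGES GRANT ALL ON TABLES TO debezium;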

Furthermore, this user must also be the owner of the tables we want to monitor for data changes, because PostgreSQL only allows a table's owner to add it to a publication. Transferring ownership of the organization table to debezium would leave user domagoj without access to it, and we would then have to grant domagoj privileges on this table explicitly. To avoid this, we will set up shared ownership of the organization table between its original owner (domagoj) and the debezium user.

The mechanism to achieve this is to create a PostgreSQL group role (which we will call a replication group) and add both users to it.

The following queries do just that:

  • Create a new replication group.
  • Add debezium and domagoj users to this group.
  • Transfer ownership of the table to it.

-- Create the replication group role and add users to it
CREATE ROLE REPLICATION_GROUP_TEST;
GRANT REPLICATION_GROUP_TEST TO domagoj;
GRANT REPLICATION_GROUP_TEST TO debezium;
-- Transfer table ownership
ALTER TABLE public.organization OWNER TO REPLICATION_GROUP_TEST;

Finally, we must also modify user domagoj to inherit the privileges granted by the replication group. Otherwise, he will not be able to access the organization table. The following query alters user domagoj and fixes this problem:

ALTER ROLE domagoj INHERIT;

Note this step is not necessary if we created the target user with the INHERIT parameter.
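
The shared-ownership steps repeat for every table Debezium should capture, so they can be generated rather than typed. The following Python sketch builds the same statements for a list of members and tables. `shared_ownership_sql` is a hypothetical helper; it does no identifier quoting, so it assumes plain lowercase names like the ones used in this post.

```python
def shared_ownership_sql(group: str, members: list[str], tables: list[str]) -> list[str]:
    """Build SQL that creates a group role, adds members, and hands it the tables."""
    statements = [f"CREATE ROLE {group};"]
    # Every member of the group acts as a co-owner of the tables below.
    statements += [f"GRANT {group} TO {member};" for member in members]
    statements += [f"ALTER TABLE {table} OWNER TO {group};" for table in tables]
    return statements


statements = shared_ownership_sql(
    "replication_group_test", ["domagoj", "debezium"], ["public.organization"]
)
for statement in statements:
    print(statement)
```

The printed statements can be pasted into psql by a user with administrative privileges.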

Starting up Debezium and NATS services

We will start both the Debezium and NATS JetStream services as Docker containers. To do so, inside the folder of your choice, create a docker-compose.yml file with the following content:

version: '3.9'
services:
  nats:
    image: docker.io/nats:latest
    ports:
      - "4222:4222"
      - "8222:8222"
    command:
      - "--debug"
      - "--http_port=8222"
      - "--js"

  debezium:
    image: docker.io/debezium/server:2.3.0.Final
    volumes:
      - ./application.properties:/debezium/conf/application.properties
    depends_on:
      - nats
    extra_hosts:
      - "host.docker.internal:host-gateway"

In the same folder, create a file named application.properties, which the Debezium server will use as its configuration file.

debezium.source.slot.name=test_debezium
# Only for dev, change to false in prod env !
debezium.source.slot.drop.on.stop=true
debezium.source.publication.name=dbz_test_publication
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0

debezium.source.plugin.name=pgoutput

# Connection details
debezium.source.database.hostname=host.docker.internal
debezium.source.database.port=5432
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.dbname=test

# NATS configuration
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.create-stream=true
debezium.sink.nats-jetstream.subjects=postgres.*.*
debezium.source.topic.prefix=postgres

debezium.source.publication.autocreate.mode=filtered
debezium.source.table.include.list=public.organization

This configuration requires some explanation. The first thing to note is the following line:

debezium.source.plugin.name=pgoutput

Debezium relies on the PostgreSQL logical decoding output plug-in to capture changes in the database. As mentioned above, two plugins are available: pgoutput and decoderbufs.

In this exercise, we have opted to use PostgreSQL’s built-in functionality since it requires no additional installation steps.

The following two configuration lines instruct Debezium to monitor the data changes only for the table “organization” in the “public” schema.

debezium.source.publication.autocreate.mode=filtered
debezium.source.table.include.list=public.organization
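
Debezium treats each comma-separated entry of table.include.list as a regular expression that must match the whole schema.table identifier. The Python sketch below approximates that behavior with `re.fullmatch`. `is_captured` is an illustrative helper, and Python's regex engine stands in for the Java one Debezium uses, which is an assumption that should hold for simple patterns like these.

```python
import re


def is_captured(table_id: str, include_list: str) -> bool:
    """Return True if table_id fully matches any comma-separated pattern."""
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, table_id) for p in patterns)


include_list = "public.organization"
for table in ("public.organization", "public.organization_archive", "geodata.currency"):
    print(table, is_captured(table, include_list))
```

Because the match covers the whole identifier, public.organization_archive is not captured. Note that the unescaped dot matches any character, so a stricter pattern would escape it as public\.organization.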

Now we can start up Debezium and NATS JetStream by executing (inside the folder with docker-compose.yml file):

docker-compose up

To check that all is working well, open another terminal and run the following command:

nats stream subjects DebeziumStream

If all went well, Debezium should have published two messages (since there are two rows in the organization table) to the stream named DebeziumStream, under the subject “postgres.public.organization”.

As can be seen, the last two parts of the subject name match schema and table names, while the first part of the name, “postgres”, we have configured in the application.properties file with the following two lines:

debezium.sink.nats-jetstream.subjects=postgres.*.*
debezium.source.topic.prefix=postgres
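
To see how those two settings fit together, the sketch below builds a subject in the prefix.schema.table form and checks it against a subject pattern using NATS wildcard rules, where `*` matches exactly one token and `>` matches one or more trailing tokens. `subject_for` and `matches` are illustrative helpers written for this post, not NATS client functions.

```python
def subject_for(prefix: str, schema: str, table: str) -> str:
    """Build the subject name in the prefix.schema.table form."""
    return f"{prefix}.{schema}.{table}"


def matches(pattern: str, subject: str) -> bool:
    """Match a dot-separated subject against a pattern with '*' and '>' wildcards."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' is only valid as the last token and needs at least one token left.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[i]:
            return False
    return len(pattern_tokens) == len(subject_tokens)


subject = subject_for("postgres", "public", "organization")
print(subject, matches("postgres.*.*", subject))
```

The pattern postgres.*.* therefore covers every schema and table under the postgres prefix, but not subjects with more or fewer than three tokens.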

If we now add another record to the organization table:

insert into organization (name, code) values ('Notch LLC', 'NLC');

and list the subjects in the DebeziumStream again, we should see that “postgres.public.organization” now contains three messages.
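
Each of those messages is a Debezium change event. Assuming the default JSON format, where the event sits in a `payload` object with `before`, `after`, and `op` fields, a message could be summarized roughly as in the Python sketch below. Rows read during the initial snapshot carry the op code `r`, while later inserts carry `c`. The sample event is hand-written and heavily trimmed for illustration; it is not captured output, and `summarize` is a hypothetical helper.

```python
import json

# Operation codes used in the "op" field of a change event.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(raw: str) -> str:
    """Describe a change event as '<operation>: <row state>'."""
    event = json.loads(raw)
    # Events may or may not be wrapped in a schema/payload envelope.
    payload = event.get("payload", event)
    operation = OPERATIONS.get(payload["op"], payload["op"])
    # Deletes have no "after" state, so fall back to "before".
    row = payload["after"] if payload.get("after") is not None else payload.get("before")
    return f"{operation}: {row}"


sample = json.dumps(
    {"payload": {"before": None, "after": {"id": 3, "name": "Notch LLC", "code": "NLC"}, "op": "c"}}
)
print(summarize(sample))
```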

Configure Debezium to monitor for changes in an additional table

As noted in the introduction of this blog post, in the last part of this exercise, we will go through the steps necessary to configure Debezium to monitor for the changes in an additional table (named country) located in the separate schema (geodata).

First, shut down Debezium and NATS JetStream:

docker-compose down

Then, connect again to the test database with the administrative user and execute the following:

-- Set up the geodata user
CREATE ROLE geodata NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT LOGIN PASSWORD 'geodatapwd';
GRANT ALL PRIVILEGES ON DATABASE test TO geodata;
CREATE SCHEMA IF NOT EXISTS AUTHORIZATION "geodata";

Now, reconnect to the test database as the geodata user we just created and execute the following SQL statements:

CREATE TABLE geodata.country (
	id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY( INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START 1 CACHE 1 NO CYCLE),
	"name" varchar(255) NOT NULL,
	code varchar(20) NOT NULL,
	code_a_2 varchar(2) NOT NULL,
	code_a_3 varchar(3) NOT NULL,
	flag varchar(255) NULL,
	active bool NOT NULL DEFAULT true,
	CONSTRAINT country_pkey PRIMARY KEY (id)
);

CREATE TABLE geodata.currency (
	id int8 NOT NULL GENERATED BY DEFAULT AS IDENTITY( INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START 1 CACHE 1 NO CYCLE),
	"name" varchar(255) NOT NULL,
	code varchar(20) NOT NULL,
	num_code varchar(3) NOT NULL,
	preferred bool NOT NULL,
	CONSTRAINT currency_pkey PRIMARY KEY (id)
);

This creates two new tables (country and currency) in the geodata schema, both owned by the geodata user. For Debezium to monitor changes in the country table, we need to transfer its ownership to the previously created group role, REPLICATION_GROUP_TEST. Connect as the administrative user again (the geodata user cannot grant membership in that role) and run:

GRANT USAGE ON SCHEMA geodata TO debezium;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA geodata TO debezium;
GRANT REPLICATION_GROUP_TEST TO geodata;
ALTER TABLE geodata.country OWNER TO REPLICATION_GROUP_TEST;

In the end, you should have a database with the two schemas (public and geodata), each with several tables and with the ownership as displayed below:

test=# \dt public.*
                   List of relations
 Schema |     Name     | Type  |         Owner          
--------+--------------+-------+------------------------
 public | organization | table | replication_group_test
(1 row)

test=# \dt geodata.*
                  List of relations
  Schema  |   Name   | Type  |         Owner          
----------+----------+-------+------------------------
 geodata  | country  | table | replication_group_test
 geodata  | currency | table | geodata
(2 rows)

We need to populate the tables in the geodata schema to have some initial data. We can do that by running the following two SQL scripts:

Finally, add geodata.country table to the list of tables monitored for data changes by the Debezium service by modifying the last line in the application.properties file. 

debezium.source.table.include.list=public.organization,geodata.country

All we need to do now is again start Debezium and NATS JetStream services:

docker-compose up

And check the NATS stream for new messages. If all went well, you should see a similar output, signaling 248 new messages with postgres.geodata.country subject.

nats stream subjects DebeziumStream
  postgres.public.organization: 3 	postgres.geodata.country: 248
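
If you want to check these counts in a script, the listing can be turned into a dictionary. The Python sketch below parses text shaped like the output above. `parse_subject_counts` is a hypothetical helper, and it assumes the "subject: count" layout shown in this post, which may differ between versions of the nats CLI.

```python
import re


def parse_subject_counts(output: str) -> dict[str, int]:
    """Extract 'subject: count' pairs from text shaped like the listing above."""
    return {subject: int(count) for subject, count in re.findall(r"(\S+):\s*(\d+)", output)}


listing = "  postgres.public.organization: 3 \tpostgres.geodata.country: 248"
counts = parse_subject_counts(listing)
print(counts)
```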

Conclusion

This blog post has just scratched the surface of CDC with Debezium. We have showcased the basic setup of PostgreSQL, Debezium and NATS. Due to the length of this post, the topic of consuming the data change event messages has been left out, to be covered in a future post.

Debezium is a powerful platform with dozens of configuration options. For a more in-depth look at its options and capabilities, be sure to consult the official documentation, which is extensive.
