How to set up a CDC pipeline to capture and analyze real-time database changes with Parseable

Databases are critical for any application. Data is constantly inserted, updated, and deleted. In most cases, the business needs to keep track of these changes for security, for auditing, and to keep other relevant systems up to date.

Change Data Capture (CDC) has gained popularity precisely because it addresses this problem. CDC is a technique for tracking all changes in a database and delivering them to destination systems. Debezium is a popular CDC tool that uses database logs as the source of truth and streams the changes to Kafka and Kafka-compatible systems such as Redpanda.

The key thing to note here is that while databases are typically mutable, the changes captured by the CDC system should be immutable. Only then do users have a system of record they can trust. Parseable is built for immutable logs and streams of events and data, which makes it a good fit as a CDC target. It is designed for long-term retention and querying of logs, and it provides a rich set of tools for analysis and monitoring.

In this tutorial, we use Redpanda, PostgreSQL, and Debezium to ingest CDC events into Parseable.

[Image: CDC pipeline]

If you prefer videos, we've also created a video tutorial to help you get started with CDC pipeline setup.

Prerequisites

  • Docker should be installed on your system.

  • You need at least 4GB memory and 30GB disk space available.

Set up Docker Compose

For ease of reproducibility, we'll use Docker Compose to set up the components of our CDC pipeline. Here are the tools we'll use in this blog.

  • PostgreSQL: The database which will be the source of our data changes.

  • Debezium: A CDC tool that captures changes from our database.

  • Redpanda & Redpanda Connect: Redpanda is a Kafka-compatible event streaming platform that will act as our message broker. Redpanda Connect provides 20+ connectors for moving data in and out of Redpanda.

  • Parseable: Our final destination for log ingestion and analysis.

For the first step, let's set up the tools and ensure they are able to communicate with each other. We'll then dive into the configuration of each component.

Run the following commands to set up the Docker Compose environment:

export DEBEZIUM_VERSION=2.7
wget https://raw.githubusercontent.com/parseablehq/blog-samples/main/redpanda-debezium-cdc-pipeline/docker-compose.yaml
docker compose -f docker-compose.yaml up -d
docker ps

This sets up PostgreSQL, Debezium, Redpanda, and Parseable in your local environment. If everything is set up correctly, you should see the relevant containers running in the docker ps output.
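To check this without reading the table by eye, a small helper like the one below can compare the running container names against the ones you expect. It is a minimal sketch: missing_containers is a hypothetical helper, and the name fragments in the usage comment (postgres, redpanda, parseable) are assumptions about how the compose file names its containers.

```shell
# Read container names from stdin and print every expected name fragment
# (passed as arguments) that does not appear among them.
missing_containers() {
  running="$(cat)"
  for name in "$@"; do
    if ! printf '%s\n' "$running" | grep -q -- "$name"; then
      echo "$name"
    fi
  done
}

# Usage (requires Docker; the fragments are assumptions, adjust as needed):
#   docker ps --format '{{.Names}}' | missing_containers postgres redpanda parseable
```

If the helper prints nothing, every expected fragment matched at least one running container.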

Debezium Connect

We'll now configure Debezium to capture changes from our database. Here we're using the Debezium connector for PostgreSQL.

Create a file named register-postgres.json with the following content:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.history.kafka.bootstrap.servers": "redpanda-0:9092",
        "database.history.kafka.topic": "dbserver1.inventory.customers",
        "topic.prefix": "dbserver1",
        "schema.include.list": "inventory"
    }
}

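This file defines how Debezium connects to the PostgreSQL database and streams changes. With topic.prefix set to dbserver1 and schema.include.list set to inventory, changes to tables in the inventory schema are published to topics named dbserver1.inventory.<table>, for example dbserver1.inventory.customers. Note that the database.history.kafka.* properties configure a schema history topic for connectors that keep one, such as the MySQL connector. The Debezium PostgreSQL connector does not maintain a schema history topic, so these two properties are not required here.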
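Debezium's PostgreSQL connector names each change topic as <topic.prefix>.<schema>.<table>. The sketch below derives the topic for a table from the values in the config above. cdc_topic is a hypothetical helper, and the rpk command in the comment assumes it is run inside the Redpanda container after the connector has been registered.

```shell
# Build the change topic name from topic.prefix, schema name, and table name.
cdc_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

topic="$(cdc_topic dbserver1 inventory customers)"
echo "$topic"

# With the connector running, the topic could be inspected from inside
# the Redpanda container, for example:
#   rpk topic consume "$topic" --num 1
```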

To set up the connector, use the following command:

curl -X POST -H "Content-Type: application/json" --data @register-postgres.json http://localhost:8083/connectors

This command registers the connector, enabling it to start capturing changes from PostgreSQL and forwarding them to Redpanda.
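To confirm that the registration worked, Kafka Connect's REST API exposes a status endpoint for each connector. The sketch below pulls the state values out of that response with standard text tools. connector_states is a hypothetical helper, and the URL in the usage comment assumes the same host and port as the registration command above.

```shell
# Print every "state" value found in a Kafka Connect status document read
# from stdin: one line for the connector itself and one for each task.
connector_states() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | cut -d '"' -f 4
}

# Usage (needs the Debezium container to be up):
#   curl -s http://localhost:8083/connectors/inventory-connector/status | connector_states
```

A healthy connector should report RUNNING for the connector and for its task. A FAILED state usually means the status response also carries a trace worth reading in full.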

Parseable

Parseable is a powerful yet flexible tool for log ingestion and analysis. It lets us capture, store, and analyze the data changes produced by our CDC pipeline. The Docker Compose setup starts the Parseable container alongside the other services. You can access the Parseable dashboard at http://localhost:8000.

In the Parseable dashboard, you will find an option to create a stream in the left-hand corner. Create a stream called cdcpipeline.

[Image: create stream]

Redpanda Connect

We're using Redpanda Connect to stream CDC events from Redpanda to Parseable. A Redpanda Connect stream pipeline is configured with a single config file. Before we get to that, we need to log in to our Redpanda instance with the following command:

docker exec -u root -t -i redpanda-0 /bin/bash

Once logged in, download the config file with the command:

curl https://raw.githubusercontent.com/parseablehq/blog-samples/main/redpanda-debezium-cdc-pipeline/connect.yaml > connect.yaml

connect.yaml describes the input and output for Redpanda Connect. The input is the Redpanda topic to which Debezium publishes each database change as a message. The output is an HTTP client that sends the data to Parseable.

Now, run the config setup in the Redpanda instance:

rpk connect run connect.yaml

Once you run the above command, your terminal should show that the http_client is active and running!

root@eab0e4c20d78:/# rpk connect run connect.yaml
INFO Running main config from specified file       @service=benthos benthos_version=24.1.9 path=connect.yaml
INFO Launching a Redpanda Connect instance, use CTRL+C to close  @service=benthos
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Output type http_client is now active         @service=benthos label=parseableout path=root.output
INFO Input type stdin is now active                @service=benthos label="" path=root.input

Make sure this process keeps running in the background before you exit the Redpanda instance.
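One way to keep the pipeline alive after you leave the shell is to start it with nohup and send its output to a log file. This is a sketch rather than the only option: run_detached is a hypothetical helper, connect.log is an assumed file name, and it presumes nohup is available in the container image.

```shell
# Start a command that survives the shell exiting, append its output to a
# log file, and print the PID of the background process.
run_detached() {
  log_file="$1"
  shift
  nohup "$@" >>"$log_file" 2>&1 &
  echo "$!"
}

# Usage inside the redpanda-0 container:
#   run_detached connect.log rpk connect run connect.yaml
#   tail -f connect.log
```

Keeping the PID makes it straightforward to stop the pipeline later with kill.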

Test the CDC pipeline

Now that we've set up our new CDC pipeline, it's time for the test. Let's see the pipeline in action!

  1. Start up your psql client and make some changes.

First, log in to the database with psql.

docker exec -it cdc-quickstart-postgres-1 env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'

Once logged in, confirm which database you are connected to using the command \c

Now, let's add a new row:

INSERT INTO inventory.customers (id, first_name, last_name, email) VALUES (10111, 'Shantanu', 'Sharma', 'shantanu@acme.com');

Now, head over to the Parseable dashboard and check for the received data. If you see these events show up in your dashboard, congratulations! You've successfully set up a real-time CDC pipeline.✨
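Each change Debezium emits carries an op field that says what kind of change it was: c for create, u for update, d for delete, r for a snapshot read, and t for truncate. The helper below is a hypothetical convenience for turning those codes into labels when reading events. Whether the op field appears under that exact name in Parseable depends on how connect.yaml maps the message, which is an assumption here.

```shell
# Map a Debezium "op" code to a readable label.
op_label() {
  case "$1" in
    c) echo "create" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    r) echo "read (snapshot)" ;;
    t) echo "truncate" ;;
    *) echo "unknown" ;;
  esac
}

# Pull the op code out of a single change event read from stdin.
event_op() {
  grep -o '"op"[[:space:]]*:[[:space:]]*"[a-z]"' | cut -d '"' -f 4
}

# Usage: label the op of an event stored in a file (event.json is a hypothetical file):
#   op_label "$(event_op < event.json)"
```

The INSERT above should surface as a create event. An UPDATE or DELETE on the same row would show up as update and delete respectively.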

Additional Tips

Use this to keep an eye on your Redpanda Connect logs:

docker logs redpanda-0 -f

This command gives you a live feed of what's happening in your pipeline. It's particularly useful for catching hiccups in real time.
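Another way to confirm that events landed is to query the stream over Parseable's HTTP query API instead of the dashboard. The sketch below only builds the request body. query_body is a hypothetical helper, and the endpoint path, the admin:admin credentials, and the timestamps in the usage comment are assumptions based on a default Parseable install, so adjust them to your setup.

```shell
# Build a JSON body for a Parseable SQL query over a stream and time range.
# Arguments: stream name, start time, end time (RFC 3339 timestamps).
query_body() {
  printf '{"query":"select * from %s limit 10","startTime":"%s","endTime":"%s"}\n' "$1" "$2" "$3"
}

# Usage (assumed endpoint, credentials, and illustrative timestamps):
#   curl -s -u admin:admin -H 'Content-Type: application/json' \
#     -d "$(query_body cdcpipeline 2024-01-01T00:00:00+00:00 2024-01-02T00:00:00+00:00)" \
#     http://localhost:8000/api/v1/query
```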

Use Cases

  • Real-Time Data Synchronization: This setup ensures that your applications can always access the latest data, which is crucial for applications that require up-to-date information.

  • Security Monitoring: Tracking changes to your database can help you identify and respond to security threats, ensuring that your data remains secure.

  • Data Migration: CDC can facilitate the migration of data from legacy systems to modern platforms, minimizing downtime and data loss.

  • Auditing and Compliance: By maintaining a log of all database changes, you can meet compliance requirements and perform thorough audits when necessary.

Conclusion

In this tutorial, you built a CDC pipeline that captures changes from PostgreSQL, streams them through Redpanda, and ingests them into Parseable. This setup provides real-time data synchronization, opening up a world of possibilities for data analysis and system integration.

To see Parseable in action, watch this YouTube video. Get started with Parseable in just a single command.