
Change Data Capture (CDC) for PostgreSQL from Scratch

Ever wanted to implement your own Change Data Capture (CDC) tool for PostgreSQL in Python? Here's a quick getting-started guide.

Change Data Capture (CDC) is a technique used to identify and capture changes made to data in a database. For PostgreSQL, one of the most efficient CDC methods is log-based CDC, which relies on the database's Write-Ahead Log (WAL). This article will delve into the technical details of implementing CDC for PostgreSQL, covering the basics of log-based CDC, the replication protocol, and a simple Python application to tail the WAL that you can run yourself.

Log-based CDC for PostgreSQL

PostgreSQL uses a Write-Ahead Log (WAL) to ensure data integrity. The WAL is a sequence of records describing changes made to the database. These changes are written to the WAL before they are applied to the actual data files, hence the name "Write-Ahead."

source: https://martinfowler.com
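To make the write-ahead rule concrete, here is a toy in-memory key-value store. It is purely illustrative (PostgreSQL's real WAL is binary, page-oriented, and far more involved), and ToyWALStore is a hypothetical name: every change is appended to a log before it is applied, so replaying the log rebuilds the same state.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWALStore:
    """Toy key-value store illustrating the write-ahead rule (not PostgreSQL's design)."""
    log: list = field(default_factory=list)   # append-only change log
    data: dict = field(default_factory=dict)  # stands in for the "data files"

    def put(self, key, value):
        # 1. Append the change to the log first...
        lsn = len(self.log)
        self.log.append((lsn, "PUT", key, value))
        # 2. ...and only then apply it to the data.
        self.data[key] = value
        return lsn

    def delete(self, key):
        lsn = len(self.log)
        self.log.append((lsn, "DELETE", key, None))
        self.data.pop(key, None)
        return lsn

    @staticmethod
    def replay(log, upto=None):
        """Rebuild state from the log, optionally stopping after position `upto`."""
        data = {}
        for lsn, op, key, value in log:
            if upto is not None and lsn > upto:
                break
            if op == "PUT":
                data[key] = value
            else:
                data.pop(key, None)
        return data


store = ToyWALStore()
store.put("a", 1)
store.put("b", 2)
store.delete("a")
print(ToyWALStore.replay(store.log) == store.data)  # True
print(ToyWALStore.replay(store.log, upto=1))        # {'a': 1, 'b': 2}
```

A log-based CDC consumer is essentially another reader of that log: instead of replaying records into a dictionary, it forwards them to another system.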

Key concepts of log-based CDC in PostgreSQL:

  1. WAL Segments: The WAL is divided into segment files, 16 MB each by default.
  2. LSN (Log Sequence Number): A 64-bit byte position in the WAL stream that uniquely identifies where each WAL record starts.
  3. Replication Slots: Mechanisms to keep track of which WAL segments are still needed by consumers.
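Segment files are named after the timeline and the range of LSNs they hold. The sketch below assumes the default 16 MB segment size and timeline 1, and mirrors the 24-hex-digit naming scheme (timeline, then the high 32 bits of the LSN, then the segment number within that 4 GB range); wal_file_name is a hypothetical helper, not a PostgreSQL API.

```python
WAL_SEGMENT_SIZE = 16 * 1024 * 1024  # assumption: the default segment size


def parse_lsn(text):
    """Turn an LSN like '0/16B5D68' into a 64-bit integer byte position."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def wal_file_name(lsn_text, timeline=1, seg_size=WAL_SEGMENT_SIZE):
    """Hypothetical helper: name of the WAL segment file containing an LSN."""
    lsn = parse_lsn(lsn_text)
    segno = lsn // seg_size                    # global segment number
    segs_per_xlogid = 0x100000000 // seg_size  # segments per 4 GB range
    return "%08X%08X%08X" % (timeline, segno // segs_per_xlogid, segno % segs_per_xlogid)


print(wal_file_name("0/16B5D68"))  # 000000010000000000000001
print(wal_file_name("1/0"))        # 000000010000000100000000
```

Note that the timeline lives in the file name, not in the LSN itself.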

WAL Record Types

PostgreSQL WAL records can represent various database operations:

  • INSERT: Adding a new row
  • UPDATE: Modifying an existing row
  • DELETE: Removing a row
  • COMMIT: Finalizing a transaction
  • ABORT: Abandoning (rolling back) a transaction

Example WAL record (simplified):

LSN: 0/16B5D68
rmgr: Heap
len: 65
prev: 0/16B5D30
desc: INSERT: rel 1663/16384/1255 tuple: (1, "example")

Let's dissect the fields a bit:

LSN: 0/16B5D68

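  • LSN stands for Log Sequence Number.
  • It is a 64-bit byte position in the WAL stream, printed as two hexadecimal numbers separated by a slash.
  • In this case, "0" is the high 32 bits and "16B5D68" is the low 32 bits, so the record starts at byte 0x16B5D68 (23,813,480) of the WAL stream. The timeline ID is not part of the LSN.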
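Because an LSN is just a 64-bit integer, converting between its text form and a number takes a couple of bit operations. The helper names below are hypothetical; lsn_diff computes the same byte distance as PostgreSQL's pg_wal_lsn_diff(a, b) function.

```python
def parse_lsn(text):
    """'0/16B5D68' -> 23813480 (high 32 bits / low 32 bits, both hex)."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def format_lsn(value):
    """23813480 -> '0/16B5D68', the way PostgreSQL prints a pg_lsn."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def lsn_diff(a, b):
    """Byte distance a - b between two LSNs given in text form."""
    return parse_lsn(a) - parse_lsn(b)


print(parse_lsn("0/16B5D68"))              # 23813480
print(format_lsn(parse_lsn("0/16B5D68")))  # 0/16B5D68
print(lsn_diff("0/16B5D68", "0/16B5D30"))  # 56
```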

rmgr: Heap

  • This indicates the resource manager responsible for this record. "Heap" refers to the main data storage area in PostgreSQL where table data is kept.
  • Other possible values could include "Transaction", "CLOG" (Commit Log), etc.

len: 65

  • This is the length of the WAL record in bytes. In this case, the record is 65 bytes long.

prev: 0/16B5D30

  • This is the LSN of the previous WAL record. It allows for backward chaining through the WAL.

desc: INSERT: rel 1663/16384/1255 tuple: (1, "example")

  • This is a description of the operation recorded in this WAL entry.
  • INSERT: Indicates that this is an insert operation.
  • rel 1663/16384/1255:
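    • This identifies the relation (table) affected. Format: tablespace OID / database OID / relfilenode. Here 1663 is the default tablespace (pg_default), 16384 is the database, and 1255 is the relfilenode, the number of the table's on-disk file, which usually (but not always) matches the table's OID.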
  • tuple: (1, "example")
    • This shows the actual data being inserted. In this case, it's a tuple with two values: an integer (1) and a string ("example").
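The field breakdown above can be captured in a short parser. This is a sketch that expects the article's simplified record layout, not real pg_waldump output, and WalRecord and parse_record are hypothetical names.

```python
import re
from dataclasses import dataclass


@dataclass
class WalRecord:
    lsn: str
    rmgr: str
    length: int
    prev: str
    operation: str
    tablespace_oid: int
    database_oid: int
    relfilenode: int


def parse_record(text):
    """Parse the simplified 'key: value' record layout (an assumption, not pg_waldump's)."""
    fields = dict(line.split(": ", 1) for line in text.strip().splitlines())
    # desc looks like: "INSERT: rel 1663/16384/1255 tuple: (...)"
    m = re.match(r"(\w+): rel (\d+)/(\d+)/(\d+)", fields["desc"])
    if m is None:
        raise ValueError(f"unrecognized desc: {fields['desc']!r}")
    op, spc, db, rel = m.groups()
    return WalRecord(
        lsn=fields["LSN"],
        rmgr=fields["rmgr"],
        length=int(fields["len"]),
        prev=fields["prev"],
        operation=op,
        tablespace_oid=int(spc),
        database_oid=int(db),
        relfilenode=int(rel),
    )


sample = """
LSN: 0/16B5D68
rmgr: Heap
len: 65
prev: 0/16B5D30
desc: INSERT: rel 1663/16384/1255 tuple: (1, "example")
"""
record = parse_record(sample)
print(record.operation, record.tablespace_oid, record.database_oid, record.relfilenode)
# INSERT 1663 16384 1255
```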

PostgreSQL Replication Protocol

source: www.fatihacar.com

PostgreSQL supports physical and logical replication.

Physical replication copies the entire database cluster at the block level, which makes it ideal for high availability but inflexible. It's faster but requires the same PostgreSQL major version on both sides.

Logical replication operates at the table level, allowing selective replication and cross-version compatibility. It's more flexible, supporting data transformation and integration, but slightly slower due to WAL decoding.

Physical replication mirrors everything with minimal setup, while logical replication offers granular control but requires more configuration.

The PostgreSQL replication protocol is a communication mechanism that allows clients to receive real-time updates from a PostgreSQL server's WAL. Here's a super high-level explanation:

  1. Connection: The client establishes a connection to the PostgreSQL server using replication credentials.
  2. Initialization: The client sends a START_REPLICATION command, specifying a starting Log Sequence Number (LSN).
  3. Streaming: The server begins streaming WAL records to the client. These records contain all database changes since the specified LSN.
  4. Feedback: The client periodically sends feedback to the server, acknowledging received data and allowing the server to manage resources.
  5. Decoding: For logical replication, WAL records are decoded into a logical format using an output plugin (e.g., pgoutput, wal2json).
  6. Heartbeats: The server sends periodic heartbeats to maintain the connection, even when there are no new WAL records.
  7. Resumption: The replication slot records the last LSN the client confirmed, so after a network failure the client can reconnect and continue from that point without losing changes.
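Once streaming starts, the WAL data, keepalives, and feedback from the steps above travel inside CopyData messages. The sketch below decodes the server's XLogData ('w') and primary keepalive ('k') message bodies and builds the client's standby status update ('r'), following the byte layouts in the PostgreSQL streaming replication protocol documentation. The function names are hypothetical, and it works on message bodies only (no socket handling).

```python
import struct
import time

# Protocol timestamps count microseconds since 2000-01-01 00:00 UTC.
PG_EPOCH_OFFSET = 946_684_800  # seconds between 1970-01-01 and 2000-01-01


def pg_now_micros():
    return int((time.time() - PG_EPOCH_OFFSET) * 1_000_000)


def parse_server_message(body: bytes):
    """Decode one CopyData payload sent by the server during streaming."""
    kind = body[:1]
    if kind == b"w":  # XLogData: start LSN, server WAL end, server clock, then data
        start, wal_end, clock = struct.unpack_from(">QQq", body, 1)
        return {"type": "xlogdata", "data_start": start, "wal_end": wal_end,
                "send_time": clock, "payload": body[25:]}
    if kind == b"k":  # Primary keepalive: server WAL end, server clock, reply flag
        wal_end, clock, reply = struct.unpack_from(">QqB", body, 1)
        return {"type": "keepalive", "wal_end": wal_end,
                "send_time": clock, "reply_requested": bool(reply)}
    raise ValueError(f"unexpected message type {kind!r}")


def build_status_update(write_lsn, flush_lsn, apply_lsn, reply=False):
    """Standby status update ('r'): the feedback message a client sends back."""
    return struct.pack(">cQQQqB", b"r", write_lsn, flush_lsn, apply_lsn,
                       pg_now_micros(), 1 if reply else 0)


keepalive = b"k" + struct.pack(">QqB", 0x16B5E38, 0, 1)
msg = parse_server_message(keepalive)
if msg["reply_requested"]:
    reply = build_status_update(0x16B5E38, 0x16B5E38, 0x16B5E38)
    print(len(reply))  # 34
```

psycopg2 performs this bookkeeping for you (for example through send_feedback()), so the script below never touches these bytes directly.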

Establishing a Replication Connection

It's fairly easy to establish a replication connection:

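  1. Open a replication connection to PostgreSQL as a user with the REPLICATION attribute (for logical replication, the connection string includes replication=database).
  2. Send a START_REPLICATION command with the slot name and the starting LSN.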
  3. Receive and process WAL records.
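Under the hood, the START_REPLICATION step is a plain-text command sent over that replication connection. This hypothetical helper only builds the command string (it does not talk to a server), following the documented START_REPLICATION SLOT slot_name LOGICAL XXX/XXX (option value, ...) syntax; include-xids is a test_decoding option used here for illustration.

```python
def quote_ident(name):
    # Double-quote an identifier, doubling any embedded double quotes
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value):
    # Single-quote a string literal, doubling any embedded single quotes
    return "'" + str(value).replace("'", "''") + "'"


def start_replication_command(slot, start_lsn="0/0", options=None):
    """Build a logical START_REPLICATION command (hypothetical helper)."""
    cmd = f"START_REPLICATION SLOT {quote_ident(slot)} LOGICAL {start_lsn}"
    if options:
        opts = ", ".join(f"{quote_ident(k)} {quote_literal(v)}" for k, v in options.items())
        cmd += f" ({opts})"
    return cmd


# The command is sent over a connection opened with a string such as
# "dbname=your_database user=your_user replication=database".
print(start_replication_command("my_slot", "0/16B5D68", {"include-xids": 0}))
# START_REPLICATION SLOT "my_slot" LOGICAL 0/16B5D68 ("include-xids" '0')
```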

We'll also need a replication slot. A replication slot in PostgreSQL is a persistent mechanism that tracks replication progress and prevents premature removal of Write-Ahead Log (WAL) segments.

A slot can be physical or logical, survives server restarts, and is crucial for Change Data Capture (CDC) setups. The trade-off is that the server keeps every WAL segment a slot still needs, so a slot whose consumer stops reading can eventually fill the disk; slots need monitoring and cleanup.

They can be created, viewed, and dropped via SQL commands, providing a reliable way to maintain replication streams.

Example SQL to create a replication slot:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'test_decoding');
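Because an idle slot pins WAL on disk, it helps to watch how far each slot's restart_lsn trails the current WAL position. The query below uses the pg_replication_slots view and pg_current_wal_lsn() (PostgreSQL 10 and later); slots_retaining_too_much is a hypothetical helper that works on rows already fetched, so it can be tested without a server. A slot that is no longer needed can be removed with SELECT pg_drop_replication_slot('my_slot');.

```python
SLOT_LAG_QUERY = """
SELECT slot_name, active, restart_lsn::text, pg_current_wal_lsn()::text
FROM pg_replication_slots
"""


def parse_lsn(text):
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slots_retaining_too_much(rows, limit_bytes):
    """Return (slot_name, retained_bytes, active) for slots over the limit.

    `rows` are (slot_name, active, restart_lsn, current_lsn) tuples, e.g. from
    cursor.execute(SLOT_LAG_QUERY) followed by cursor.fetchall().
    """
    flagged = []
    for slot_name, active, restart_lsn, current_lsn in rows:
        if restart_lsn is None:  # slot is not reserving any WAL
            continue
        retained = parse_lsn(current_lsn) - parse_lsn(restart_lsn)
        if retained > limit_bytes:
            flagged.append((slot_name, retained, active))
    return flagged


rows = [
    ("python_slot", True, "0/16B5D68", "0/16B5E38"),
    ("forgotten_slot", False, "0/1000000", "0/16B5E38"),
]
print(slots_retaining_too_much(rows, limit_bytes=1024 * 1024))
# [('forgotten_slot', 7036472, False)]
```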

Python Application to Tail the WAL

Now, let's create a simple Python application that can tail the WAL using the psycopg2 library. If you are interested in the lower-level implementation, just check out the source code for the functions we'll use.

First, install the required library:

pip install psycopg2-binary

Here's the Python script:

import psycopg2
import psycopg2.errors
import psycopg2.extras

def format_lsn(lsn):
    # psycopg2 reports LSNs as integers; print them the way PostgreSQL does ("high/low" in hex)
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"

def main():
    conn = psycopg2.connect(
        "dbname=your_database user=your_user password=your_password host=your_host",
        connection_factory=psycopg2.extras.LogicalReplicationConnection
    )

    cur = conn.cursor()

    # Create a replication slot if it doesn't exist
    try:
        cur.create_replication_slot('python_slot', output_plugin='test_decoding')
    except psycopg2.errors.DuplicateObject:
        print("Replication slot 'python_slot' already exists")

    # Start replication. 'include-xids' is a test_decoding option; turning it off
    # prints plain BEGIN/COMMIT lines without transaction IDs.
    cur.start_replication(slot_name='python_slot', options={'include-xids': 0}, decode=True)

    def process_wal(msg):
        print(f"LSN: {format_lsn(msg.data_start)}")
        print(f"Data: {msg.payload}")
        # Confirm this position so the slot can advance and the server can recycle old WAL
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

    print("Starting replication, press Ctrl+C to stop")
    try:
        cur.consume_stream(process_wal)
    except KeyboardInterrupt:
        print("Stopping replication")
    finally:
        conn.close()

if __name__ == "__main__":
    main()

This script does the following:

  1. Connects to the PostgreSQL database using a logical replication connection.
  2. Creates a replication slot if it doesn't exist.
  3. Starts replication from the created slot.
  4. Processes incoming messages (changes decoded from the WAL by test_decoding), printing the LSN and payload.
  5. Sends feedback to the server to acknowledge receipt of the records.

To test this script:

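  1. Ensure your PostgreSQL server is configured for logical replication (wal_level = logical in postgresql.conf; changing it requires a server restart) and that your_user has the REPLICATION attribute or is a superuser.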
  2. Run the script.
  3. In another session, make some changes to the database.

You should see output similar to this:

Starting replication, press Ctrl+C to stop
LSN: 0/16B5D68
Data: BEGIN
LSN: 0/16B5DA0
Data: table public.test_table: INSERT: id[integer]:1 data[text]:'example'
LSN: 0/16B5E38
Data: COMMIT
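The test_decoding output is meant for humans, but for experimenting, a small regex-based parser can turn simple change lines like the ones above into dictionaries. This is a hypothetical sketch: it keeps values as strings, and it does not handle array types, quoted column names, or distinguish old-key from new-tuple values in UPDATEs.

```python
import re

# Matches "name[type]:value" where value is a quoted string ('' escapes a quote)
# or a bare token such as 42, true, or null. Simplifying assumption: column names
# are plain identifiers and type names contain no ']' (so no array types).
COLUMN_RE = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")
CHANGE_RE = re.compile(r"table (\S+)\.(\S+): (INSERT|UPDATE|DELETE): (.*)")


def parse_test_decoding(line):
    """Hypothetical parser for simple test_decoding change lines."""
    m = CHANGE_RE.match(line)
    if m is None:
        return {"kind": line.split(" ", 1)[0]}  # e.g. BEGIN / COMMIT
    schema, table, action, rest = m.groups()
    columns = {}
    for name, _type, raw in COLUMN_RE.findall(rest):
        if raw.startswith("'"):
            value = raw[1:-1].replace("''", "'")  # unquote and unescape
        elif raw == "null":
            value = None
        else:
            value = raw
        columns[name] = value
    return {"kind": action, "schema": schema, "table": table, "columns": columns}


print(parse_test_decoding("table public.test_table: INSERT: id[integer]:1 data[text]:'example'"))
```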

And that's pretty much it! 🎉

Advanced Concepts

Decoding Plugins

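PostgreSQL uses output plugins to decode WAL data into a specific format. The test_decoding plugin used in our example produces a simple text representation and mainly serves as an example and starting point for writing plugins. For production use, consider more robust plugins like pgoutput (built in since PostgreSQL 10 and used by native logical replication, though its output is binary) or wal2json (a third-party plugin that must be installed on the server and emits JSON).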

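Example using wal2json (a slot's output plugin is fixed when the slot is created, so this uses a new slot instead of the test_decoding one):

cur.create_replication_slot('python_json_slot', output_plugin='wal2json')
cur.start_replication(slot_name='python_json_slot', options={'format-version': 2}, decode=True)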
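With format-version 2, wal2json emits one JSON object per message, with an action code ("B", "C", "I", "U", "D", ...) and, for row changes, the schema, table, and column arrays. The handler below is a sketch assuming that shape (check the wal2json README for the version you install); handle_wal2json_v2 is a hypothetical name that could be called from process_wal with msg.payload.

```python
import json

ACTIONS = {"B": "BEGIN", "C": "COMMIT", "I": "INSERT", "U": "UPDATE", "D": "DELETE"}


def handle_wal2json_v2(payload):
    """Turn one wal2json format-version 2 message into (action, table, row)."""
    change = json.loads(payload)
    action = ACTIONS.get(change["action"], change["action"])
    if action not in ("INSERT", "UPDATE", "DELETE"):
        return action, None, None  # BEGIN, COMMIT, or anything this sketch ignores
    table = f'{change["schema"]}.{change["table"]}'
    # INSERT/UPDATE carry the new row in "columns"; DELETE carries the key in "identity"
    cols = change.get("columns") or change.get("identity") or []
    row = {c["name"]: c["value"] for c in cols}
    return action, table, row
```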

Handling Large Transactions

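For large transactions, you may want more control over feedback, for example tracking the processed position yourself and reporting it to the server periodically:

import time

last_status = time.time()
processed_lsn = 0

def process_wal(msg):
    global last_status, processed_lsn
    # Process message...
    processed_lsn = msg.data_start

    # Report progress at most every 10 seconds. flush_lsn is what lets the
    # slot advance, so only report positions you have fully processed.
    now = time.time()
    if now - last_status > 10:
        msg.cursor.send_feedback(write_lsn=processed_lsn, flush_lsn=processed_lsn)
        last_status = now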
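A variation, sketched under the assumption that your downstream system accepts changes in batches: buffer messages, hand each batch to a sink, and only then confirm the batch's last LSN, so the slot never moves past changes that were not handled. BatchingConsumer and the sink callable are hypothetical; the class relies only on msg.payload, msg.data_start, and msg.cursor.send_feedback(), which the earlier script already uses.

```python
import time


class BatchingConsumer:
    """Hypothetical consumer: flush to `sink` in batches, then acknowledge."""

    def __init__(self, sink, max_batch=500, max_wait=10.0, clock=time.monotonic):
        self.sink = sink            # callable that durably stores a list of payloads
        self.max_batch = max_batch
        self.max_wait = max_wait    # seconds since the last flush
        self.clock = clock
        self.batch = []
        self.last_flush = clock()

    def __call__(self, msg):
        self.batch.append(msg.payload)
        full = len(self.batch) >= self.max_batch
        stale = self.clock() - self.last_flush >= self.max_wait
        if full or stale:
            self.sink(self.batch)  # must succeed before we acknowledge
            msg.cursor.send_feedback(flush_lsn=msg.data_start)
            self.batch = []
            self.last_flush = self.clock()


# Hypothetical usage with the replication cursor:
# cur.consume_stream(BatchingConsumer(sink=my_sink_function))
```

Because consume_stream() only invokes the callback when a message arrives, this sketch flushes a partially filled batch only when the next message shows up, so a quiet stream can leave a few changes buffered.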

Resuming Replication

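To resume replication after a disconnect, store the last processed LSN and pass it as start_lsn when restarting. The slot itself also remembers the last position you confirmed with flush_lsn and will not stream from any earlier point, so only confirm changes you have fully processed:

cur.start_replication(slot_name='python_slot', start_lsn=last_processed_lsn, options={'include-xids': 0}, decode=True)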
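One way to persist that position is a small file written atomically. The helpers below are a hypothetical sketch (the function and file names are made up): the LSN is stored in its familiar X/X text form, and write-to-temp-then-rename means a crash leaves either the old or the new value (os.replace is atomic on POSIX). psycopg2 accepts start_lsn as an integer, and 0 means "let the slot decide".

```python
import os
import tempfile


def save_lsn(path, lsn):
    """Durably record the last processed LSN (an int, as psycopg2 reports it)."""
    text = f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        f.write(text)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes reach disk before the rename
    os.replace(tmp, path)     # swap in the new file in one step


def load_lsn(path):
    """Return the stored LSN as an int, or 0 if nothing has been saved yet."""
    try:
        with open(path) as f:
            high, low = f.read().strip().split("/")
    except FileNotFoundError:
        return 0
    return (int(high, 16) << 32) | int(low, 16)


# Hypothetical usage:
# last_processed_lsn = load_lsn("cdc_position.txt")
# cur.start_replication(slot_name='python_slot', start_lsn=last_processed_lsn, decode=True)
```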

Conclusion

In this short article, we covered the fundamentals of Change Data Capture for PostgreSQL, including log-based CDC, the replication protocol, and a basic Python implementation.

While the provided example is a starting point, a production-ready CDC system would require many additional features, such as sophisticated error handling, connection retries, handling WAL bloat, and efficient data processing and caching mechanisms.