
Postgres 15: Logical Decoding Row Filters With Debezium

Gunnar Morling
Decodable

Since logical decoding was added to Postgres in version 9.4, this powerful feature for capturing changes from the write-ahead log of the database has been continuously improved. Postgres 15, released in October this year, added support for fine-grained control over which columns (by means of column lists) and rows (via row filters) should be exported from captured tables. This means, in relational terminology, projections and filters are now natively supported by Postgres change event publications.

Reasons for specifically configuring which columns and rows should be contained in a change data stream are manifold:

  • Excluding large columns (say, a binary column with image data) can significantly reduce the size of change events and thus the required network bandwidth
  • Excluding columns or rows with sensitive data can be necessary in order to satisfy privacy requirements, when for instance Personally Identifiable Information (PII) shouldn’t be exposed to external systems
  • Filtering published rows by tenant id can be useful for setting up tenant-specific change streams in a multi-tenant architecture

Before the advent of Postgres-native column lists and row filters, users of Debezium – a popular open-source platform for change data capture (CDC), which also is used by several Decodable CDC connectors – would typically have implemented these kinds of use cases via a combination of configuration options and single message transformations (SMTs).

Projections are supported in Debezium via the column.include.list and column.exclude.list options. These configuration options are applied client-side, i.e. within the Debezium connector, which makes them less efficient than server-side column lists, potentially causing large amounts of data to be streamed to Debezium, only to be discarded there.

Filters are a bit more involved: while there is built-in support for filtering the contents of initial and ad-hoc incremental snapshots, filtering change events emitted from the WAL requires a custom SMT. Pushing this logic into the logical replication mechanism of the database itself makes a lot of sense from a usability and efficiency perspective.

So let’s see how Postgres 15 row filters can be used together with Debezium. Initially, I meant to demonstrate the usage of column lists, too. But in the course of exploring that feature, I discovered a bug in Postgres which causes incorrect events to be emitted for UPDATE and DELETE statements when column lists are present. So this will have to wait for another time. The Postgres community took care of this super fast: a bug fix has already been applied, so that column lists should work as expected in the next Postgres release.

Using Logical Decoding Row Filters With Debezium

To follow along, check out the postgres-publication-filtering demo project from GitHub. It contains a Docker Compose file for running Postgres as well as Apache Kafka and Kafka Connect with Debezium.
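
To bring up that environment, starting the stack could look like this (a sketch; the exact invocation may differ depending on your Docker set-up):

```sh
# From the root of the checked-out project: start Postgres, Kafka,
# and Kafka Connect with Debezium in the background
docker compose up -d
```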

That Postgres example container image contains a table products with the following schema:
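
Since the exact DDL lives in the demo image, here is only a minimal sketch of what that schema could look like (the inventory schema name and all column names other than quantity are assumptions on my part):

```sql
CREATE TABLE inventory.products (
  id       SERIAL PRIMARY KEY,
  name     VARCHAR(255) NOT NULL,
  quantity INTEGER NOT NULL
);
```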

Let’s set up a change event stream for that table which only contains events if the quantity of the given product item is below 10. We could then for instance envision a microservice which subscribes to that stream and places backfill orders with our suppliers for those products.

Row filters are configured via Postgres publications, as used with the pgoutput logical decoding plug-in. As Debezium can only create publications with the default settings (at least for now), you need to manually create a custom publication with the required configurations and have Debezium make use of it. To do so, launch a Postgres session via pgcli:
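
For example like so (the connection details are assumptions based on typical demo set-ups; adjust host, user, and database to your environment):

```sh
# Open an interactive session against the Postgres container
pgcli postgresql://postgres:postgres@localhost:5432/postgres
```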

Then create a publication like so:
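
A publication limited to low-stock products could look like the following sketch (the publication name and the inventory schema are assumptions; the filter matches the quantity threshold used throughout this post):

```sql
CREATE PUBLICATION low_stock_products_publication
  FOR TABLE inventory.products
  WHERE (quantity < 10);
```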

As of Postgres 15, the CREATE PUBLICATION statement allows you to narrow down the events to be emitted for a given table via a custom WHERE clause. A few conditions apply to that clause (see this post for more information), most importantly:

  • If the publication publishes UPDATE or DELETE events, only columns which are part of the table’s replica identity may be referenced (see the sketch after this list)
  • Only simple expressions are allowed; for example, they may not refer to user-defined functions or types, system columns, etc.
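
Since quantity is not part of the primary key, referencing it in the filter while publishing UPDATE and DELETE events requires adjusting the table’s replica identity. One way to do that (a sketch, assuming the default replica identity is still in place) is:

```sql
-- Make all columns part of the replica identity,
-- so the row filter may refer to the quantity column
ALTER TABLE inventory.products REPLICA IDENTITY FULL;
```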

That’s all we need to do on the Postgres side. Now let’s take a look at the required Debezium configuration:

As the Postgres publication is only used when Debezium retrieves change events via logical decoding from the WAL, you also need to customize the SELECT statement used for the products table when snapshotting the table. Otherwise, you’d get snapshot events for all the rows of that table, no matter what their quantity is. This can be done via the following configuration:
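
A sketch using Debezium’s snapshot.select.statement.overrides option (again assuming the table lives in the inventory schema):

```json
{
  "snapshot.select.statement.overrides": "inventory.products",
  "snapshot.select.statement.overrides.inventory.products": "SELECT * FROM inventory.products WHERE quantity < 10"
}
```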

Altogether, the connector configuration looks like this:
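
As a complete sketch (host names, credentials, and the topic prefix are assumptions based on common Debezium example set-ups; the topic.prefix option assumes Debezium 2.x):

```json
{
  "name": "products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "table.include.list": "inventory.products",
    "plugin.name": "pgoutput",
    "publication.name": "low_stock_products_publication",
    "publication.autocreate.mode": "disabled",
    "snapshot.select.statement.overrides": "inventory.products",
    "snapshot.select.statement.overrides.inventory.products": "SELECT * FROM inventory.products WHERE quantity < 10"
  }
}
```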

Now register a connector instance with this configuration. If you have kcctl 🧸 installed (which I highly recommend), that’s as simple as this:
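
Assuming the configuration above is stored in a file named register-products-connector.json (the file name is an assumption):

```sh
kcctl apply -f register-products-connector.json
```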

Alternatively, use curl to post the configuration directly to Kafka Connect’s REST API:
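
For instance like so (assuming Kafka Connect listens on its default port 8083 on localhost):

```sh
curl -X POST \
  -H "Content-Type: application/json" \
  -d @register-products-connector.json \
  http://localhost:8083/connectors
```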

Observing Filtered Change Events

Return to your Postgres session and display the contents of the products table:
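
A simple query will do (assuming the inventory schema as before):

```sql
SELECT * FROM inventory.products;
```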

Out of those nine product items, only those with a quantity of less than ten show up as snapshot events in the corresponding Kafka topic:
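
One way to inspect that topic is Kafka’s console consumer; the topic name below is an assumption derived from the topic prefix and table name used in the configuration sketch above:

```sh
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.inventory.products \
  --from-beginning
```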

Now let’s do some data changes and observe the resulting change events, as retrieved from the database via logical decoding. First, insert a few records into the table:
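
Matching the products discussed below, the inserts could look like this (the names and quantities follow the narrative; ids and any remaining column values are assumptions):

```sql
INSERT INTO inventory.products(name, quantity) VALUES ('deck chair', 7);
INSERT INTO inventory.products(name, quantity) VALUES ('lamp', 3);
INSERT INTO inventory.products(name, quantity) VALUES ('paint', 15);
```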

Nothing too exciting is happening in the Kafka topic: as you would expect, only events for the deck chair and the lamp products show up in Kafka, but not for the paint item, as its quantity is larger than 10. Things get a bit more interesting when doing some updates:
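
A set of updates matching the event descriptions below could look like this (identifying rows by name is an assumption; the demo may well use ids):

```sql
UPDATE inventory.products SET quantity = 6  WHERE name = 'deck chair'; -- 7  -> 6
UPDATE inventory.products SET quantity = 14 WHERE name = 'paint';      -- 15 -> 14
UPDATE inventory.products SET quantity = 9  WHERE name = 'paint';      -- 14 -> 9
UPDATE inventory.products SET quantity = 11 WHERE name = 'lamp';       -- 3  -> 11
```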

The following three events are emitted to Kafka for those:
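
Abbreviated to the fields that matter here, the three event values could look roughly like this (a heavily condensed sketch of the Debezium change event envelope; actual events contain the full row images plus source and timestamp metadata):

```
{ "before": { "name": "deck chair", "quantity": 7 }, "after": { "name": "deck chair", "quantity": 6 }, "op": "u" }
{ "before": null, "after": { "name": "paint", "quantity": 9 }, "op": "c" }
{ "before": { "name": "lamp", "quantity": 3 }, "after": null, "op": "d" }
```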

Note that not all of them have the u (update) operation type; some are c (create) and d (delete) events. The logic here is that the publication operates from the perspective of the row set specified via the WHERE clause for the table. In that light,

  • An update event is emitted for the deck chair quantity update from 7 to 6
  • No event is emitted for the paint quantity update from 15 to 14, as that row is part of the row set neither before nor after the change
  • A create event is emitted for the paint quantity update from 14 to 9, as that row now became a part of the row set
  • A delete event is emitted for the lamp quantity update from 3 to 11, as that row now is not a part of the row set any longer

Finally, let’s delete some product items:
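
For instance (again identifying the rows by name, matching the narrative below):

```sql
DELETE FROM inventory.products WHERE name = 'lamp';       -- quantity 11, no event
DELETE FROM inventory.products WHERE name = 'deck chair'; -- quantity 6, delete event
```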

In the Kafka topic you can observe that no change event is emitted for the first deletion (as there are 11 lamps in stock). But there is an event for the deletion of the deck chair record with a quantity of six.

As they say, a picture is worth a thousand words (and I’d never pass on an opportunity to use my favorite tool Excalidraw), so here is an overview of the published events, depending on the specifics of a given data change:

Wrap-Up

Row filters (and column lists) are a great addition to the Postgres logical decoding toolbox. Having fine-grained control over which change events should be published and which fields they should contain opens up many interesting opportunities from the perspective of efficiency and data privacy, as well as the ability to set up content-specific change data streams, as demonstrated in the example above.

Going forward, a good next step usability-wise would be for Debezium to apply any configured row filters and column lists to the Postgres publications it creates, simplifying things for users a bit. As far as Flink SQL and Decodable are concerned, row filters and column lists potentially allow for the push-down of filter and projection operators of streaming SQL queries: instead of applying these operators within the Flink stream processing engine, the SELECT and WHERE clauses of queries could be re-written transparently and these operators executed as part of the logical replication publication within Postgres itself. Flink supports this kind of push-down of logic into data sources via the SupportsFilterPushDown and SupportsProjectionPushDown extension points. For example, this could be very interesting to customers who don't want specific segments of their data to leave the realm of their database. Please reach out to us if you think this would be an interesting capability to have.

If you would like to get started with your own experiments around Postgres 15 row filters using Debezium, you can find the complete source code of the example shown above in this repository on GitHub. You can find more information about row filters in this blog post; also refer to this post to learn more about this and other new features related to logical replication in Postgres 15.

Many thanks to Robert Metzger for his feedback while writing this post!
