November 20, 2023
12 min read

“Change Data Capture Breaks Encapsulation”. Does it, though?

By Gunnar Morling

Having worked on Debezium—an open-source platform for Change Data Capture (CDC)—for several years, one concern I’ve heard repeatedly is this: aren’t you breaking the encapsulation of your application when you expose change event feeds directly from your database? After all, CDC exposes your internal persistent data model to the outside world, which may have unintended consequences, e.g. in terms of data exposure but also when it comes to changes to the schema of your data, which may break downstream consumers. 

In this blog post I am going to dive into this problem space, discuss when—and when not—CDC can break encapsulation, whether it matters, and explore strategies for avoiding these problems when it does.

CDC—A Quick Primer

With log-based change data capture—for instance, using Debezium—you can expose realtime change event streams for the tables of your database, sourced from the database’s transaction log. For each executed <span class="inline-code">INSERT</span>, <span class="inline-code">UPDATE</span>, and <span class="inline-code">DELETE</span>, an event is appended to the log, from where it is captured by the CDC tool and propagated to consumers, usually through data streaming platforms such as Apache Kafka or Amazon Kinesis.

These event streams enable a large variety of use cases, such as low-latency data feeds for analytical data stores, cache updates, full-text search indexes, and many more. While there are different alternatives for implementing CDC systems (for instance based on polling for changed rows, or using database triggers), log-based CDC is generally the most powerful and efficient approach and should be preferred whenever possible.

Does CDC Break Encapsulation?

In software design, encapsulation refers to the practice of hiding the implementation details and inner data structures of a component from the outside world, providing access to the component’s functionality and data only through well-defined interfaces. By publishing change event streams for the tables of an application’s database, this encapsulation may be violated.

Over time, several people have touched on this aspect, for instance Chris Riccomini in this blog post and Yaroslav Tkachenko here. The following implications of using log-based CDC are typically at the center of this discussion:

  • Your table model becomes your API: by default, your table’s column names and types correspond to fields in the change events emitted by the CDC tool. This can yield less-than-ideal event schemas, particularly for legacy applications.
  • Fine-grained events: CDC event streams typically expose one event per affected table row, whereas it can be desirable to publish higher-level events to consumers. An example of this would be wanting one event for one purchase order with all its order lines, even if they are stored within two separate tables in an RDBMS. The loss of transaction semantics in CDC event streams can aggravate that concern, as consumers cannot easily correlate the events originating from one and the same transaction in the source database.
  • Schema changes might break things: Downstream consumers of the change events will expect the data to adhere to the schema known to them. As there is no abstraction between your internal data model and consumers, any changes to the database schema, such as renaming a column or changing its type, could cause downstream event consumers to break, unless they are updated in lockstep.
  • You may accidentally leak sensitive data: a change event stream will, by default, contain all the rows of a table with all its columns. This means that sensitive data which shouldn’t leave the security perimeter of your application could be exposed to external consumers.

Now, this perhaps sounds a bit scarier than it actually is! In order to understand whether there actually is a problem here or not, it helps to look at how and where your change event streams are consumed. Specifically, do change events permeate multiple bounded contexts (in the terminology of Domain-Driven Design)? Are they propagated across system (and team) boundaries, or not?

If a change event stream is consumed within the same context as the source database itself—such as updating an in-memory cache managed by the application or service owning the database—then I would argue that there actually isn’t much to be concerned about. You actually want the data in the cache to match the original data. Similarly, if a change event stream is used for feeding a search index owned and managed by the same team also building the source application itself, aspects like schema changes can be coordinated by the team itself and applied to the database and index together.

Things look different though when crossing context or organizational boundaries. Your change events are consumed by the analytics team sitting at the end of the hallway to whom you only speak once a year? You are using CDC to propagate data changes between microservices created by different parts of your organization? In situations like these, directly exposing change streams from your internal data model indeed may be problematic. Tying these kinds of external consumers to your own data model and its lifecycle can lead to a loss of agility (changes to your model require convoluted and time-consuming change control processes tightly synchronized between different teams) and service disruptions (downstream pipelines and consumers fail due to incompatible schema changes).

Entering Data Contracts

So how do you mitigate these risks of using CDC between bounded contexts and/or teams? The solution is similar to what you’d do for exposing any other remote API—such as REST or gRPC—from your application: you have an API layer which is separate from the internal data model. This layer exposes a service’s functionality and data in exactly the way it’s needed, evolving at its own pace and independently from changes you make to your internal model, with a strong notion of not breaking compatibility in mind.

In databases, views have historically been a proven means of establishing module and system boundaries, for instance providing an explicit interface to traditional pull-based ETL tools. Unfortunately, (non-materialized) database views cannot be exposed via CDC, because their contents are never written to the transaction log. But as we’ll see below, there are other ways for setting up separate change streams which don’t directly mirror the raw streams corresponding to the tables of your data model. These public change event streams adhere to their own well-defined and deliberately crafted data contract.

Data contracts have been quite the hotness lately, and for good reason. They typically comprise the following aspects as a formal agreement between data providers and consumers:

  • Data schema: describes the structure and format of the events in a stream, i.e. the fields, their names and types, constraints, etc.
  • Data semantics: describe the meaning of event attributes, e.g. units of measurement for numeric values
  • Service level agreements (SLAs): describe qualitative aspects of a data stream, such as mean and maximum latency, event rates, etc.
  • Evolution rules: describe how and when a data contract can be changed, in particular its schema, so that consumers can prepare for any changes without breaking
  • Metadata and policies: describe attributes like the owner of a data contract, what the data can (and cannot) be used for, etc.
  • Examples: show what events adhering to the contract may look like

This “contract” can be expressed and managed using different formats and tools. For instance, data schemas can be defined using JSON Schema, Avro schemas, or ProtoBuf definitions. Another option would be describing your change event streams with the help of the AsyncAPI specification. Evolution rules can be managed and enforced using schema registries such as Confluent’s or Apicurio. Other contract elements may lend themselves to a textual representation, perhaps on a wiki page or some other kind of document. But a more formal, machine-readable contract definition is also possible, for instance in the form of YAML (will this ever stop?!), as suggested by the Data Contracts specification.

Regardless of how it’s implemented, the development team building a service whose database is exposed via change event streams should also own any data contracts for these streams. That way, the contracts are part of that team’s product—developed and maintained by that team, just as they would be for any other APIs of the service.

Implementation Approaches For Data Contracts

Having established that explicitly designed data contracts are very useful, how can you go about implementing them—specifically event schemas and their evolution—for your CDC events? In the following, I’d like to describe two approaches for doing so: the Outbox Pattern, and stream processing using something like Apache Flink. I’ll also illustrate exactly how data contracts help you address some of the potential encapsulation risks identified earlier on.

The Outbox Pattern

The idea of the Outbox Pattern is that instead of capturing the changes from your internal domain model tables, your application emits bespoke events via a separate table, typically called the outbox table. A CDC tool like Debezium will capture only the events inserted into that outbox table and relay them to any downstream consumers. Very importantly, the actual data change (e.g. an update to a customer record) and the insertion of the outbox event must happen in one single database transaction, ensuring atomic all-or-nothing guarantees.
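As a minimal sketch—using Postgres syntax, a hypothetical purchase order service, and an outbox table layout loosely modeled after the one commonly used with Debezium’s outbox event router (adjust names and types to your setup)—this could look as follows:

CREATE TABLE outbox (
  id            UUID PRIMARY KEY,
  aggregatetype VARCHAR(255) NOT NULL, -- e.g. 'purchase-order', used for topic routing
  aggregateid   VARCHAR(255) NOT NULL, -- becomes the message key
  type          VARCHAR(255) NOT NULL, -- e.g. 'OrderPaid'
  payload       JSONB NOT NULL         -- the bespoke event, shaped by the data contract
);

-- business data change and outbox event in one single transaction
BEGIN;
UPDATE purchase_orders SET status = 'PAID' WHERE id = 10042;
INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload)
  VALUES (gen_random_uuid(), 'purchase-order', '10042', 'OrderPaid',
          '{"orderId": 10042, "status": "PAID"}');
COMMIT;

The <span class="inline-code">purchase_orders</span> table and the payload are made up for illustration; the key point is that both writes commit or roll back together.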

The contract of the outbox events is kept separated from the internal data model, allowing you to expose the data in exactly the way you want to expose it. For instance you could emit a single event for an entire aggregate root which is persisted in multiple tables in your internal model, as shown below:

Fig. 1: The outbox pattern with Debezium (click to enlarge)

But you may also decide to adjust the types of exposed fields, only expose a subset of all your data attributes—giving you the opportunity to omit any sensitive or implementation-specific attributes—and much more. Having a separate contract allows you to evolve it independently from your internal model, too. Say you rename a column in one of your tables; it’s a conscious decision then to also rename it in the schema of your outbox events (potentially requiring a new major version of the contract), or keep it as is.

Debezium comes with powerful support for implementing the Outbox Pattern. This includes a routing component for propagating outbox events to specific topics in Kafka, based on configurable event metadata. If you are on Postgres, an interesting implementation option is logical decoding messages: instead of having a bespoke outbox table, Postgres lets you write arbitrary events solely to its write-ahead log, from where they can be retrieved using CDC. This spares you from implementing your own housekeeping routines (removal of events from the outbox table after they have been captured and sent), as the database itself will discard any obsolete segments of the transaction log automatically.
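For illustration, here’s what emitting such a logical decoding message could look like—a sketch reusing the hypothetical purchase order change from above; <span class="inline-code">pg_logical_emit_message()</span> takes a transactional flag, a prefix, and the payload:

BEGIN;
UPDATE purchase_orders SET status = 'PAID' WHERE id = 10042;
-- write the event only to the WAL; no outbox table, no housekeeping needed
SELECT pg_logical_emit_message(
  true,                                   -- transactional: emitted at commit time
  'outbox',                               -- prefix, usable for routing and filtering
  '{"orderId": 10042, "status": "PAID"}'  -- payload
);
COMMIT;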

So, the Outbox Pattern is a rather simple option for implementing data contracts (at least the schema portion) in a reliable way. It avoids unsafe dual writes (e.g. to your service’s database and Apache Kafka), while not exposing your internal data model to external consumers. On the downside, it does require you to modify your service so that it emits the outbox events, and you must make sure to consistently do this for all the write operations of your service. There is a potential performance impact (as you do another insert call in write transactions) and for high-volume use cases, the additional disk space required for storing outbox events in the database could be an issue.

Stream Processing

As we saw above, when we implement data contracts using the Outbox Pattern, the application itself is responsible for forming and emitting the change events that adhere to the contract specified. But what if we can’t—or don’t want to, for whatever reason—change the application to do this? The alternative is to use stream processing to publish change event streams with explicit data contracts after the fact. The idea here is to take the unprocessed table-level change events from a CDC source, process and convert them as needed, and re-publish them as separate streams with defined data contracts.

While simple event transformations can be implemented with stateless tools like Single Message Transforms in Kafka Connect, stateful stream processing engines like Apache Flink provide much greater flexibility and more possibilities. This is particularly relevant when it comes to joining multiple input streams into a single output stream, creating multiple output versions for one input stream, enriching change events with contextual metadata, and more. With its support for SQL, Flink also has the benefit of using a commonly-understood language to define contracts, making them more accessible and maintainable than the kind of non-portable JSON configuration that other tools might provide for data transformation.

The following image shows the overall approach, using Flink SQL to establish a public data contract for a table <span class="inline-code">customers</span> in a Postgres database:

Fig. 2: Stream processing pipeline with Apache Flink (click to enlarge)

Flink CDC is used to ingest the raw CDC feed into a Flink SQL table. The data is transformed and published to a Kafka topic which adheres to a stable data contract (specifically, the topic’s schema represents the schema part of the contract). The source and sink connectors are represented as tables in Flink SQL; here’s the definition of the source table, <span class="inline-code">customers</span>, using the <span class="inline-code">postgres-cdc</span> connector, which itself is built on top of the Debezium connector for Postgres (you can find the complete source code for this blog post in the decodableco/examples repository on GitHub):

CREATE TABLE customers (
  id INT,
  fname STRING,
  lname STRING,
  email STRING,
  phone STRING,
  street STRING,
  zip STRING,
  city STRING,
  status INT,
  registered TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgres',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'inventory',
  'table-name' = 'customers',
  'slot.name' = 'customers_replication_slot',
  'decoding.plugin.name' = 'pgoutput'
);

<div style="text-align: center">Listing 1: Configuration of the Postgres source connector.</div>

For publishing the change events into a Kafka topic <span class="inline-code">customers</span>, another table, <span class="inline-code">customers_public</span>, is created which looks like this:

CREATE TABLE customers_public (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  zip STRING,
  phone STRING,
  status STRING,
  registration_date STRING,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

<div style="text-align: center">Listing 2: Configuration of the Kafka sink connector.</div>

Note that the <span class="inline-code">upsert-kafka</span> connector must be used (i.e. not the <span class="inline-code">kafka</span> one), as I’ve discussed recently in this Data Streaming Quick Tip episode. Finally, the transformation from the source stream into the published format is a simple <span class="inline-code">INSERT</span> statement, using just a few lines of Flink SQL:

INSERT INTO customers_public
  SELECT
    id,
    fname,
    lname,
    email,
    zip,
    phone,
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM customers;

<div style="text-align: center">Listing 3: Flink SQL job for transforming the source table into the sink table.</div>

When you submit this query, a job is deployed onto your Flink cluster which executes the query in a continuous fashion, propagating any changes from the source to the destination whenever a row is inserted, updated, or deleted in the source table. The following transformations are applied:

  • Renamings: The names of some fields are changed, e.g. from <span class="inline-code">fname</span> to <span class="inline-code">first_name</span> (by means of inserting the selected values into fields with the desired alternative names in the sink table)
  • Omissions: Some sensitive fields are excluded from the published stream (all address fields besides <span class="inline-code">zip</span>)
  • Type changes: The <span class="inline-code">registered</span> field is changed from <span class="inline-code">TIMESTAMP</span> to <span class="inline-code">STRING</span>
  • Value conversions: Instead of the original numeric constants, corresponding string labels are emitted for the <span class="inline-code">status</span> field

Generally speaking, all these transformations are projections. But depending on your requirements, you could take things much further and for instance:

  • apply filters using a <span class="inline-code">WHERE</span> clause, excluding test accounts or logically deleted customer records from the change event stream (see the sketch after this list),
  • join multiple change event streams into a single one, which becomes particularly useful when publishing change events for one aggregate which is persisted in multiple tables in a relational database (see further below for an example for that),
  • add derived fields, e.g. with a customer’s full name, and much more.
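As an example of the first option, here’s a hypothetical variant of Listing 3 which keeps blocked customers (status 3 in this example) out of the public stream:

INSERT INTO customers_public
  SELECT
    id,
    fname,
    lname,
    email,
    zip,
    phone,
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM customers
  WHERE status <> 3; -- exclude blocked customers from the published stream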

Instead of publishing a single data contract for a table like the <span class="inline-code">customers</span> table in the example above, you also may decide to create multiple streams with slightly different contracts, geared towards different consumers and use cases. For instance, you may have two public customer change event streams, one with address data (which can only be accessed by a small number of authorized clients) and one without (which would be accessible by a larger number of clients).
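Such a second, more privileged stream would just be another sink table—here a hypothetical <span class="inline-code">customers_public_pii</span> topic which also carries the address fields (access control for that topic, e.g. via Kafka ACLs, is not shown):

CREATE TABLE customers_public_pii (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  street STRING,
  zip STRING,
  city STRING,
  phone STRING,
  status STRING,
  registration_date STRING,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers-pii',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

A second <span class="inline-code">INSERT</span> job would then feed this table, selecting the additional columns from the source.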

There is an interesting tension here between defining contracts which are widely applicable vs. contracts which are optimized for specific consumers. A useful guiding principle for resolving this tension could be “As general as possible, as specific as needed”. This means that, for instance, you would include the widest set of fields possible in a change stream’s data contract by default, only keeping sensitive data exclusive to streams for specific privileged consumers. Other kinds of filtering on the other hand—such as filtering out any backfilling events—would be the responsibility of individual clients, based on their specific requirements.

As far as filtering specific rows or columns is concerned, you have different options. You could ingest everything into Flink but only publish a subset, as shown in the example above. This would be useful when you want to have the option to publish multiple variants of a data contract, as just discussed. But you also could decide to omit specific sensitive fields in the Flink source table definition, thus making sure they never can be part of any published data contracts. Depending on your database, you may even exclude specific data from the ingested change stream altogether. As an example, Postgres supports the definition of column lists and row filters, providing fine-grained control over the contents of any logical replication streams, thus helping to reduce network traffic and any potential cost associated with it.
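For instance, with Postgres 15 or later, a publication with a column list and row filter could look like the following sketch (whether you can use this depends on how your CDC connector sets up and uses its publication):

-- only the listed columns and only non-blocked customers ever enter
-- the logical replication stream consumed by the CDC connector
CREATE PUBLICATION customers_cdc_publication
  FOR TABLE inventory.customers (id, fname, lname, email, zip, phone, status, registered)
  WHERE (status <> 3);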

Streaming Data Contracts—Beyond the Basics

Once you have embarked on your journey of creating data contracts with stateful stream processing, the sky's the limit, and you have all kinds of interesting related capabilities at your disposal.

So let’s discuss how to expose a complexly structured change event stream, derived from two tables in the source database. Imagine that the domain model from the example above is changed so that a customer can have multiple phone numbers, instead of just a single one. To model that, instead of having a phone column within the <span class="inline-code">customers</span> table, let’s assume that there’s a separate table <span class="inline-code">phone_numbers</span>, with a 1:n relationship between the two tables. 

A stream processing job could then be used to join the two tables, emitting the phone numbers as part of the data contract for the customer change event stream. That way, instead of having to deal with two table-level streams, consumers would be able to ingest all the data pertaining to a customer from one single stream, independent from how that data is organized in the source database. To make things a bit more interesting, let’s emit one of the numbers (the customer’s preferred one) via its own field <span class="inline-code">phone</span>, and all the others via an array-typed field <span class="inline-code">further_phones</span>:

Fig. 3: Joining two change event streams (click to enlarge)

A Flink SQL user-defined function (UDF) for aggregating the elements of the “many” side of the join into an array in a type-safe way can come in handy here, something I’ve recently explored in this video. Using the <span class="inline-code">ARRAY_AGGR()</span> function discussed in that quick tip, the two source tables could be joined like this:

INSERT INTO customers_public
  SELECT
    c.id,
    c.fname,
    c.lname,
    c.email,
    c.zip,
    preferred.`value`,
    ARRAY_AGGR(further_phones.`value`),
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM
    customers c
    LEFT JOIN
      (SELECT * FROM phone_numbers WHERE preferred = true) preferred
      ON c.id = preferred.customer_id
    LEFT JOIN
      (SELECT * FROM phone_numbers WHERE preferred = false) further_phones
      ON c.id = further_phones.customer_id
  GROUP BY
    c.id, fname, lname, email, zip, registered, status, preferred.`value`;

<div style="text-align: center">Listing 4: Joining two source change streams into a single public stream.</div>

The <span class="inline-code">phone_numbers</span> table is left-joined here twice, once to obtain the preferred phone number and once for all the non-preferred numbers, which then are exposed as an array via the aforementioned <span class="inline-code">ARRAY_AGGR()</span> function.

But you can also go beyond the pure needs of data contracts themselves. One example is the expansion of partial change events: <span class="inline-code">UPDATE</span> events emitted by the Debezium connector for Cassandra only contain those fields of a record whose values actually changed, while unchanged fields are omitted. Similarly, the Postgres connector won’t emit values for unchanged TOAST columns (large column values stored by the database in a specific way). If a consumer only supports full record updates, it won’t easily be able to process such partial change events. This could be addressed by implementing a job for publishing a data contract with Flink’s DataStream API which leverages a state store for expanding any partial events into full ones, retrieving any missing field values from the store.

Another very interesting option would be taking advantage of the metadata emitted by Debezium for transaction boundaries; with this information you could implement buffering logic for emitting change events originating from one and the same transaction only when all the events from that transaction have been ingested, which is particularly useful when joining multiple raw change event streams into a single one.

Handling Schema Changes

As the saying goes: Nothing is permanent except change. It’s only a question of time until new columns are added to your application’s data schema, existing ones are renamed or removed, or their types get changed. With explicitly defined data contracts in place, you have taken the first step for making sure that any changes to your internal data schema do not directly affect the consumers of your change event streams.

From a procedural perspective, it’s important that the team owning and publishing a data contract can apply changes to the contract without having to synchronize with any event consumers, who perhaps may not even be known to the upstream team. At the same time, any changes to the contract should not break existing consumers—after a schema change they should be able to continue to process a change event stream based on the previous schema known to them. Of course, they will need to be adjusted eventually, so as to take advantage of the capabilities of a new contract version, such as any added fields. The guarantees around how long particular versions of a contract will be supported are something that would be built into the contract, along with the other metadata previously discussed, such as SLAs.

This means data contracts for change event streams should be evolved in a forward-compatible way, which allows for the addition of new fields and the removal of optional fields, whereas existing non-optional fields may not be removed.

Fig. 4: Producer-driven evolution of a schema (click to enlarge)
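As a concrete sketch in the Flink SQL setup from above: adding a new, optional field—say, the customer’s city, which so far wasn’t part of the contract—is a forward-compatible change. The running job would be stopped with a savepoint and redeployed, just as in the renaming procedure shown further below:

-- add a nullable field to the public contract; existing consumers can ignore it
ALTER TABLE customers_public ADD city STRING;

INSERT INTO customers_public
  SELECT
    id,
    fname,
    lname,
    email,
    zip,
    phone,
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy'),
    city
  FROM customers;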

To learn more about the guidelines for schema evolution, I highly recommend referring to Gwen Shapira’s presentation “Streaming Microservices: Contracts & Compatibility”, where she discusses this topic at around 16:50 min. A schema registry should be used in order to ensure that any changes to a data contract adhere to these requirements. Before rolling out any data contract changes to production, a CI/CD pipeline would validate any schema changes using the compatibility rules configured in the registry, as for instance described in this excellent blog post by Chad Sanderson and Adrian Kreuziger. Any contract changes which would actually break existing consumers would fail the build process and thus be prevented from being deployed.

<div class="side-note">Note that evolving data contracts in a forward-compatible manner means that consumers cannot replay any events from the beginning using only the latest schema version. This would fail when, for instance, re-processing an event lacking a non-optional field added in a later schema version. Instead, each event should be processed with the schema version valid at the time when the event was originally created.</div>

Now, how can stream processing help you with managing these kinds of data contract changes? As an example, consider the case of renaming a column within a source table. The schema of the table’s change stream would change correspondingly, whenever the first change event after the name change is ingested. Exposing this schema change as-is to any downstream consumers would be an incompatible change and thus should be avoided. Different options for solving the issue exist:

  • Creating another version of the stream with the new schema (i.e. the new field name); both the old and the new stream version would co-exist, and clients could migrate from the old to the new one at their own pace
  • Expand the schema of the existing stream, so that it contains another field with the new name, next to the existing field with the old name
  • Keep the existing stream schema, i.e. don’t change the field name in the public data contract

In every case, stream processing can be used to apply the required transformations between the source events (containing either the old or the new field name, depending on the specific stream position) and the published counterpart(s). Let’s see how the last option—completely shielding consumers from that name change—can be implemented with Flink SQL.

Fig. 5: Renaming a column in the source table (click to enlarge)

The key idea is to use Flink’s savepoint mechanism for pausing the job, while applying the required schema changes to the database and the Flink job, making sure the job maps both old and new incoming field names to the existing name in the public contract. The exact sequence of events would be this:

  1. In Flink, stop the job with a savepoint: <span class="inline-code">STOP JOB '&lt;job id&gt;' WITH SAVEPOINT;</span>
    This makes sure the job, after restarting, will continue to process the source change stream from the exact position where it left off, not missing any changes which happened in between.
  2. In the source database, rename the column:
    <span class="inline-code">ALTER TABLE customers RENAME COLUMN fname TO first_name;</span>
  3. In Flink, add a column with the new name to the table, keeping the one with the old name too:
    <span class="inline-code">ALTER TABLE customers ADD first_name STRING;</span>
  4. In Flink, configure the savepoint path:
    <span class="inline-code">SET 'execution.savepoint.path' = '/path/to/savepoints/savepoint-&lt;job id&gt;';</span>
  5. In Flink, create a new version of the job, using the <span class="inline-code">COALESCE()</span> function to retrieve the first name either from the old or the new field, depending on which one exists in the incoming event:
INSERT INTO customers_public
  SELECT
    id,
    COALESCE(fname, first_name),
    ...
  FROM customers;

<div style="text-align: center">Listing 5: Retrieving the first name from the correct column, depending on which value is present</div>

With this procedure, any consumers of the public data contract are fully shielded from the column name change in the database. The job will source the first name from the correct incoming field, no matter whether it processes a change event from before or after the schema change was made in the database.

Note that the correct order of steps is vital here; in particular, the Flink job must be stopped before applying the schema change in the source database. Otherwise, it would not pick up the value from change events emitted after the column has been renamed. Therefore, the development team owning the database schema should also be in charge of the CDC pipeline and the Flink job for creating the public change stream with the data contract.

Beyond renaming columns, other schema changes can be handled with a stream processing engine, too. New columns could be added just like above. For dropped <span class="inline-code">NOT NULL</span> columns, the streaming job could emit a sentinel value such as “n/a” to ensure compatibility with existing consumers. Cardinality changes—for instance, going from a single <span class="inline-code">phone</span> column within the <span class="inline-code">customers</span> table to a separate table with multiple phone numbers per customer, as shown above—are also possible, by aggregating all the values into a new array-typed field in the public data stream.
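For the dropped-column case, a sketch could look as follows—assuming, purely for illustration, that the <span class="inline-code">email</span> column were removed from the source database while being kept as a nullable column in the Flink source table, so that the public contract stays stable:

INSERT INTO customers_public
  SELECT
    id,
    fname,
    lname,
    COALESCE(email, 'n/a'), -- sentinel value once the source column is gone
    zip,
    phone,
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM customers;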

Summary

Now, does CDC break data encapsulation? As we’ve seen, the answer to this question is surprisingly nuanced and depends a lot on how and where change event streams are consumed: the key consideration is whether events cross team and/or context boundaries, or not.

In cases where encapsulation is a concern, consciously designed data contracts can be a great tool to shield external consumers of a change event stream from the implementation details of an application’s persistent data model and any changes to its schema. With the help of stream processing, for instance using Apache Flink, you can establish well-defined APIs for your data, resulting in more robust and reliable data pipelines. Flink SQL makes the creation of data contracts a matter of describing the shape of your data with a few lines of SQL, while Flink’s DataStream API can be used for implementing more advanced requirements such as expanding incoming partial change events into full events.

With those tools and corresponding processes in place, you don’t need to be concerned about accidentally exposing your internal data model, or about changes to its schema breaking your change stream consumers.

Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.

Table of contents

Let's Get Decoding

Having worked on Debezium—an open-source platform for Change Data Capture (CDC)—for several years, one concern I’ve heard repeatedly is this: aren’t you breaking the encapsulation of your application when you expose change event feeds directly from your database? After all, CDC exposes your internal persistent data model to the outside world, which may have unintended consequences, e.g. in terms of data exposure but also when it comes to changes to the schema of your data, which may break downstream consumers. 

In this blog post I am going to dive into this problem space, discuss when—and when not—CDC can break encapsulation, whether it matters, and explore strategies for avoiding these problems when it does.

CDC—A Quick Primer

With log-based change data capture—for instance, using Debezium—you can expose realtime change event streams for the tables of your database, sourced from the database’s transaction log. For each executed <span class="inline-code">INSERT</span>, <span class="inline-code">UPDATE</span>, and <span class="inline-code">DELETE</span>, an event is appended to the log, from where it is captured by the CDC tool and propagated to consumers, usually through data streaming platforms such as Apache Kafka or Amazon Kinesis.

These event streams enable a large variety of use cases, such as low-latency data feeds for analytical data stores, cache updates, full-text search indexes, and many more. While there are different alternatives for implementing CDC systems (for instance based on polling for changed rows, or using database triggers), log-based CDC is generally the most powerful and efficient approach and should be preferred whenever possible.

Does CDC Break Encapsulation?

In software design, encapsulation refers to the practice of hiding the implementation details and inner data structures of a component from the outside world, providing access to the component’s functionality and data only through well-defined interfaces. By publishing change event streams for the tables of an application’s database, this encapsulation may be violated.

Over time, several people have touched on this aspect, for instance Chris Riccomini in this blog post and Yaroslav Tkachenko here. The following implications of using using log-based CDC are typically at the center of this discussion:

  • Your table model becomes your API: by default, your table’s column names and types correspond to fields in the change events emitted by the CDC tool. This can yield less-than-ideal event schemas, particularly for legacy applications.
  • Fine-grained events: CDC event streams typically expose one event per affected table row, whereas it can be desirable to publish higher-level events to consumers. An example of this would be wanting one event for one purchase order with all its order lines, even if they are stored within two separate tables in an RDBMS. The loss of transaction semantics in CDC event streams can aggravate that concern, as consumers cannot easily correlate the events originating from one and the same transaction in the source database.
  • Schema changes might break things: Downstream consumers of the change events will expect the data to adhere to the schema known to them. As there is no abstraction between your internal data model and consumers, any changes to the database schema, such as renaming a column or changing its type, could cause downstream event consumers to break, unless they are updated in lockstep.
  • You may accidentally leak sensitive data: a change event stream will, by default, contain all the rows of a table with all its columns. This means that sensitive data which shouldn’t leave the security perimeter of your application could be exposed to external consumers.

Now, this perhaps sounds a bit scarier than it actually is! In order to understand whether there actually is a problem here or not, it helps to look at how and where your change event streams are consumed. Specifically, do change events permeate multiple bounded contexts (in terminology of Domain-Driven Design)? Are they propagated across system (and team) boundaries, or not?

If a change event stream is consumed within the same context as the source database itself—such as updating an in-memory cache managed by the application or service owning the database—then I would argue that there actually isn’t much to be concerned about. You actually want the data in the cache to match the original data. Similarly, if a change event stream is used for feeding a search index owned and managed by the same team also building the source application itself, aspects like schema changes can be coordinated by the team itself and applied to the database and index together.

Things look different though when crossing context or organizational boundaries. Your change events are consumed by the analytics team sitting at the end of the hallway to whom you only speak once a year? You are using CDC to propagate data changes between microservices created by different parts of your organization? In situations like these, directly exposing change streams from your internal data model indeed may be problematic. Tying these kinds of external consumers to your own data model and its lifecycle can lead to a loss of agility (changes to your model require convoluted and time-consuming change control processes tightly synchronized between different teams) and service disruptions (downstream pipelines and consumers fail due to incompatible schema changes).

Entering Data Contracts

So how do you mitigate these risks of using CDC between bounded contexts and/or teams? The solution is similar to what you’d do for exposing any other remote API—such as REST or gRPC—from your application: you have an API layer which is separate from the internal data model. This layer exposes a service’s functionality and data in exactly the way it’s needed, evolving at its own pace and independently from changes you make to your internal model, with a strong notion of not breaking compatibility in mind.

In databases, views have historically been a proven means of establishing module and system boundaries, for instance providing an explicit interface to traditional pull-based ETL tools. Unfortunately, (non-materialized) database views cannot be exposed via CDC because they don’t operate via the transaction log. But as we’ll see below, there are other ways for setting up separate change streams which don’t directly mirror the raw streams corresponding to the tables of your data model. These public change event streams adhere to their own well-defined and deliberately crafted data contract

Data contracts have been quite the hotness lately, and for good reason. They typically comprise the following aspects as a formal agreement between data providers and consumers:

  • Data schema: describes the structure and format of the events in a stream, i.e. the fields, their names and types, constraints, etc.
  • Data semantics: describe the meaning of event attributes, e.g. units of measurement for numeric values
  • Service level agreements (SLAs): describe qualitative aspects of a data stream, such as mean and maximum latency, event rates, etc.
  • Evolution rules: describe how and when a data contract can be changed, in particular its schema, so that consumers can prepare for any changes without breaking
  • Metadata and policies: describes attributes like the owner of a data contract, what the data can (and cannot) be used for, etc.
  • Examples: show how events adhering to the contract may look like

This “contract” can be expressed and managed using different formats and tools. For instance, data schemas can be defined using JSON Schema, Avro schemas, or ProtoBuf definitions. Another option would be describing your change event streams with help of the AsyncAPI specification. Evolution rules can be managed and enforced using schema registries such as Confluent’s or Apicurio. Other contract elements may lend themselves to a textual representation, perhaps on a wiki page or some other kind of document. But also a more formal, machine-readable contract definition is possible, such as  in the form of YAML (will this ever stop?!), as suggested by the Data Contracts specification.

Regardless of how it’s implemented, the development team building a service from whose database change event streams are exposed should also own any data contracts for these streams. That way, the contracts are part of that team’s product—developed and maintained by that team, just as they would for any other APIs of the service.

Implementation Approaches For Data Contracts

Having established that explicitly designed data contracts are very useful, how can you go about implementing them—specifically event schemas and their evolution—for your CDC events? In the following, I’d like to describe two approaches for doing so: the Outbox Pattern, and stream processing using something like Apache Flink. I’ll also illustrate exactly how data contracts help you address some of the potential encapsulation risks identified earlier on.

The Outbox Pattern

The idea of the Outbox Pattern is that instead of capturing the changes from your internal domain model tables, your application emits bespoke events via a separate table, typically called the outbox table. A CDC tool like Debezium will capture only the events inserted into that outbox table and relay them to any downstream consumers. Very importantly, any actual data changes (e.g. an update to a customer record), and the insertion of the outbox event, must happen in one single database transaction, ensuring atomic all-or-nothing guarantees.

The contract of the outbox events is kept separated from the internal data model, allowing you to expose the data in exactly the way you want to expose it. For instance you could emit a single event for an entire aggregate root which is persisted in multiple tables in your internal model, as shown below:

Fig. 1: The outbox pattern with Debezium (click to enlarge)

But you may also decide to adjust the types of exposed fields, only expose a subset of all your data attributes—giving you the opportunity to omit any sensitive or implementation-specific attributes—and much more. Having a separate contract allows you to evolve it independently from your internal model, too. Say, you rename a column in one of your tables; it’s a conscious decision then to also rename it in the schema of your outbox events (potentially requiring a new major version of the same), or keep it as is.

Debezium comes with powerful support for implementing the Outbox Pattern. This includes a routing component for propagating outbox events to specific topics in Kafka, based on configurable event metadata. If you are on Postgres, an interesting implementation option is logical decoding messages: instead of having a bespoke outbox table, Postgres lets you write arbitrary events solely to its write-ahead log, from where they can be retrieved using CDC. This spares you from implementing your own housekeeping routines (removal of events from the outbox table after they have been captured and sent), as the database itself will discard any obsolete segments of the transaction log automatically.

So, the Outbox Pattern is a rather simple option for implementing data contracts (at least the schema portion) in a reliable way. It avoids unsafe dual writes (e.g. to your service’s database and Apache Kafka), while not exposing your internal data model to external consumers. On the downside, it does require you to modify your service so that it emits the outbox events, and you must make sure to consistently do this for all the write operations of your service. There is a potential performance impact (as you do another insert call in write transactions) and for high-volume use cases, the additional disk space required for storing outbox events in the database could be an issue.

Stream Processing

As we saw above, when we implement data contracts using the Outbox Pattern, the application itself is responsible for forming and emitting the change events that adhere to the contract specified. But what if we can’t—or don’t want to, for whatever reason—change the application to do this? The alternative is to use stream processing to publish change event streams with explicit data contracts after the fact. The idea here is to take the unprocessed table-level change events from a CDC source, process and convert them as needed, and re-publish them as separate streams with defined data contracts.

While simple event transformations can be implemented with stateless tools like Single Message Transforms in Kafka Connect, stateful stream processing engines like Apache Flink provide much greater flexibility and more possibilities. This is particularly relevant when it comes to joining multiple input streams into a single output stream, creating multiple output versions for one input stream, enriching change events with contextual metadata, and more. With its support for SQL, Flink also has the benefit of using a commonly-understood language to define contracts, making them more accessible and maintainable than the kind of non-portable JSON configuration that other tools might provide for data transformation.

The following image shows the overall approach, using Flink SQL to establish a public data contract for a table <span class="inline-code">customers</span> in a Postgres database:

Fig. 2: Stream processing pipeline with Apache Flink (click to enlarge)

Flink CDC is used to ingest the raw CDC feed into a Flink SQL table. The data is transformed and published to a Kafka topic which adheres to a stable data contract (specifically, the topic’s schema represents the schema part of the contract). The source and sink connectors are represented as tables in Flink SQL; here’s the definition of the source table, <span class="inline-code">customers</span>, using the <span class="inline-code">postgres-cdc</span> connector, which itself is based on top of the Debezium connector for Postgres (you can find the complete source code for this blog post in the decodableco/examples repository on GitHub):

CREATE TABLE customers (
  id INT,
  fname STRING,
  lname STRING,
  email STRING,
  street STRING,
  zip STRING,
  city STRING,
  status INT,
  registered TIMESTAMP,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgres',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'inventory',
  'table-name' = 'customers',
  'slot.name' = 'customers_replication_slot',
  'decoding.plugin.name' = 'pgoutput'
);

<div style="text-align: center">Listing 1: Configuration of the Postgres source connector.</div>

For publishing the change events into a Kafka topic <span class="inline-code">customers</span>, another table, <span class="inline-code">customers_public</span>, is created which looks like this:

CREATE TABLE customers_public (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  zip STRING,
  status STRING,
  registration_date STRING,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

<div style="text-align: center">Listing 2: Configuration of the Kafka sink connector.</div>

Note that the <span class="inline-code">upsert-kafka</span> connector must be used (i.e. not the <span class="inline-code">kafka</span> one), as I’ve discussed recently in this Data Streaming Quick Tip episode. Finally, the transformation from the source stream into the published format is a simple <span class="inline-code">INSERT</span> statements, using just a few lines of Flink SQL:

INSERT INTO customers_public
  SELECT
    id,
    fname,
    lname,
    email,
    zip,
    phone,
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM customers;

<div style="text-align: center">Listing 3: Flink SQL job for transforming the source table into the sink table.</div>

When submitting this query, a job will be deployed onto your Flink cluster which executes the query in a continuous fashion, propagating any changes from the source to the destination, whenever there is a row inserted, updated, or deleted in the source table. The following transformations are applied:

  • Renamings: The names of some fields are changed, e.g. from <span class="inline-code">fname</span> to <span class="inline-code">first_name</span> (by means of inserting the selected values into fields with the desired alternative names in the sink table)
  • Omissions: Some sensitive fields are excluded from the published stream (all address fields besides <span class="inline-code">zip</span>)
  • Type changes: The <span class="inline-code">registered</span> field is changed from <span class="inline-code">TIMESTAMP</span> to <span class="inline-code">STRING</span>
  • Value conversions: Instead of the original numeric constants, corresponding string labels are emitted for the <span class="inline-code">status</span> field

All these transformations are projections generally speaking. But depending on your requirements, you could take things much further and for instance:

  • apply filters using a <span class="inline-code">WHERE</span> clause, excluding test accounts or logically deleted customer records from the change event stream,
  • join multiple change event streams into a single one, which becomes particularly useful when publishing change events for one aggregate which is persisted in multiple tables in a relational database (see further below for an example for that),
  • add derived fields, e.g. with a customer’s full name, and much more.

Instead of publishing a single data contract for a table like the <span class="inline-code">customers</span> table in the example above, you also may decide to create multiple streams with slightly different contracts, geared towards different consumers and use cases. For instance, you may have two public customer change event streams, one with address data (which can only be accessed by a small number of authorized clients) and one without (which would be accessible by a larger number of clients).

There is an interesting tension here between defining contracts which are widely applicable vs. contracts which are optimized for specific consumers. A useful guiding principle for resolving this tension could be “As general as possible, as specific as needed”. This means that, for instance, you would include the widest set of fields possible in a change stream’s data contract by default, only keeping sensitive data exclusive to streams for specific privileged consumers. Other kinds of filtering on the other hand—such as filtering out any backfilling events—would be the responsibility of individual clients, based on their specific requirements.

As far as filtering specific rows or columns is concerned, you have different options. You could ingest everything into Flink but only publish a subset, as shown in the example above. This would be useful when you want to have the option to publish multiple variants of a data contract, as just discussed. But you also could decide to omit specific sensitive fields in the Flink source table definition, thus making sure they never can be part of any published data contracts. Depending on your database, you may even exclude specific data from the ingested change stream altogether. As an example, Postgres supports the definition of column lists and row filters, providing fine-grained control over the contents of any logical replication streams, thus helping to reduce network traffic and any potential cost associated with it.

Streaming Data Contracts—Beyond the Basics

Once you have embarked onto your journey of creating data contracts with stateful stream processing, the sky's the limit, and you have all kinds of interesting related capabilities at your disposal.

So let’s discuss how to expose a complexly structured change event stream, derived from two tables in the source database. Imagine that the domain model from the example above is changed so that a customer can have multiple phone numbers, instead of just a single one. To model that, instead of having a phone column within the <span class="inline-code">customers</span> table, let’s assume that there’s a separate table <span class="inline-code">phone_numbers</span>, with a 1:n relationship between the two tables. 

A stream processing job could then be used to join the two tables, emitting the phone numbers as part of the data contract for the customer change event stream. That way, instead of having to deal with two table-level streams, consumers would be able to ingest all the data pertaining to a customer from one single stream, independent from how that data is organized in the source database. To make things a bit more interesting, let’s emit one of the numbers (the customer’s preferred one) via its own field <span class="inline-code">phone</span>, and all the others via an array-typed field <span class="inline-code">further_phones</span>:

Fig. 3: Joining two change event streams (click to enlarge)

A Flink SQL user-defined function (UDF) for aggregating the elements of the “many” side of the join into an array in a type-safe way can come in handy here, something I’ve recently explored in this video. Using the <span class="inline-code">ARRAY_AGGR()</span> function discussed in that quick tip, the two source tables could be joined like this:

INSERT INTO customers_public
  SELECT
    c.id,
    c.fname,
    c.lname,
    c.email,
    c.zip,
    preferred.`value`,
    ARRAY_AGGR(further_phones.`value`),
    CASE
      WHEN status = 1 THEN 'NEW'
      WHEN status = 2 THEN 'VIP'
      WHEN status = 3 THEN 'BLOCKED'
      ELSE 'STANDARD'
    END,
    DATE_FORMAT(registered, 'dd-MM-yyyy')
  FROM
    customers c
    LEFT JOIN
      (SELECT * FROM phone_numbers WHERE preferred = true) preferred
      ON c.id = preferred.customer_id
    LEFT JOIN
      (SELECT * FROM phone_numbers WHERE preferred = false) further_phones
      ON c.id = further_phones.customer_id
  GROUP BY
    c.id, fname, lname, email, zip, c.phone, registered, status, preferred.`value`;

<div style="text-align: center">Listing 4: Joining two source change streams into a single public stream.</div>

The <span class="inline-code">phone_numbers</span> table is left-joined here twice, once to obtain the preferred phone number and once for all the non-preferred numbers, which then are exposed as an array via the aforementioned <span class="inline-code">ARRAY_AGGR()</span> function.

But you also can go beyond the pure needs of data contracts themselves. One example is the expansion of partial change events: <span class="inline-code">UPDATE</span> events emitted by the Debezium connector for Cassandra only contain those fields of a record whose values actually changed, whereas any unchanged values are not contained. Similarly, the Postgres connector won’t emit values for unchanged TOAST columns (large column values stored by the database in a specific way). If a consumer only supports full record updates, it won’t be easily able to process such partial change events. This could be addressed by implementing a job for publishing a data contract with Flink’s DataStream API which leverages a state store for expanding any partial events into full ones, retrieving any missing field values from the store.

Another interesting option is to take advantage of the metadata emitted by Debezium for transaction boundaries. With this information you can implement buffering logic which emits the change events originating from one and the same source transaction only once all the events of that transaction have been ingested. This is particularly useful when joining multiple raw change event streams into a single one.
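
Again a rough sketch rather than production-ready code: assuming the change events and the transaction markers arrive as two streams keyed by transaction id, and representing both as simple maps (the class name and event shapes are made up for this example), such buffering logic could look roughly like this:

import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Both inputs are keyed by transaction id; change events (first input) are buffered
// until the transaction's END marker (second input) arrives.
public class TransactionBuffer extends
    KeyedCoProcessFunction<String, Map<String, Object>, Map<String, Object>, Map<String, Object>> {

  private transient ListState<Map<String, Object>> buffer;

  @Override
  public void open(Configuration parameters) {
    buffer = getRuntimeContext().getListState(new ListStateDescriptor<>(
        "tx-buffer", Types.MAP(Types.STRING, Types.GENERIC(Object.class))));
  }

  @Override
  public void processElement1(Map<String, Object> changeEvent, Context ctx,
      Collector<Map<String, Object>> out) throws Exception {
    // Hold back the event until the transaction is known to be complete.
    buffer.add(changeEvent);
  }

  @Override
  public void processElement2(Map<String, Object> txMarker, Context ctx,
      Collector<Map<String, Object>> out) throws Exception {
    // Simplified marker shape, modeled after Debezium's transaction metadata
    // messages (status BEGIN/END). A production implementation would also compare
    // the number of buffered events against the marker's event count, as the two
    // streams are not synchronized with each other.
    if ("END".equals(txMarker.get("status"))) {
      for (Map<String, Object> event : buffer.get()) {
        out.collect(event);
      }
      buffer.clear();
    }
  }
}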

Handling Schema Changes

As the saying goes, nothing is permanent except change. It’s only a question of time until new columns are added to your application’s data schema, existing ones are renamed or removed, or their types are changed. With explicitly defined data contracts in place, you have taken the first step towards making sure that changes to your internal data schema do not directly affect the consumers of your change event streams.

From a procedural perspective, it’s important that the team owning and publishing a data contract can apply changes to it without having to synchronize with the event consumers, who may not even be known to the upstream team. At the same time, changes to the contract should not break existing consumers: after a schema change, they should be able to continue processing the change event stream based on the previous schema known to them. Of course, they will eventually need to be adjusted in order to take advantage of the capabilities of a new contract version, such as any added fields. Guarantees around how long particular versions of a contract will be supported should be built into the contract itself, along with the other metadata discussed previously, such as SLAs.

This means data contracts for change event streams should be evolved in a forward-compatible way: new fields may be added and optional fields may be removed, whereas existing non-optional fields must not be removed.

Fig. 4: Producer-driven evolution of a schema

To learn more about guidelines for schema evolution, I highly recommend Gwen Shapira’s presentation “Streaming Microservices: Contracts & Compatibility”, where she discusses this topic at around the 16:50 mark. A schema registry should be used to ensure that any changes to a data contract adhere to these requirements. Before rolling out any data contract changes to production, a CI/CD pipeline would validate the schema changes against the compatibility rules configured in the registry, as described for instance in this excellent blog post by Chad Sanderson and Adrian Kreuziger. Any contract changes which would break existing consumers would fail the build and thus be prevented from being deployed.
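
As an example of what such a gate could boil down to, assuming Confluent Schema Registry and its Java client (the registry URL, subject name, and schema below are made up for illustration), the build could run a check along these lines and fail on an incompatible change:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class ContractCompatibilityCheck {

  public static void main(String[] args) throws Exception {
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://schema-registry:8081", 100);

    // The proposed new schema for the public customer stream (shortened here).
    AvroSchema proposed = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"first_name\",\"type\":\"string\"}]}");

    // Checks the proposed schema against the compatibility mode configured
    // for the subject (e.g. FORWARD); fail the build if it is not compatible.
    boolean compatible = client.testCompatibility("customers_public-value", proposed);
    if (!compatible) {
      throw new IllegalStateException("Incompatible change to the customers_public contract");
    }
  }
}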

<div class="side-note">Note that evolving data contracts in a forward-compatible manner means that consumers cannot replay any events from the beginning using only the latest schema version. This would fail when, for instance, re-processing an event lacking a non-optional field added in a later schema version. Instead, each event should be processed with the schema version valid at the time when the event was originally created.</div>

Now, how can stream processing help you with managing these kinds of data contract changes? As an example, consider renaming a column of a source table. The schema of the table’s change stream changes correspondingly, as soon as the first change event after the rename is ingested. Exposing this schema change as-is to downstream consumers would be an incompatible change and thus should be avoided. Several options exist for solving the issue:

  • Create another version of the stream with the new schema (i.e. the new field name); the old and the new stream version would co-exist, and clients could migrate from one to the other at their own pace
  • Expand the schema of the existing stream so that it contains a field with the new name next to the existing field with the old name
  • Keep the existing stream schema, i.e. don’t change the field name in the public data contract

In each case, stream processing can be used to apply the required transformations between the source events (containing either the old or the new field name, depending on the stream position) and their published counterpart(s). Let’s see how the last option, completely shielding consumers from the name change, can be implemented with Flink SQL.

Fig. 5: Renaming a column in the source table

The key idea is to use Flink’s savepoint mechanism to pause the job while the required schema changes are applied to the database and to the Flink job itself, making sure the job maps both the old and the new incoming field name to the existing name in the public contract. The exact sequence of steps is this:

  1. In Flink, stop the job with a savepoint: <span class="inline-code">STOP JOB '&lt;job id&gt;' WITH SAVEPOINT;</span>
    This makes sure the job, after restarting, will continue to process the source change stream from the exact position where it left off, not missing any changes which happened in between.
  2. In the source database, rename the column:
    <span class="inline-code">ALTER TABLE customers RENAME COLUMN fname TO first_name;</span>
  3. In Flink, add a column with the new name to the table, keeping the one with the old name too:
    <span class="inline-code">ALTER TABLE customers ADD first_name STRING;</span>
  4. In Flink, configure the savepoint path:
    <span class="inline-code">SET 'execution.savepoint.path' = '/path/to/savepoints/savepoint-&lt;job id&gt;';</span>
  5. In Flink, create a new version of the job, using the <span class="inline-code">COALESCE()</span> function to retrieve the first name either from the old or the new field, depending on which one exists in the incoming event:
INSERT INTO customers_public
  SELECT
    id,
    COALESCE(fname, first_name),
    ...
  FROM customers;

<div style="text-align: center">Listing 5: Retrieving the first name from the correct column, depending on which value is present</div>

With this procedure, any consumers of the public data contract are fully shielded from the column name change in the database. The job will source the first name from the correct incoming field, no matter whether it processes a change event from before or after the schema change was made in the database.

Note that the correct order of steps is vital here; in particular, the Flink job must be stopped before applying the schema change in the source database. Otherwise, it would not pick up the value from change events emitted after the column has been renamed. Therefore, the development team owning the database schema should also be in charge of the CDC pipeline and the Flink job for creating the public change stream with the data contract.

Beyond renaming columns, other kinds of schema changes can be handled with a stream processing engine as well. New columns can be added just as shown above. For dropped <span class="inline-code">NOT NULL</span> columns, the streaming job could emit a sentinel value such as “n/a” to ensure compatibility with existing consumers. Cardinality changes, for instance going from a single <span class="inline-code">phone</span> column within the <span class="inline-code">customers</span> table to a separate table with multiple phone numbers per customer, as shown above, can be handled by aggregating all the values into an array-typed field in the public data stream.

Summary

Now, does CDC break data encapsulation? As we’ve seen, the answer to this question is surprisingly nuanced and depends a lot on how and where change event streams are consumed: the key consideration is whether events cross team and/or context boundaries, or not.

In cases where encapsulation is a concern, consciously designed data contracts can be a great tool to shield external consumers of a change event stream from the implementation details of an application’s persistent data model and any changes to its schema. With the help of stream processing, for instance using Apache Flink, you can establish well-defined APIs for your data, resulting in more robust and reliable data pipelines. Flink SQL makes the creation of data contracts a matter of describing the shape of your data with a few lines of SQL, while Flink’s DataStream API can be used for implementing more advanced requirements such as expanding incoming partial change events into full events.

With those tools and corresponding processes in place, you don’t need to be concerned about accidentally exposing your internal data model, or about changes to its schema breaking your change stream consumers.

Gunnar Morling

Gunnar is an open-source enthusiast at heart, currently working on Apache Flink-based stream processing. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.
