October 29, 2024 · 7 min read

Schema Evolution in Change Data Capture Pipelines

Streaming ETL pipelines built on top of change data capture (CDC) are a powerful mechanism and drive many real-time workloads in businesses across a wide range of industries these days. Since new requirements keep popping up on a regular basis, change is inevitable. This article highlights the importance of schema evolution in the context of real-time data pipelines and discusses how to approach modifications to data models at the source side without breaking compatibility with downstream consumers.

Why does schema evolution matter?

Just use a well-defined data schema when initially setting up a streaming ETL pipeline based on change data capture (CDC). Oh, and make sure to never modify any table definitions at the source side ... then it will all be fine. đŸ€Ą

In an ideal world, we might be able to do just that. However, the reality is that changing business needs require applications and their underlying data stores to adapt accordingly. Hence, it's just a matter of time until a CDC-powered real-time ETL pipeline, no matter how carefully it was set up initially, starts to suffer from hiccups and eventually comes to a grinding halt.

In fact, one of the most crucial aspects for any real-world CDC use case to be successful is to consider schema evolution from the very beginning. Common challenging aspects to deal with include:

  • Data format inconsistencies: Depending on the chosen data format, type changes to table columns may cause errors during the (de)serialization of CDC events.
  • Misalignment of data structures: Schema changes at the source database will almost certainly lead to a mismatch between the old and new data structures describing the change events. If not handled systematically, this may well cause incorrect or at least inconsistent data being propagated to consuming applications.
  • Incompatibility with sink systems: Downstream systems such as data warehouses, analytics platforms, or relational databases typically expect a fixed schema. Unexpected changes to this schema can break compatibility and will most likely lead to data ingestion errors, failed queries and broken data flows.
  • Replaying of historical data: If schema changes occur to CDC events that get stored in an event streaming platform like Apache Kafka, it's not necessarily straight-forward to handle a mix of newer and historic CDC events when reprocessing records.
  • State management trouble: In streaming ETL pipelines, it's not uncommon to perform stateful operations such as aggregations or joins which depend on the schema of CDC events and become affected due to changes thereof. This may cause internal state to become invalid after the fact.

Schema Changes at the Source Side

Let's make matters a bit more concrete and look at a few typical schema-related changes in the context of relational databases. It's helpful to broadly categorize data definition language (DDL) statements and differentiate at least between table- and column-related changes.

Table-level Changes

CDC solutions are ideally capable of properly reacting to more coarse-grained schema changes such as the addition and removal of tables:

  • <span class="inline-code">CREATE TABLE my_table_t1</span>: Whenever new tables get created with their respective columns and type definitions they should be automatically discovered and processed given the CDC tool is configured to do so. For instance, this could be achieved by providing settings to define prefix/suffix or full regex matching on the respective table names to have them either included or excluded. One aspect to consider is that the CDC tool should provide the possibility to perform an initial snapshot to synchronize all the existing data in the added tables to enable backfill scenarios.
  • <span class="inline-code">DROP TABLE my_table_t2</span>: conversely a CDC tool is supposed to properly react to tables getting dropped on the fly by effectively ignoring them without causing any interruption for the remaining tables being captured.

Another modification that is also reflected on the table-level would be to change the cardinality between entities and, for instance, switch from a 1:1 to 1:n relationship. It's a rather invasive schema change that typically requires more upfront thought, careful preparation and additional manual effort to get it done properly, such that the different data model semantics are also propagated throughout the ETL pipeline accordingly.

Column-level Changes

It gets a bit more nuanced when thinking of fine-grained schema changes that are applied to selected columns of database tables using ALTER TABLE statements like the ones below (see the sketch right after this list):

  • <span class="inline-code">ADD COLUMN col_a</span>: adding one or more columns to an existing table
  • <span class="inline-code">DROP COLUMN col_b</span>: removing one or more columns from an existing table
  • <span class="inline-code">RENAME COLUMN col_c TO col_d</span>: renaming one or more columns in an existing table

Another column-related schema change worth mentioning that can be tricky to pull off in the context of data pipelines is modifying a column's data type in any of the following ways (see the DDL sketch after this list):

  • Type Widening: changing the type from <span class="inline-code">float</span> to <span class="inline-code">double</span> or from <span class="inline-code">int</span> to <span class="inline-code">long</span>, in general widen the current type of column(s)
  • Type Narrowing: changes which narrow a field's type, for instance, from <span class="inline-code">double</span> to <span class="inline-code">float</span> or from <span class="inline-code">long</span> to <span class="inline-code">int</span>
  • ‍Type Swapping: switching between inherently different data types, for instance, <span class="inline-code">double</span> to <span class="inline-code">enum</span> or <span class="inline-code">string</span> to <span class="inline-code">boolean</span>

What any of these source-side column-level changes actually mean for a streaming ETL pipeline and how they affect the sink side or specific downstream consumers is discussed in the next section.

Consequences at the Sink Side

Let's switch focus to understand what happens at the sink side after any of the DDL statements discussed right above are executed at the source side. In general, the consequences can be vastly different—ranging from negligible to disastrous—depending on the streaming ETL pipeline specifics and the concrete CDC scenario at hand:

  • Example 1: Given a streaming ETL pipeline between a relational database (e.g. PostgreSQL) at the source side and some object storage (e.g. S3) on the sink side, schema changes in the database typically won't be disruptive, at least not from a pure pipeline or data flow perspective. Whether or not applications on top—or more broadly speaking any consumer of this data at the sink side—can deal with these changes is a totally different question.
  • Example 2: Consider a 1:1 replication scenario between heterogeneous data stores such as MySQL and PostgreSQL. In this case it's absolutely vital that any modifications to the data model at the source side get properly reflected at the sink side's data model in a timely fashion. Otherwise, the streaming ETL pipeline itself would most likely collapse immediately because any CDC events subsequent to the source side's schema change would fail to be applied at the sink side.

Admittedly, these two examples are pretty much at opposite ends of a spectrum and the actual consequences to expect in streaming ETL pipelines due to schema changes are most often found somewhere in-between for real use cases.

Irrespective of the actual schema changes in question, explicitly reacting to any such DDL statements in a potentially fully-automated way at the downstream side requires the CDC solution to accurately capture and propagate all these schema modifications.

Quick Detour: Schema Changes in Debezium

How does Debezium react to schema-related modifications behind the scenes? The short answer is it depends. 🙃

Unfortunately, due to peculiarities concerning the inner workings of specific databases, Debezium cannot simply treat and expose schema changes across the databases it supports in an agnostic way. At the moment, there are two fundamentally different ways in which this is handled for relational databases:

  1. Explicit schema change events resulting from applied DDL statements can be published to a dedicated channel, i.e. a separate Kafka topic. This behavior is available for most relational databases supported by Debezium, for instance, MySQL or SQL Server.
  2. For PostgreSQL, there is the limitation that the logical decoding feature used by Debezium does not support DDL changes, which in turn means that schema change events are not separately published. Instead, any schema modifications are only reflected as part of the data change events themselves.

That said, let's return to the column-level schema changes from the previous section to understand what they would mean for schema compatibility towards consumers.

Adding, Dropping, Renaming Columns

When trying to understand the implications of schema changes and what they mean for downstream consumers, there is the notion of forward versus backward compatibility of schemas. In this article, the terms "forward-compatible" and "backward-compatible" are applied with their semantics as commonly used in the context of Kafka schema registries. A schema change is considered forward-compatible if a consumer that is using schema version N can keep processing messages following the next schema version N+1. Conversely, a backward-compatible schema change means that a consumer which is operating based on schema version N can still process messages following the previous schema version N-1. With that in mind, here is what column-level changes mean for schema compatibility:

ADD COLUMN col_a: Adding a new column to a table is a forward-compatible change. Old consumers—those sticking to the previous schema—can process new events by ignoring the additional field. If the added column is optional and/or defines a default value, such a change is also backward-compatible. New consumers—those already aware of the modified schema—can still process old events by treating the missing field as optional or by compensating with the default value.
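
For example, a hypothetical orders table could gain an optional column with a default value like this (table and column names are illustrative only):

ALTER TABLE orders ADD COLUMN discount_code VARCHAR(20) NULL DEFAULT '';
-- old consumers (previous schema) simply ignore discount_code      -> forward-compatible
-- new consumers (modified schema) fall back to '' for old events   -> backward-compatible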

DROP COLUMN col_b: Removing an existing column is backward-compatible. New consumers which are aware of the modified schema can still process old events by skipping over the still-present, i.e. superfluous, field. If the dropped field was originally defined as optional and/or with a default value, it would be a forward-compatible change, such that old consumers still sticking to the previous schema can process new events by compensating for the missing field with the default value.

RENAME COLUMN col_c TO col_d: Renaming an existing column in a table is typically considered an incompatible change. By merely looking at any CDC event in isolation after the fact, it's generally unclear what happened. It's akin to a previously available column having been dropped while a new one has been added.

In the context of CDC pipelines, you should primarily strive for forward-compatible schema changes so as to make sure existing consumers which still expect the previous schema version can keep processing data change events.

Column Type Modifications

Type Widening: Changing float to double or int to long—in general, widening the type—is backward-compatible. Since the new type is able to represent all values that the old type could, a new consumer should still be able to read old data. It's not forward-compatible though, as old consumers still have the narrower data type which cannot necessarily fit the new, wider type's values.

Type Narrowing: Changes which narrow a field's type, for instance, going from double to float or from long to int, can be considered forward-compatible even if the new type cannot express the full range of values that the old type could. Old consumers should still be able to read new data as the narrower type can fit into the originally wider type. It's not backward-compatible, though, due to potential precision loss which may cause data inconsistencies.

Type Swapping: Switching between inherently different data types, for instance, double to enum or string to boolean, in general breaks compatibility due to fundamentally different data representations for affected columns.

A Note on Schema Registries

A schema registry is a vital component for managing and enforcing rules which clearly define how to evolve schemas. It helps to ensure that specific modifications to a data model don't immediately disrupt data processing by downstream consumers.

There are several popular schema registry implementations, for instance, Confluent Schema Registry, Apicurio, or Karapace, all of which allow for centrally managing schemas, including versioning support. Producing and consuming clients can look up specific schemas based on some identifier plus a version number. When schemas are changed and new versions get published, schema registries can automatically check new schema versions against pre-defined compatibility rules (e.g. forward- / backward- / fully-compatible) and thus reject a new schema submission if applicable.

Selected Mitigation Strategies

Beyond schema compatibility rules, which help address schema changes in a smoother and more systematic way, what can be done proactively to prevent source-side schema changes from directly impacting the sink side and potentially breaking data flows?

Outbox Pattern

One approach is to use the outbox pattern and thus not directly expose data model internals to the outside world in the first place. Instead of configuring a CDC pipeline directly against the database tables backing some application, a contract is defined based on a well-defined public schema. In addition to writing to the original database tables, any data changes are written into a dedicated table. This table, called the outbox table, is the primary means to communicate (a subset of) internal data changes by means of change events following the defined contract. For reasons of data consistency, write operations to the original tables and the outbox table are all bound to one and the same database transaction.
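
A minimal, PostgreSQL-flavored sketch of such an outbox table and a transactional double-write could look like the following, assuming an existing orders table; table and column names as well as the payload structure are illustrative, not prescriptive:

-- the outbox table only ever exposes events following the public contract
CREATE TABLE outbox (
  id             UUID PRIMARY KEY,
  aggregate_type VARCHAR(255) NOT NULL,  -- e.g. 'order'
  aggregate_id   VARCHAR(255) NOT NULL,  -- e.g. the order's primary key
  event_type     VARCHAR(255) NOT NULL,  -- e.g. 'OrderCreated'
  payload        JSONB NOT NULL          -- change event payload per the contract
);

-- application write and outbox write share one transaction
BEGIN;
INSERT INTO orders (id, customer_id, total)
VALUES (42, 7, 99.90);
INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
VALUES (gen_random_uuid(), 'order', '42', 'OrderCreated',
        '{"orderId": 42, "customerId": 7, "total": 99.90}');
COMMIT;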

Since the pipeline will only ever capture changes to the outbox table, it is guaranteed to propagate messages which adhere to the public schema. Changes to the internal database schema won’t become immediately visible to the outside world, hence, can’t break downstream consumers abruptly.

One downside of the outbox pattern, besides some additional write overhead at the source database, is that if it hasn't been considered from day one, the application needs to be changed accordingly. This may or may not be an option, depending on ownership and control.

Also, doesn’t the outbox pattern only defer the original problem? What if, for some reason, the public contract itself needs to be changed? First, the frequency with which this need actually arises should be considerably lower than that of the schema changes which are necessary internally. Second, in such cases you might be able to successfully tackle the public schema modification with an alternative approach explained in the next section.

In-flight Transformations

Another well-known solution to help reduce event data-specific coupling between producers and consumers is the message translator pattern. Two rather different yet common real-world scenarios for following this pattern and applying data transformations in flight are: a) integrating with legacy systems and their proprietary or non-standardized data formats, or b) avoiding leaking domain models in microservice architectures. In the context of schema evolution for streaming ETL pipelines, the message translator pattern turns out to be equally effective to ensure that consumers don’t break as schemas need to evolve.

Transformations come in various forms, ranging from basic operations such as including/excluding fields, changing field names, converting data types, or masking of sensitive data, to more involved modifications such as full payload format conversions (e.g. from CSV to JSON). While the message translator pattern for event payloads can be implemented in different places, it is most beneficial to externalize this functionality rather than doing it as part of the producing or the consuming side of the data flow in question.

If there is Kafka infrastructure in place, a simple way could be to implement all necessary event payload modifications as a chain of single message transformations (SMTs) in a Kafka Connect-based data pipeline. However, there might be transformations which require more elaborate operations such as external lookups or enrichments based on joining with additional data. Also, whenever it’s necessary to act upon more than one message at a time, fully-fledged stream processing comes into the picture, which is where Apache Flink really shines.
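
As a minimal sketch of what such an in-flight translation could look like in Flink SQL, assume a CDC-backed source table orders_cdc and a sink table public_orders, both of which would have to be defined elsewhere with their respective connectors; names and fields are purely illustrative:

-- only the curated, stable shape of public_orders is visible to consumers
INSERT INTO public_orders
SELECT
  id,
  customer_id AS customer,                        -- rename an internal field
  CAST(total AS DECIMAL(10, 2)) AS order_total    -- pin the type exposed downstream
FROM orders_cdc;

Because consumers only ever see the columns of public_orders, internal renames or type tweaks can be absorbed by adjusting this query instead of every consumer.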

Irrespective of the tech stack used to implement basic or complex in-flight transformations, the core idea behind it—conceptually similar to the one for the outbox pattern—is to shield downstream consumers from any abrupt and potentially breaking schema changes.

The final section of this article discusses a vastly different scenario, namely how to actually synchronize schema changes between a source and sink system rather than trying to work around this. Both the outbox and the message translator patterns would be of no use in such a case.

Automatic Schema Synchronization

Let’s revisit one of the examples from earlier in this post which was about 1:1 data replication between different relational data stores. Use cases like this require a way for any schema changes happening in source database tables to be applied onto the corresponding sink database tables—effectively on the fly. Achieving schema synchronization for streaming ETL pipelines in an automated way, thereby limiting any significant downtime, is a non-trivial challenge and requires purpose-built solutions. This is where Flink CDC comes into the picture.

Flink CDC 3.x

Flink CDC 3.0, which was released earlier this year, introduced support for automatic schema synchronization, besides a few other long-anticipated capabilities. Schema evolution in Flink CDC is not just a simple on/off switch but offers quite some flexibility thanks to fine-grained configuration settings to modify its behavior.

At the moment, there are the following five schema evolution modes to choose from to cater for different use cases:

  • exception: effectively forbids all schema changes and fails if applicable
  • evolve: tries to apply all upstream schema changes to downstream and fails if problems occur
  • try evolve: tries to apply all upstream schema changes to downstream but tolerates potential failures
  • lenient: adapts upstream schema changes if necessary such that no destructive operations happen downstream
  • ignore: effectively ignores all potential schema changes but keeps propagating changes from unchanged columns

In addition to this global setting, there is even control on a per-event-type level. This makes it possible to precisely define which of the currently supported event types—create table, add column, alter column type, drop column, and rename column—should be either included or excluded during schema synchronization towards a sink system.

While automatic schema synchronization is currently only available as part of a limited set of connectors, it can be conveniently used by means of adding a few configuration options to a declarative data pipeline definition written in YAML.

The YAML manifest shown below defines a Flink CDC 3.2-based streaming ETL pipeline which replicates tables between MySQL and Apache Doris and reacts to upstream schema changes such as adding columns by automatically synchronizing these downstream using proper DDL statements behind the scenes:

pipeline:
  name: mysql-to-doris-sync
  parallelism: 1
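  # optional: pin the schema evolution behavior explicitly instead of relying
  # on the framework default; per the Flink CDC docs the available modes are
  # exception, evolve, try_evolve, lenient, and ignore
  # schema.change.behavior: evolve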

source:
  type: mysql
  name: my-relational-database-source
  hostname: mysql
  port: 3306
  username: root
  password: 123456
  # sync all tables in the 'inventory' database using regex
  tables: inventory.\.*
  server-id: 7777
  server-time-zone: UTC
  # enable schema evolution (defaults to true)
  schema-change.enabled: true
  jdbc.properties.useSSL: false
  jdbc.properties.allowPublicKeyRetrieval: true

sink:
  type: doris
  name: my-analytics-warehouse-sink
  fenodes: doris:8030
  benodes: doris:8040
  username: root
  password: ""
  sink.buffer-flush.interval: 1s
  sink.buffer-flush.max-bytes: 102400
  sink.buffer-flush.max-rows: 100
  table.create.properties.replication_num: 1
  # if true, addition and deletion of value columns can be done more quickly and synchronously
  table.create.properties.light_schema_change: true

Summary

This blog post discussed why schema evolution is a crucial aspect that needs to be considered from day one when building streaming ETL pipelines on top of change data capture. We looked at a number of common source-side data model changes on both the table and the column level to understand the consequences of these schema changes at the sink side, more specifically, what they mean for schema compatibility towards downstream consumers. The following table provides a condensed overview:

Column-level schema change   | Forward-compatible | Backward-compatible
add column                   | yes                | no
add optional column          | yes                | yes
drop column                  | no                 | yes
drop optional column         | yes                | yes
rename column                | no                 | no
widen column type            | no                 | yes
narrow column type           | yes                | no
swap column type             | no                 | no

Besides that, two effective mitigation strategies—the outbox and the message translator pattern—have been explained in order to proactively shield the sink side from unexpected and potentially disruptive schema changes. Finally, we briefly looked into a long-anticipated feature added in Flink CDC 3.0 and how it enables automatic schema synchronization between heterogeneous data stores.

Hans-Peter Grahsl

Hans-Peter Grahsl is a Staff Developer Advocate at Decodable. He is an open-source community enthusiast and in particular passionate about event-driven architectures, distributed stream processing systems and data engineering. For his code contributions, conference talks and blog post writing at the intersection of the Apache Kafka and MongoDB communities, Hans-Peter received multiple community awards. He likes to code and is a regular speaker at developer conferences around the world.
