A Deep Dive Into Ingesting Debezium Events From Kafka With Flink SQL
Over the years, I’ve spoken quite a bit about the use cases for processing Debezium data change events with Apache Flink, such as metadata enrichment, building denormalized data views, and creating data contracts for your CDC streams. One detail I haven’t covered in depth so far is how to actually ingest Debezium change events from a Kafka topic into Flink, in particular via Flink SQL. Several connectors and data formats exist for this, which can make things somewhat confusing at first. So let’s dive into the different options and the considerations around them!
Flink SQL Connectors for Apache Kafka
For processing events from a Kafka topic using Flink SQL (or the Flink Table API, which essentially offers a programmatic counterpart to SQL), there are two connectors: the Apache Kafka SQL Connector and the Upsert Kafka SQL Connector.
Both connectors can be used as a source connector—reading data from a Kafka topic—and as a sink connector, for writing data to a Kafka topic. There’s support for different data formats such as JSON and Apache Avro, the latter with a schema registry such as the Confluent schema registry, or API-compatible implementations like Apicurio. The Apache Kafka SQL Connector also supports Debezium-specific JSON and Avro formats.
The combination of connector and format defines the exact semantics, in particular whether the ingested Debezium events are processed as an append-only stream, or as a changelog stream which builds and incrementally updates materialized views of the source tables based on the incoming INSERT, UPDATE, and DELETE events (Dynamic Tables in Flink SQL terminology).
The Apache Kafka SQL Connector in Append-Only Mode
When using the Apache Kafka SQL Connector with the JSON format, no Debezium-specific semantics are applied: The Kafka topic with the Debezium events is interpreted as an append-only log of independent events. The same is the case when using the Confluent Avro format instead of JSON.
The schema of the table must exactly model Debezium’s data change event structure, including all the fields of both the message key (representing the record’s primary key) and the message value (the change event):
CREATE TABLE authors_append_only_source (
id BIGINT NOT NULL, (1)
before ROW( (2)
id BIGINT,
first_name STRING,
last_name STRING,
biography STRING,
registered BIGINT
),
after ROW(
id BIGINT,
first_name STRING,
last_name STRING,
biography STRING,
registered BIGINT
),
source ROW(
version STRING,
connector STRING,
name STRING,
ts_ms BIGINT,
snapshot BOOLEAN,
db STRING,
sequence STRING,
    `table` STRING,
txid BIGINT,
lsn BIGINT,
xmin BIGINT
),
op STRING,
ts_ms BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.authors',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset', (3)
'key.format' = 'json', (4)
'key.fields' = 'id',
'value.format' = 'json', (5)
'value.fields-include' = 'EXCEPT_KEY'
);
(1) The id field maps to the key of incoming Kafka messages
(2) The before, after, source, op, and ts_ms fields map to the value of incoming Kafka messages
(3) Start reading from the earliest offset of the topic
(4) Use JSON as the format for Kafka keys, with the id field being part of the key
(5) Use JSON as the format for Kafka values, excluding the key fields (id in this case)
When taking a look at the type of the events in the Flink source table—for instance by setting the result mode to changelog when querying the table in the Flink SQL client—you’ll see that all the events are insertions (first op column in the listing below), no matter what their change event type is from a Debezium perspective (second op column):
| op | id   | before                         | after                          | source                         | op | ts_ms         |
+----+------+--------------------------------+--------------------------------+--------------------------------+----+---------------+
| +I | 1001 | <NULL>                         | (1001, John, Stenton, ZbJa0... | (3.1.0.Final, postgresql, d... | r  | 1744296502685 |
| +I | 1009 | <NULL>                         | (1009, John, Thomas, ZbJ0du... | (3.1.0.Final, postgresql, d... | c  | 1744360987874 |
| +I | 1009 | (1009, John, Thomas, ZbJ0du... | (1009, John, Beck, ZbJ0duaf... | (3.1.0.Final, postgresql, d... | u  | 1744626041413 |
| +I | 1009 | (1009, John, Beck, ZbJ0duaf... | <NULL>                         | (3.1.0.Final, postgresql, d... | d  | 1744627927160 |
For writing (potentially processed) change events back into an output topic, another table can be created with exactly the same schema and configuration, except that you’d adjust the topic name accordingly and omit the scan.startup.mode option. The mapping of the key is required for both source and sink table in order to ensure that the partitioning, and thus the ordering, of the Debezium events on the output topic is the same as on the input topic.
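For illustration, a corresponding sink table might look as follows (a sketch; the output topic name authors_processed_raw is a hypothetical example):
CREATE TABLE authors_append_only_sink (
  id BIGINT NOT NULL,
  before ROW(id BIGINT, first_name STRING, last_name STRING, biography STRING, registered BIGINT),
  after ROW(id BIGINT, first_name STRING, last_name STRING, biography STRING, registered BIGINT),
  source ROW(version STRING, connector STRING, name STRING, ts_ms BIGINT, snapshot BOOLEAN,
    db STRING, sequence STRING, `table` STRING, txid BIGINT, lsn BIGINT, xmin BIGINT),
  op STRING,
  ts_ms BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'authors_processed_raw',  -- hypothetical output topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',              -- same key mapping as for the source table,
  'key.fields' = 'id',                -- preserving partitioning and thus ordering
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
);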
When to use it: The Apache Kafka SQL Connector in append-only mode is a great choice when you want to operate on a "raw" stream of Debezium data change events, without applying any changelog or upsert semantics. It comes in handy for applying transformations such as adjusting date formats or filtering events based on specific field values. In that sense, this is similar to using the Flink DataStream API on a change event stream, only that you are using SQL rather than Java for your processing logic.
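As a minimal sketch of such a transformation, the following statement forwards all change events except snapshot reads to the sink table defined above:
-- Filter on the Debezium operation type: drop snapshot read events ('r'),
-- keep creates, updates, and deletes
INSERT INTO authors_append_only_sink
SELECT id, before, after, source, op, ts_ms
FROM authors_append_only_source
WHERE op <> 'r';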
The Apache Kafka SQL Connector As a Changelog Source
Besides the append-only mode, the Apache Kafka SQL Connector also supports changelog semantics via the Debezium data format. Both JSON (by specifying debezium-json as the value format of your table) and Avro with a registry (via debezium-avro-confluent) are supported. The INSERT, UPDATE, and DELETE events ingested from the Kafka topic are used by the Flink SQL engine to incrementally re-compute the corresponding dynamic table, as well as any continuous queries you are running against it. If you query a changelog-based source table, the result set always represents the current state of that table, updated in realtime whenever a new Debezium event comes in.
The table schema looks quite a bit different than before: instead of modeling the entire Debezium envelope structure, only the actual table schema (i.e. the contents of the before and after sections) needs to be specified:
CREATE TABLE authors_changelog_source (
id BIGINT,
first_name STRING,
last_name STRING,
biography STRING,
registered BIGINT,
PRIMARY KEY (id) NOT ENFORCED (1)
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.authors',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json' (2)
);
(1) While not strictly needed here, a primary key definition—in conjunction with setting the job-level configuration table.exec.source.cdc-events-duplicate to true (see the snippet below)—ensures that duplicates are discarded in case Debezium events are ingested a second time, for instance after a connector crash
(2) Using debezium-json as the value format enables changelog semantics for this table
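The deduplication option mentioned in the first callout is a job-level setting; in the Flink SQL client, for instance, it can be enabled like so:
-- Discard re-delivered change events; requires a primary key on the source table
SET 'table.exec.source.cdc-events-duplicate' = 'true';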
When querying this table in the Flink SQL client, the operation type reflects the kind of the incoming Debezium event. Note how update events are broken up into an update-before event (-U, representing the retraction of the old row) and an update-after event (+U, the insertion of the new row) internally by the Flink SQL engine:
+----+------+------------+-----------+-----------+------------------+
| op | id | first_name | last_name | biography | registered |
+----+------+------------+-----------+-----------+------------------+
| +I | 1010 | John | Thomas | ZbJ0duDvW | 1741642600000000 |
| -U | 1010 | John | Thomas | ZbJ0duDvW | 1741642600000000 |
| +U | 1010 | John | Stenton | ZbJ0duDvW | 1741642600000000 |
| -D | 1010 | John | Stenton | ZbJ0duDvW | 1741642600000000 |
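These retraction events are what allow Flink to keep continuous queries incrementally updated. As a minimal sketch, the following aggregation is re-computed on every incoming change event:
-- The count per last name is adjusted whenever an INSERT, UPDATE, or DELETE
-- event is ingested from the Debezium topic
SELECT last_name, COUNT(*) AS author_count
FROM authors_changelog_source
GROUP BY last_name;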
For a source table, it is typically not required to map the Kafka message key field(s) to the table schema when using the Debezium data format, as they are also part of the change event value. For situations where that’s not the case, key fields can be mapped via the key.fields configuration option; the value.fields-include option must then be set to EXCEPT_KEY.
Optionally, additional Debezium metadata fields such as the origin timestamp or the name of the source table and schema can be mapped as virtual columns:
CREATE TABLE authors_changelog_source (
ts_ms TIMESTAMP_LTZ METADATA FROM 'value.ingestion-timestamp' VIRTUAL, (1)
source_table STRING METADATA FROM 'value.source.table' VIRTUAL, (2)
source_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL, (3)
id BIGINT,
...
) WITH (
...
);
(1) Maps the ts_ms field of the change events (the time at which the data change occurred in the source database)
(2) Maps the source.table field of the change events
(3) Maps all the source metadata of the change events
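The virtual columns behave like regular read-only columns in queries; as a sketch, based on the table definition above:
-- Filter change events by their originating source table
SELECT id, first_name, last_name, ts_ms, source_table
FROM authors_changelog_source
WHERE source_table = 'authors';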
Flink’s Debezium data format requires change events to contain not only the after section, but also the before part, which describes the previous state of a row that was updated or deleted. This old row image is required by Flink for retracting previous values when incrementally re-computing derived data views. Unfortunately, this means that Postgres users can leverage this format only for tables which have a replica identity of FULL. Otherwise, the old row image isn’t captured in the Postgres WAL and thus not exposed via logical replication. An exception is raised in this case:
java.lang.IllegalStateException: The "before" field of UPDATE message is null, if you are using Debezium Postgres Connector, please check the Postgres table has been set REPLICA IDENTITY to FULL level.
at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:159)
...
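If you control the source database, the old row image can be enabled with standard Postgres DDL, using the table from the running example:
-- To be run in Postgres, not Flink: capture the full old row image in the WAL,
-- so Debezium can populate the "before" section of update and delete events
ALTER TABLE inventory.authors REPLICA IDENTITY FULL;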
While Flink’s ChangelogNormalize operator can materialize the retract events (at the cost of persisting all the required data in its own state store), this currently is not supported when using the Apache Kafka SQL Connector as a changelog source with the Debezium change event format. I don’t think there’s a fundamental issue which would prevent this from being possible; it just isn’t implemented at this point.
In order to propagate change events to another Kafka topic, you’ll need to set up a sink connector, also using debezium-json as the value format. You can define which field(s) should go into the Kafka message key via the key.fields property. Make sure to use json (not debezium-json!) as the key format:
CREATE TABLE authors_changelog_sink (
id BIGINT,
first_name STRING,
last_name STRING,
biography STRING,
registered BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'authors_processed',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'key.fields' = 'id',
'value.format' = 'debezium-json'
);
While the events on the downstream Kafka topic adhere to Debezium’s event envelope schema, they are produced by Flink, not Debezium. In particular, they lack all the metadata you’d usually find in the source block. Also, updates are reflected by two events, rather than the single event Debezium would emit: a deletion event with the old row state, followed by an insert event with the new row state.
When to use it: The Apache Kafka SQL connector as a changelog source (and sink) is great when you want to implement streaming queries against incoming data change events, for instance in order to create denormalized views or to enable real-time analytics of the data in an OLTP datastore. Due to the removal of all the Debezium metadata, it is not the best choice for ETL pipelines which don’t require stateful processing. Also, splitting updates into a delete and an insert event causes write amplification in downstream systems, which otherwise might support in-place updates to existing rows.
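As a sketch of such a denormalized view, assume a hypothetical second CDC table books_changelog_source (set up just like authors_changelog_source) with an author_id column referencing the authors table:
-- The join result is incrementally maintained by Flink as change events
-- arrive on either input topic
SELECT b.id AS book_id, b.title, a.first_name, a.last_name
FROM books_changelog_source AS b
JOIN authors_changelog_source AS a ON b.author_id = a.id;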
The Upsert Kafka SQL Connector
Last, let’s take a look at the Upsert Kafka SQL Connector. It consumes and produces changelog streams, applying "upsert" semantics. As a source connector, the first event for a given key is considered an INSERT; all subsequent events for that key with a non-null value are considered UPDATEs of the same row. Tombstone records on the Kafka topic (i.e. records with a key and a null value) are interpreted as DELETE events for that key. Note that tombstone records are also used by Kafka to remove records during log compaction; you therefore need to configure a value for the topic’s delete.retention.ms setting which is large enough for downstream consumers such as Flink to receive the tombstones before they are removed. As a sink connector, any insert or update for a key yields an event with the current state as the value, and the deletion of a key yields a tombstone record.
In order for Debezium to emit such a "flat" event structure with just the current state of a row—instead of the full Debezium change event envelope—the new record state transformation (a Kafka Connect single message transform, SMT) needs to be applied when configuring the connector:
{
"name": "inventory-connector",
"config": {
"connector.class":
"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory",
"slot.name" : "dbserver1",
"plugin.name" : "pgoutput",
"transforms" : "unwrap", (1)
"transforms.unwrap.type" :
"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones" : "false" (2)
}
}
(1) Apply the ExtractNewRecordState transform before sending the events to Kafka
(2) As some Kafka Connect sink connectors can’t handle tombstone records, the connector supports dropping them; setting this option to false keeps tombstone records, allowing delete events to be propagated to Flink
With this SMT in place, the contents of the after section of INSERT and UPDATE events will be extracted and propagated as the sole change event value, i.e. the new row state. DELETE events will be propagated as Kafka tombstones, as expected by the upsert connector. Note that the ExtractNewRecordState SMT is highly configurable; for instance, you could opt into exporting specific source metadata properties as fields in the change event value, or as header properties of the emitted Kafka records.
The configuration of a source table for the upsert connector is pretty similar to the previous changelog source, except that the connector type is upsert-kafka:
CREATE TABLE authors_upsert_source (
id BIGINT,
first_name STRING,
last_name STRING,
biography STRING,
registered BIGINT,
PRIMARY KEY (id) NOT ENFORCED (1)
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'dbserver1.inventory.authors',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);
(1) A primary key definition is mandatory when using the upsert connector; it determines which field(s) are part of the Kafka message key and thus form the upsert key
The same goes for defining sink tables.
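For instance, an upsert sink table might be defined like this (a sketch; the topic name authors_processed_upsert is a hypothetical example):
CREATE TABLE authors_upsert_sink (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  biography STRING,
  registered BIGINT,
  PRIMARY KEY (id) NOT ENFORCED  -- determines the Kafka message key, as for the source
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'authors_processed_upsert',  -- hypothetical output topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);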
Now, is it also possible to ingest full Debezium change events, i.e. with the envelope, but emit upsert-style events? Indeed it is, as you can mix and match the Kafka SQL connector as a source using the debezium-json format with the Upsert Kafka SQL connector as a sink using the json format. This comes in handy, for instance, for writing the updates of an incrementally recomputed materialized view to an OLAP store for serving purposes, without incurring the overhead of the delete + insert event pair emitted by the non-upsert connector.
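Sketching this combination with the tables defined above, a single statement bridges the two connectors:
-- Ingest full Debezium envelopes via the changelog source, emit one flat
-- upsert event per change and a tombstone per delete via the upsert sink
INSERT INTO authors_upsert_sink
SELECT id, first_name, last_name, biography, registered
FROM authors_changelog_source;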
When to use it: Use the Upsert Kafka SQL Connector for processing "flat" data change events, without the Debezium event envelope. Similar to the Kafka SQL Connector as a changelog source, the upsert connector lets you implement streaming queries on change event feeds. Unlike the Kafka SQL Connector, updates are emitted as a single event, which results in less write overhead on downstream systems, in particular if partial updates (rather than full row rewrites) are supported.
Summary
When venturing into the world of processing Debezium data change events in realtime with Apache Flink and Flink SQL, the combination of available connectors and data formats for doing so can be somewhat overwhelming. The table below gives an overview of the different options, their characteristics, and use cases:
Connector | Kafka SQL Connector | Kafka SQL Connector as changelog source | Upsert Kafka SQL Connector
---|---|---|---
Stream type | Append-only | Changelog | Changelog
Change event format | json, avro-confluent | debezium-json, debezium-avro-confluent | json, avro-confluent
Input event type | Debezium change event envelope | Debezium change event envelope | Flat events with current state; tombstone records
Output event type | Debezium change event envelope | Synthetic Debezium change event envelope; updates broken up into delete + insert event | Flat events with current state; tombstone records
Metadata | In change event envelope | Mapped to table schema | Mapped to table schema, must be part of row state
Start reading position | Configurable | Configurable | Earliest offset
When to use | Processing of change events themselves, e.g. transformation, enrichment, routing | Realtime queries on changelog streams of full Debezium events, e.g. to create materialized views and enable realtime analytics | Realtime queries on changelog streams of "flat" data change events, e.g. to create materialized views and enable realtime analytics
Interestingly, whereas the Apache Flink project itself provides two separate Kafka connectors for upsert and non-upsert use cases, managed Flink SQL offerings in the cloud tend to provide a more unified experience centered around a single higher-level connector. As an example, the connector for integrating Flink with Kafka topics on Confluent Cloud exposes a changelog.mode setting, which defaults to append when deriving a Flink table from an uncompacted Kafka topic and to upsert for compacted topics. Similar abstractions exist on other services, too, with the general aim of shielding users from some of the intricacies here.
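For instance, switching a table to upsert semantics on Confluent Cloud might look like this (a sketch based on the documented changelog.mode option; exact syntax and capabilities may vary by service):
-- Override the changelog mode derived from the topic's compaction settings
ALTER TABLE authors SET ('changelog.mode' = 'upsert');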
One more thing you might wonder at this point is: how does Flink CDC fit into all this? Also hosted by the Apache Software Foundation, this project integrates Debezium as a native connector into Flink, instead of channeling data change events through Apache Kafka. The Flink CDC connectors also emit changelog streams with retraction events as shown above, with the Postgres connector optionally supporting upsert semantics via its changelog-mode setting.
There are pros and cons to both ways of integrating Debezium and Flink, for instance with regard to the replayability of events. This warrants a separate blog post dedicated to comparing the two approaches at some point, though.
If you’d like to experiment with the different connectors and data formats for ingesting Debezium data change events from Kafka into Flink SQL by yourself, check out this project in my stream-examples repository, which contains Flink jobs for all the different configurations.