Change data capture (CDC) is a process to capture changes made to data in the database and stream those changes to external processes, applications, or other databases.
- The database and its tables must be created using YugabyteDB version 2.13 or later.
- CDC supports YSQL tables only. (See Limitations.)
Be aware that you can't stream data out of system tables.
Note: The current YugabyteDB CDC implementation supports only Debezium and Kafka.
The core primitive of CDC is the stream. Streams can be enabled and disabled on databases. Every change to a watched database table is emitted as a record in a configurable format to a configurable sink. Streams scale to any YugabyteDB cluster independent of its size and are designed to impact production traffic as little as possible.
Streams are the YugabyteDB endpoints from which applications, processes, and systems fetch database changes. Streams can be enabled or disabled on a per-namespace basis. Every change to a streamed database table is emitted as a record to the stream, which is then propagated onward for consumption by applications — in this case to Debezium, and then ultimately to Kafka.
To facilitate the streaming of data, you have to create a DB Stream. This stream is created at the database level, and can access the data in all of that database's tables.
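As a sketch of what creating a DB Stream looks like, the yb-admin `create_change_data_stream` command can be run against the cluster's master addresses; the address, port, and database name below are placeholders for illustration:

```shell
# Create a CDC DB Stream on the YSQL database "yugabyte".
# Replace the master address with your cluster's actual master addresses.
./bin/yb-admin \
    --master_addresses 127.0.0.1:7100 \
    create_change_data_stream ysql.yugabyte
```

The command prints a stream ID, which you then supply to the Debezium connector configuration.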
When a client reads the changes from the WAL (write-ahead log) and IntentsDB, the intents are retained; the retention time is controlled by the cdc_intent_retention_ms GFlag.
When you create a stream, the checkpoint for a tablet is set as soon as the client requests changes. If the client doesn't request changes within cdc_intent_retention_ms milliseconds, the CDC service considers the tablet_id, stream_id combination to be expired, and allows those intents to be removed by the garbage collection process. Once a stream has expired, you need to create a new stream ID to proceed.
Warning: If you set cdc_intent_retention_ms to a high value and the stream lags for any reason, the intents are retained for a longer period. This may destabilize your cluster if the number of intents keeps growing.
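As an illustration, the flag can be set at server startup or adjusted on a running tablet server with yb-ts-cli; the address and the value below are placeholder assumptions, not recommendations:

```shell
# Option 1: set the flag when starting the tablet server (value in milliseconds).
./bin/yb-tserver --cdc_intent_retention_ms=14400000 <other-flags>

# Option 2: change the flag on a running tablet server.
./bin/yb-ts-cli --server_address=127.0.0.1:9100 \
    set_flag cdc_intent_retention_ms 14400000
```

A flag changed via yb-ts-cli does not persist across a server restart, so also update the server's startup configuration.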
Debezium connector for YugabyteDB
The Debezium connector for YugabyteDB pulls data from YugabyteDB and publishes it to Kafka. The following illustration explains the pipeline:
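A minimal sketch of deploying the connector via the Kafka Connect REST API follows; the connector name, host addresses, table name, and stream ID are placeholders, and the property names are assumed to match the YugabyteDB connector's configuration — check the connector documentation for the authoritative list:

```shell
# Register a YugabyteDB Debezium connector with a local Kafka Connect worker.
curl -X POST -H "Content-Type: application/json" \
  localhost:8083/connectors -d '{
    "name": "ybconnector",
    "config": {
      "connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
      "database.hostname": "127.0.0.1",
      "database.port": "5433",
      "database.user": "yugabyte",
      "database.password": "yugabyte",
      "database.dbname": "yugabyte",
      "database.server.name": "dbserver1",
      "database.master.addresses": "127.0.0.1:7100",
      "database.streamid": "<stream-id-from-yb-admin>",
      "table.include.list": "public.orders"
    }
  }'
```

Once registered, the connector reads changes from the DB Stream and publishes them to Kafka topics named after the server name and table.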
There are several GFlags you can use to fine-tune YugabyteDB's CDC behavior. These flags are documented in the Change data capture flags section of the yb-tserver reference page.
Per-tablet ordered delivery guarantee
All changes for a row (or rows in the same tablet) are received in the order in which they happened. However, due to the distributed nature of the problem, there is no guarantee of the order across tablets.
At least once delivery
Updates for rows are streamed at least once. Duplicates can occur in the case of a Kafka Connect node failure: if the node pushes the records to Kafka and crashes before committing the offset, on restart it will receive the same set of records again.
No gaps in change stream
Note that after you have received a change for a row at some timestamp t, you won't receive a previously unseen change for that row at a lower timestamp. Receiving any change implies that you have received all older changes for that row.
The change records for CDC are read from the WAL. The CDC module maintains checkpoints internally for each of the stream IDs and garbage-collects WAL entries once they have been streamed to CDC clients.
If CDC lags or is inactive for some time, disk usage may grow and cause YugabyteDB cluster instability. To avoid this scenario, if a stream is inactive for a configured amount of time, the WAL is garbage-collected. This interval is configurable using a GFlag.
The commands used to manipulate CDC DB streams are described in the yb-admin reference documentation.
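As a quick sketch of day-to-day stream management with yb-admin (master addresses and stream ID are placeholders):

```shell
# List all CDC DB Streams in the cluster.
./bin/yb-admin --master_addresses 127.0.0.1:7100 list_change_data_streams

# Delete a stream that is no longer needed.
./bin/yb-admin --master_addresses 127.0.0.1:7100 \
    delete_change_data_stream <stream_id>
```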
Initially, if you create a stream for a table that already contains records, the stream takes a snapshot of the table and streams all of its existing data. After the snapshot of the whole table is complete, YugabyteDB starts streaming the changes that occur in the table.
The snapshot feature uses the cdc_snapshot_batch_size GFlag, which sets the number of records included per batch in response to an internal call to fetch the snapshot; the default is 250. If the table contains a very large amount of data, you may need to increase this value to reduce the time it takes to stream the complete snapshot. You can also choose not to take a snapshot by modifying the Debezium configuration.
If the snapshot fails, you need to restart the connector. Upon restart, if the connector detects that the snapshot has been completed for a given tablet, the connector skips that tablet in the snapshot process and resumes streaming the changes for that tablet.
Taking a snapshot again
If you need to force the connector to take a snapshot again, you need to manually delete the contents of the relevant Kafka topics, create a new stream ID, and deploy the connector again with the newly created stream ID.
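The re-snapshot procedure can be sketched as follows; the topic name, bootstrap server, and master addresses are illustrative placeholders:

```shell
# 1. Delete the topic(s) previously populated by the connector.
kafka-topics.sh --bootstrap-server localhost:9092 \
    --delete --topic dbserver1.public.orders

# 2. Create a fresh stream ID for the database.
./bin/yb-admin --master_addresses 127.0.0.1:7100 \
    create_change_data_stream ysql.yugabyte

# 3. Redeploy the connector, supplying the new stream ID in the
#    connector configuration (the database.streamid property),
#    via the Kafka Connect REST API.
```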
You can't take another snapshot of the table using an existing stream ID. In other words, for a given stream ID, if the snapshot process is completed successfully, you can't use that stream ID to take the snapshot again.
Before image refers to the state of a row before the change event occurred. This state is populated for UPDATE and DELETE events; for INSERT and READ (snapshot) events there is no prior state, as the change creates new content.
At any moment, YugabyteDB not only stores the latest state of the data, but also the recent history of changes. By default, the history retention period is controlled by the history retention interval flag, applied cluster-wide to every YSQL database.
However, when before image is enabled for a database, YugabyteDB adjusts the history retention for that database based on the most lagging active CDC stream. When an active CDC stream's lag increases, the amount of space required for the database grows as more data is retained.
There are no technical limitations on the retention target. The actual overhead depends on the workload, and you'll need to estimate it by running tests based on your applications.
You'll need to create a CDC DB stream that instructs the server to send the before image of changed rows with the stream. To learn more about creating streams with before image enabled, see yb-admin.
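As a rough sketch, before image is requested at stream-creation time by passing checkpoint-type and record-type arguments to yb-admin; the argument values shown here are assumptions, so confirm the exact syntax and supported record types in the yb-admin reference:

```shell
# Create a stream that includes before images of changed rows
# (checkpoint type and record type arguments are illustrative).
./bin/yb-admin --master_addresses 127.0.0.1:7100 \
    create_change_data_stream ysql.yugabyte IMPLICIT ALL
```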
Note: Write operations within the current transaction aren't visible in the before image. In other words, the before image is only available for records committed prior to the current transaction.
Dynamic addition of new tables
If a new table is added to a namespace on which there is an active stream ID, a background thread adds the newly created table to the stream ID so that data can be streamed from the new table as well. The background thread runs at an interval of 1 second and adds cdcsdk_table_processing_limit_per_run tables per iteration to the stream.
CDC supports packed rows. However, if all the non-key columns of a packed row are modified, CDC emits the changes as an INSERT record rather than an UPDATE record.
- YCQL tables aren't currently supported. Issue 11320.
- CDC behavior is undefined when downgrading from a CDC-supported version (2.13 and newer) to an unsupported version (2.12 and older) and then upgrading back. Issue 12800.
- CDC is not supported on a target table for xCluster replication. Issue 11829.
- A single stream can be used to stream data from only one namespace.
- The table you want to stream changes from must have a primary key.
In addition, CDC support for the following features will be added in upcoming releases: