top of page
  • Writer's pictureGang Tao

Change Data Capture (CDC) for Real-time Analytics

Change Data Capture (CDC) is a term used to describe when changed data (delta) in a database is being traced in real-time. The changed data is used to take actions, such as synchronizing source data into a target system, which is a typical use case of data integration.


 

Why is CDC required in Data Analytics?


CDC has become very popular these days due to its ability to deliver real-time data integration and support modern architectural patterns. Let’s take a look at the architecture evolution of data analytics.


At the very beginning of data analytics, all data is stored in one database (usually OLTP) – all business data is here and all business operations depend on this one database. The data analyst uses the same database to run analytics. For a small business where there is not much data, this is actually not a bad idea; it is simple to manage and the operator is usually the same person as the analyst.


But as a business grows, there are some issues with this architecture:

  • Since business operations and data analytics share the same database, these two actions will impact each other.

  • OLTP databases are usually not designed for analyzing large datasets, and performance becomes a challenge.

  • There is usually no historical data in the OLTP. For a table with a primary key, the update only keeps the latest data. If historical data is required in your analytics, this becomes an issue as the data will be gone.


Thus, separating operational data and analytic data is necessary here. The architecture evolves as two separated data systems where the operational data stores in an OLTP database and the analytical data stores in an OLAP database, an ETL component will extract, transform and load the data from TP to AP.


This new architecture basically solved all the issues above:

  • The two systems have separated the data of operational data and analytic data so these two actions will no longer impact each others

  • OLAP, usually with column store, are designed for large volumes of data analytics

  • The ETL component periodically synchronizes the data from OLTP to OLAP, so historical data is kept in OLAP system based on the interval of synchronization


While the traditional ETL usually runs in a batch mode, there is high latency between the source data and target data, so there is always a lag between these two data systems, which means your analytic can only access the old data instead of latest data. And some historical data will be lost due to the batch ETL not happening in real-time.


There is where CDC comes to the rescue.


Unlike traditional ETL, CDC tries to catch the data change in real-time, so every time a record is being inserted, updated, or deleted into/from a table, a change event is generated in real-time, and the target system will be synchronized with low latency, keeping all the historical data. Every single change of the table is kept and your analytic is running on the latest version of the same data in the OLTP system.


This is why people love CDC – it is real-time!


 

How to implement CDC


There are different ways to implement CDC:


Log based

With log based CDC, the change event is generated based on the transactional log of the database. This is the most popular CDC implementation as it has following benefit:

  • Non-intrusive: Log-based CDC operates at the database log level, without requiring modifications to the source database schema or adding triggers. It does not impact the performance of the source system.

  • Complete Change History: Log-based CDC captures all changes, including low-level operations, providing a comprehensive audit trail and enabling point-in-time recovery.

  • High Performance: It can handle high transaction volumes efficiently, as it operates at a lower level in the database engine.


However, there are limitations as well:

  • Database Dependency: Log-based CDC is tightly coupled to the specific database engine's log format and implementation. It may require vendor-specific solutions or log parsing libraries.

  • Complexity: Parsing and interpreting database logs can be complex, requiring expertise in database internals and log formats.


Debezium is a popular log based CDC solution. It handles the complexity of how to handle the change log from different databases such as MySQL, PostgreSQL, MongoDB, Oracle, SQL Server.


Query Based

Another way to do CDC is periodically run the database query , keep the query history and calculate the change for every query. Leveraging a table timestamp column or an incremental key, it retrieves only those rows that have changed since the data was last extracted.


There are pros and cons for this method as well:


Pros:


  • Standardized Approach: Many databases and systems provide standard queries or JDBC/ODBC connection, offering a standardized and supported method for capturing data changes.

  • Reduced Performance Impact: Database query implementations are typically optimized for performance and have minimal impact on the source database.

  • Flexibility: SQL Query often provides additional features such as filtering, change metadata, allowing customization based on specific needs.

Cons:

  • Latency : with this solution, the CDC will need to pull the query periodically which will usually generate some latency, and there might be missing changes during different query pulls.

  • Data Limitation : not all tables contain timestamp or incremental key, in such case, query based pulling CDC cannot find the difference between different pullings.


When I was working at Splunk, our team was responsible for developing a very popular application call DB Connect, which is actually a Query based CDC solution


Trigger Based

Triggers are a common database feature and are supported by various databases such as Oracle, DB2, SQLServer, MySQL, Postgres. Defining triggers can enable users to create change logs in shadow tables.


Trigger can provide real-time changs, while it is kind of complex and will generate performance impact based on the number of triggers users create, not many CDC solutions today are based on triggers.


 

Support E2E real-time Analytics in Timeplus using CDC


As a real-time data analytic platform, Timeplus now provides an easy way to directly integrate with Debezium CDC, in which case, the user can seamlessly run a real-time streaming query on a data stream that is a local representation of a remote database table.


Like I mentioned before, in this way:

  • The local stream contains the latest data from source table, it is real-time synchronization

  • The local stream contains all the historical data version



The above diagram shows how the end-to-end deployment works, a Debezium connector is running on Kafka to convert the database change into a Kafka topic, the user can create a Kafka Source in Timeplus, specify the payload type as Debezium JSON, and the Kafka Source will process the data from the Kafka topic and create a local stream, mirroring the remote database table.


Let me show some examples to explain it in detail.


Here are the steps to setup Debezium CDC and synchronize data to Timeplus.


  1. Setup your database correctly so that the change log is enabled and the user has the permission to read the change log.

  2. Setup your Kafka cluster with Debezium installed.

  3. Create a Debezium connector which synchronize table data change from your source database into a target kafka topic

  4. Create a Timeplus Kafka source using Debezium JSON as data source type. If you create such a source on Timeplus Cloud Console, target stream will be created for each source table on your Timeplus workspace.

  5. Run your streaming query on these sources to do real-time analysis.


Assuming you have successfully finished step 1 to 3, refer to Debezium connector for MySQL or Debezium connector for PostgreSQL for details.


Assuming you have created a source table and made some insert/update/delete on the table :



CREATETABLE positions (
   Account varchar(255),
   Symbol varchar(255) NOTNULL PRIMARY KEY,
   Volume int
);
-- initial position
INSERT INTO positions (Account, Symbol, Volume) VALUES ('a00001', 'MSFT', 100);

-- update position on trade
UPDATE positions SET Volume = 0 
WHERE Account = 'a00001' and Symbol = 'MSFT';

-- delete an account
DELETE FROM positions 
WHERE Account = 'a00001';

In this case, the table has three columns which record the security position of a specific account, the position will change when the user buys/sells securities and in some cases, the account gets deleted.


If your Debezium connector is correctly configured (assuming you are using default configuration options), you should be able to get some data from related kafka topics (Debezium will create a topic for each source table for the change data).


Change data is in JSON format, Here are the `payload` field of the change data , which contains the most important information about data change:


// insert
{
"after": {
"Account": "a00001",
"Symbol": "MSFT",
"Volume": 100
   },
"before": null,
"op": "c"
}

// update
{
"after": {
"Account": "a00001",
"Symbol": "MSFT",
"Volume": 0
   },
"before": {
"Account": "a00001",
"Symbol": "MSFT",
"Volume": 100
   },
"op": "u"
}

// delete
{
"after": null,
"before": {
"Account": "a00001",
"Symbol": "MSFT",
"Volume": 0
   },
"op": "d"
}

  • Insert For insert events, Debezium will generate a payload with before as null and after contains the json object of all the inserted fields. The `op` represents the operation is `c`

  • Update For update events, Debezium will generate a payload with `before` as record json object before update and `after` as record json object after deletion

  • Delete For delete events, Debezium will generate a payload with `before` as record json object before deletion and `after` as null

These are what change means, the payload representing the data information before event happens and after the event happens.



Now let’s set up a Kafka Source in Timeplus to synchronize this table into Timeplus


In Timeplus console UI, chose add data


Then chose `Apache Kafka`


Configure your Kafka brokers with required information


Input your Kafka Topic name, Chose either `Debezium JSON` or `Debezium JSON (upsert only)`

Choosing these two different options will impact the type of stream that Timeplus will create for the CDC data.

  • For Debezium JSON, a changelog stream will be created which support the insert/update/delete semantic

  • For Debezium JSON (upsert only), a versioned kv stream will be created which only support insert/update and there is no delete semantic


In case your source table does not need delete operation, it is best to use upsert only mode where you will get the best performance and functionality on streaming query with versioned kv stream.


In case your source table does need delete operation, changelog stream should be used


In this sample, let’s assume your source table needs delete which is the most common scenario.


In the next step, Timeplus will do some schema inference based on the JSON data payload in the Kafka topic. Give your stream a name and select which field is the primary key.



Finally, after reviewing the configuration, click `Create the source`, if everything is configured correctly, a Kafka source that reads the Debezium payload and writes to Timeplus changelog stream is created.


You can now query that changelog stream, when you run those insert/update/delete from your source database, the change data will be shown on the UI as the query result in real-time.


For change log stream, there are two internal fields introduced to support query, one is the event time fields and the other is `_tp_delta`,

  • For insert operation, one record with `_tp_delta = 1` is created.

  • For update cooperation, two records with `_tp_delta = -1` and `_tp_delta = 1` are created, representing the data before and after change

  • For delete operation, one record with `_tp_delta = -1` is created.

These are actually consistent with the Debezium data where `before` and `after` is used to represent the change. Timeplus uses a `_tp_delta` field to make sure all the streaming query can handle the change in an incremental way.


With streaming query on changelog stream, any change on the source table will impact the stream query in the realtime, there is no GAP between your operational data and your analytic data!


Summary


By supporting Debezium JSON payload in Timeplus Kafka Source, we have enabled the user to seamlessly integrate the two data systems of TP and AP, the user can get real-time insight on any update on the source data, decision making will always leverage your latest business data without delay.


Try it yourself! Visit timeplus.com and create your own workspace for real-time analytics.

Comments