CDC from DynamoDB to ClickHouse
This page covers how set up CDC from DynamoDB to ClickHouse using ClickPipes. There are 2 components to this integration:
- The initial snapshot via S3 ClickPipes
- Real-time updates via Kinesis ClickPipes
Data will be ingested into a ReplacingMergeTree
. This table engine is commonly used for CDC scenarios to allow update operations to be applied. More on this pattern can be found in the following blog articles:
- Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 1
- Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 2
1. Set up Kinesis Stream
First, you will want to enable a Kinesis stream on your DynamoDB table to capture changes in real-time. We want to do this before we create the snapshot to avoid missing any data. Find the AWS guide located here.
2. Create the snapshot
Next, we will create a snapshot of the DynamoDB table. This can be achieved through an AWS export to S3. Find the AWS guide located here. You will want to do a "Full export" in the DynamoDB JSON format.
3. Load the snapshot into ClickHouse
Create necessary tables
The snapshot data from DynamoDB will look something this:
{
"age": {
"N": "26"
},
"first_name": {
"S": "sally"
},
"id": {
"S": "0A556908-F72B-4BE6-9048-9E60715358D4"
}
}
Observe that the data is in a nested format. We will need to flatten this data before loading it into ClickHouse. This can be done using the JSONExtract
function in ClickHouse in a Materialized View.
We will want to create three tables:
- A table to store the raw data from DynamoDB
- A table to store the final flattened data (destination table)
- A Materialized View to flatten the data
For the example DynamoDB data above, the ClickHouse tables would look like this:
/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
`item` String
)
ORDER BY tuple();
/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
JSONExtractString(item, 'id', 'S') AS id,
JSONExtractInt(item, 'age', 'N') AS age,
JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";
/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
"id" String,
"first_name" String,
"age" Int8,
"version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
There are a few requirements for the destination table:
- This table must be a
ReplacingMergeTree
table - The table must have a
version
column- In later steps, we will be mapping the
ApproximateCreationDateTime
field from the Kinesis stream to theversion
column.
- In later steps, we will be mapping the
- The table should use the partition key as the sorting key (specified by
ORDER BY
)- Rows with the same sorting key will be deduplicated based on the
version
column.
- Rows with the same sorting key will be deduplicated based on the
Create the snapshot ClickPipe
Now you can create a ClickPipe to load the snapshot data from S3 into ClickHouse. Follow the S3 ClickPipe guide here, but use the following settings:
- Ingest path: You will need to locate the path of the exported json files in S3. The path will look something like this:
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
- Format: JSONEachRow
- Table: Your snapshot table (e.g.
default.snapshot
in example above)
Once created, data will begin populating in the snapshot and destination tables. You do not need to wait for the snapshot load to finish before moving on to the next step.
4. Create the Kinesis ClickPipe
Now we can set up the Kinesis ClickPipe to capture real-time changes from the Kinesis stream. Follow the Kinesis ClickPipe guide here, but use the following settings:
- Stream: The Kinesis stream used in step 1
- Table: Your destination table (e.g.
default.destination
in example above) - Flatten object: true
- Column mappings:
ApproximateCreationDateTime
:version
- Map other fields to the appropriate destination columns as shown below
5. Cleanup (optional)
Once the snapshot ClickPipe has finished, you can delete the snapshot table and materialized view.
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";