Change Data Capture (CDC)
Change Data Capture (CDC) tracks row-level changes (inserts, updates, deletes) in a source database and propagates them to a target. Starlake supports CDC through three patterns that all converge on its write strategies for merging changes into target tables.
CDC patterns overview
| Pattern | Source | Latency | Deletes | Best for |
|---|---|---|---|---|
| Push-based (Debezium/Kafka) | Kafka topic with CDC events | Near real-time | Yes (via delete marker) | Low-latency replication, event-driven architectures |
| Pull-based (incremental extraction) | JDBC query with watermark | Minutes to hours | Only with soft-delete columns | Simpler setups, no Kafka infrastructure |
| File-based | Change files (CSV/JSON) with operation column | Hours to daily | Yes (via operation column) | Mainframe CDC, cloud export services |
All three patterns use the same write strategies to apply changes:
| Write strategy | CDC use case |
|---|---|
| UPSERT_BY_KEY_AND_TIMESTAMP | Current-state table: keep only the latest version of each record |
| SCD2 | Full history: preserve every version with start/end timestamps |
| DELETE_THEN_INSERT | Replace matching keys entirely on each batch |
Pattern A: Push-based CDC with Debezium and Kafka
This pattern captures changes in near real-time using Debezium and Apache Kafka.
Source DB --> Debezium --> Kafka topic --> starlake load --> raw staging (APPEND)
raw staging --> starlake transform --> target table
Step 1: Configure Debezium
Deploy a Debezium connector with the ExtractNewRecordState (unwrap) SMT. This flattens the Debezium envelope into a simple row with metadata fields.
{
"name": "pg-source-orders",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "source-db",
"database.dbname": "sales",
"table.include.list": "public.orders,public.customers",
"topic.prefix": "cdc.sales",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true"
}
}
Key settings:
- ExtractNewRecordState flattens the before/after envelope into a flat row
- add.fields appends __op (c=create, u=update, d=delete, r=snapshot) and __source_ts_ms (source timestamp)
- delete.handling.mode: rewrite converts deletes into regular rows with __deleted = "true"
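The effect of the unwrap SMT can be sketched in plain Python. This is an illustration of the transformation, not Debezium code; the envelope shape below is a simplified stand-in for a real Debezium change event:

```python
# Illustrative sketch: what ExtractNewRecordState with
# "add.fields": "op,source.ts_ms" and "delete.handling.mode": "rewrite"
# conceptually produces from a Debezium change envelope.
def unwrap(envelope):
    op = envelope["op"]
    # For deletes, rewrite mode keeps the last known row state ("before")
    # instead of emitting a null value, and flags the row with __deleted.
    row = dict(envelope["after"] if envelope["after"] is not None
               else envelope["before"])
    row["__op"] = op
    row["__source_ts_ms"] = envelope["source"]["ts_ms"]
    row["__deleted"] = "true" if op == "d" else "false"
    return row

delete_event = {
    "op": "d",
    "before": {"order_id": 200, "customer_id": 3},
    "after": None,
    "source": {"ts_ms": 1705320120000},
}
flat = unwrap(delete_event)
```

The flattened row is what lands in the raw staging table defined in Step 3.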
Step 2: Configure Kafka connection in Starlake
application:
connections:
cdc-kafka:
type: KAFKA
options:
bootstrapServers: "kafka-broker:9092"
kafka:
serverOptions:
bootstrap.servers: "kafka-broker:9092"
topics:
cdc_orders:
topicName: "cdc.sales.public.orders"
maxRead: 100000
fields:
- "cast(value as STRING)"
accessOptions:
kafka.bootstrap.servers: "kafka-broker:9092"
subscribe: "cdc.sales.public.orders"
group.id: "starlake-cdc-orders"
Step 3: Land raw CDC events
Create a raw staging domain that APPENDs all CDC events as-is.
version: 1
load:
name: "cdc_raw"
metadata:
format: "JSON_FLAT"
writeStrategy:
type: "APPEND"
version: 1
table:
name: "cdc_orders"
pattern: "cdc_orders"
attributes:
- name: "order_id"
type: "long"
required: true
- name: "customer_id"
type: "long"
- name: "quantity"
type: "integer"
- name: "amount"
type: "decimal"
- name: "order_date"
type: "date"
- name: "updated_at"
type: "timestamp"
- name: "__op"
type: "string"
comment: "Debezium operation: c=create, u=update, d=delete, r=snapshot"
- name: "__source_ts_ms"
type: "long"
comment: "Source database change timestamp in milliseconds"
- name: "__deleted"
type: "string"
comment: "'true' for delete events (rewrite mode)"
Load CDC events from Kafka:
starlake load --domains cdc_raw --tables cdc_orders
Step 4: Apply changes to the target table
Create a transform task that reads from the raw staging table and merges into the target.
Current-state table (latest version of each record):
version: 1
task:
name: "orders_current"
domain: "sales"
table: "orders"
presql:
- |
DELETE FROM sales.orders
WHERE order_id IN (
SELECT order_id FROM cdc_raw.cdc_orders WHERE __deleted = 'true'
)
writeStrategy:
type: "UPSERT_BY_KEY_AND_TIMESTAMP"
key: ["order_id"]
timestamp: "event_ts"
on: "SOURCE_AND_TARGET"
SELECT
order_id,
customer_id,
quantity,
amount,
order_date,
updated_at,
TIMESTAMP_MILLIS(__source_ts_ms) AS event_ts
FROM cdc_raw.cdc_orders
WHERE __deleted != 'true'
The presql statement handles deletes by removing target rows that match delete events. The main SELECT filters out delete events so only inserts and updates are upserted.
Setting on: SOURCE_AND_TARGET deduplicates the incoming batch by key before merging. This is essential for CDC because a single batch may contain multiple events for the same order_id.
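The delete-then-merge flow above can be sketched in plain Python. This is an illustration of the semantics, not Starlake's implementation; the dict-based target and field names mirror the example schema:

```python
# Illustrative sketch of the transform: presql removes deleted keys,
# then the deduplicated non-delete events are upserted by key+timestamp.
def apply_cdc(target, events):
    """target: {order_id: row}; events: flattened CDC rows."""
    # presql: remove target rows matching delete events
    for e in events:
        if e["__deleted"] == "true":
            target.pop(e["order_id"], None)
    # SOURCE_AND_TARGET: keep only the latest event per key in the batch
    latest = {}
    for e in events:
        if e["__deleted"] == "true":
            continue
        k = e["order_id"]
        if k not in latest or e["__source_ts_ms"] > latest[k]["__source_ts_ms"]:
            latest[k] = e
    # upsert: a newer source row replaces the target row with the same key
    for k, e in latest.items():
        cur = target.get(k)
        if cur is None or e["__source_ts_ms"] >= cur["__source_ts_ms"]:
            target[k] = e
    return target
```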
Full history table (SCD2):
version: 1
task:
name: "orders_history"
domain: "sales_history"
table: "orders"
writeStrategy:
type: "SCD2"
key: ["order_id"]
timestamp: "event_ts"
startTs: "valid_from"
endTs: "valid_to"
on: "SOURCE_AND_TARGET"
SELECT
order_id,
customer_id,
quantity,
amount,
order_date,
updated_at,
__op,
TIMESTAMP_MILLIS(__source_ts_ms) AS event_ts
FROM cdc_raw.cdc_orders
SCD2 automatically manages valid_from and valid_to columns. When a new version arrives, the existing current record gets valid_to = CURRENT_TIMESTAMP() and the new record is inserted with valid_from = CURRENT_TIMESTAMP() and valid_to = NULL.
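The close-and-insert behavior can be sketched as follows. This is a simplified illustration of SCD2 semantics, not Starlake internals; the list-of-dicts history and the `now` parameter are conveniences for the example:

```python
from datetime import datetime

# Illustrative SCD2 merge: close the open version of a key, then
# insert the new version with an open-ended validity interval.
def scd2_apply(history, event, now=None):
    """history: list of rows with valid_from / valid_to columns."""
    now = now or datetime.now()
    for rec in history:
        if rec["order_id"] == event["order_id"] and rec["valid_to"] is None:
            rec["valid_to"] = now              # close the current version
    history.append({**event, "valid_from": now, "valid_to": None})
    return history
```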
Pattern B: Pull-based CDC with incremental extraction
This pattern uses Starlake's built-in incremental extraction to query the source for new or modified rows. No Kafka infrastructure is needed.
starlake extract-data --incremental --> CSV/Parquet files --> starlake load --> target table
Step 1: Configure incremental extraction
version: 1
extract:
connectionRef: "source_postgres"
jdbcSchemas:
- schema: "public"
tables:
- name: "orders"
partitionColumn: "updated_at"
numPartitions: 4
fullExport: false
- name: "customers"
partitionColumn: "updated_at"
fullExport: false
Step 2: Extract and load
# Extract only rows changed since last run
starlake extract-data --config sales_extract --outputDir $SL_ROOT/incoming/sales
# Load into target with UPSERT
starlake load --domains sales --tables orders
Step 3: Configure write strategy on the load table
For pull-based CDC, you can apply the merge directly during load (no separate transform needed):
version: 1
table:
name: "orders"
pattern: "orders.*\\.csv"
attributes:
- name: "order_id"
type: "long"
required: true
- name: "customer_id"
type: "long"
- name: "quantity"
type: "integer"
- name: "amount"
type: "decimal"
- name: "order_date"
type: "date"
- name: "updated_at"
type: "timestamp"
metadata:
writeStrategy:
type: "UPSERT_BY_KEY_AND_TIMESTAMP"
key: ["order_id"]
timestamp: "updated_at"
on: "SOURCE_AND_TARGET"
Pull-based CDC cannot detect physical deletes. The query WHERE updated_at > last_watermark never returns deleted rows. If your source uses physical deletes, either:
- Add a soft-delete column (is_deleted, deleted_at) to the source table
- Use push-based CDC (Pattern A) instead
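The blind spot is easy to see in a sketch of watermark-based pulling (illustrative only; integer timestamps stand in for updated_at values). A physically deleted row simply no longer exists in the source, so no filter on updated_at can ever return it:

```python
# Illustrative watermark-based incremental pull:
# equivalent to WHERE updated_at > last_watermark.
def incremental_pull(source_rows, last_watermark):
    changed = [r for r in source_rows if r["updated_at"] > last_watermark]
    new_watermark = max((r["updated_at"] for r in changed),
                        default=last_watermark)
    return changed, new_watermark
```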
Pattern C: File-based CDC
This pattern handles CDC events delivered as files from external systems (mainframe CDC tools, cloud database export services, or custom ETL).
External system drops change files --> landing --> starlake load (APPEND) --> raw staging
raw staging --> starlake transform --> target
Step 1: Define the raw staging table
Change files include an operation column indicating the type of change.
Example file orders-20240115-120000.json:
{"op":"I","order_id":1001,"customer_id":5,"quantity":3,"amount":29.99,"order_date":"2024-01-15","updated_at":"2024-01-15T12:00:00"}
{"op":"U","order_id":500,"customer_id":5,"quantity":5,"amount":49.99,"order_date":"2023-06-15","updated_at":"2024-01-15T12:01:00"}
{"op":"D","order_id":200,"customer_id":3,"quantity":null,"amount":null,"order_date":null,"updated_at":"2024-01-15T12:02:00"}
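Conceptually, the transform in Step 2 splits such a file on the op column: deletes feed the presql statement, everything else feeds the main SELECT. A minimal Python sketch of that split (illustrative, not Starlake code):

```python
import json

# Illustrative: partition a change file's JSON lines into deletes
# (op = 'D') and upserts (everything else).
def split_ops(lines):
    deletes, upserts = [], []
    for line in lines:
        row = json.loads(line)
        (deletes if row["op"] == "D" else upserts).append(row)
    return deletes, upserts
```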
version: 1
table:
name: "orders"
pattern: "orders-.*\\.json"
attributes:
- name: "op"
type: "string"
comment: "I=insert, U=update, D=delete"
- name: "order_id"
type: "long"
required: true
- name: "customer_id"
type: "long"
- name: "quantity"
type: "integer"
- name: "amount"
type: "decimal"
- name: "order_date"
type: "date"
- name: "updated_at"
type: "timestamp"
metadata:
writeStrategy:
type: "APPEND"
Step 2: Transform to target
The transform task applies changes using the same approach as Pattern A:
version: 1
task:
name: "orders_from_files"
domain: "sales"
table: "orders"
presql:
- |
DELETE FROM sales.orders
WHERE order_id IN (
SELECT order_id FROM cdc_files.orders WHERE op = 'D'
)
writeStrategy:
type: "UPSERT_BY_KEY_AND_TIMESTAMP"
key: ["order_id"]
timestamp: "updated_at"
on: "SOURCE_AND_TARGET"
SELECT
order_id,
customer_id,
quantity,
amount,
order_date,
updated_at
FROM cdc_files.orders
WHERE op != 'D'
Handling deletes
CDC delete handling depends on the pattern and write strategy:
| Pattern | Delete mechanism | How to handle |
|---|---|---|
| Push-based (Debezium) | __deleted = 'true' column (with rewrite mode) | Use presql to DELETE matching keys, filter from main SELECT |
| Pull-based (incremental) | Soft-delete column in source (is_deleted) | Include is_deleted in extraction, handle in transform |
| File-based | Operation column (op = 'D') | Use presql to DELETE matching keys, filter from main SELECT |
Delete with presql
The presql field runs SQL statements before the main merge. Use it to propagate deletes:
task:
presql:
- |
DELETE FROM target_schema.target_table
WHERE primary_key IN (
SELECT primary_key FROM staging_table WHERE delete_marker = 'true'
)
writeStrategy:
type: "UPSERT_BY_KEY_AND_TIMESTAMP"
key: ["primary_key"]
timestamp: "event_timestamp"
Soft-delete alternative
Instead of physically deleting rows, mark them as deleted:
task:
presql:
- |
UPDATE target_schema.target_table
SET is_active = false, deleted_at = CURRENT_TIMESTAMP()
WHERE primary_key IN (
SELECT primary_key FROM staging_table WHERE delete_marker = 'true'
)
Deduplication with SOURCE_AND_TARGET
CDC batches often contain multiple events for the same key (e.g., a row created then updated within the same micro-batch). Setting on: SOURCE_AND_TARGET deduplicates the incoming data by key before merging:
writeStrategy:
type: "UPSERT_BY_KEY_AND_TIMESTAMP"
key: ["order_id"]
timestamp: "event_ts"
on: "SOURCE_AND_TARGET"
With SOURCE_AND_TARGET, Starlake applies ROW_NUMBER() OVER (PARTITION BY key ORDER BY timestamp DESC) = 1 to the incoming batch, keeping only the most recent event per key. Without this setting (default TARGET), duplicate keys in the incoming batch may cause non-deterministic merge behavior.
Always use on: SOURCE_AND_TARGET for CDC pipelines where a single batch may contain multiple events for the same key.
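The ROW_NUMBER semantics can be restated in a few lines of Python (illustrative only; the key and timestamp column names are parameters here):

```python
# Equivalent of ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC) = 1:
# keep only the most recent event per key.
def dedup_latest(events, key, ts):
    latest = {}
    for e in events:
        k = e[key]
        if k not in latest or e[ts] > latest[k][ts]:
            latest[k] = e
    return list(latest.values())
```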
Monitoring
Freshness checks
Configure freshness thresholds on target tables to alert when data is stale:
metadata:
freshness:
warn: "15 minutes"
error: "1 hour"
starlake freshness
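What a freshness check evaluates can be sketched as follows (illustrative only, not Starlake's implementation): compare the age of the table's most recent load against the warn and error thresholds:

```python
from datetime import datetime, timedelta

# Illustrative freshness evaluation against warn/error thresholds,
# mirroring warn: "15 minutes" / error: "1 hour" above.
def freshness_status(last_load, now,
                     warn=timedelta(minutes=15),
                     error=timedelta(hours=1)):
    age = now - last_load
    if age > error:
        return "error"
    if age > warn:
        return "warn"
    return "ok"
```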
Extraction audit (pull-based CDC)
The SL_LAST_EXPORT table tracks every incremental extraction run with watermark values, row counts, and success status. See Extraction Monitoring for details.
Data quality expectations
Add expectations to validate CDC event integrity:
expectations:
- query: "SELECT COUNT(*) FROM {table} WHERE __op NOT IN ('c','u','d','r')"
operator: "eq"
value: "0"
failOnError: true
comment: "All CDC operations must be valid Debezium types"
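The check above amounts to the following logic (a Python sketch of the expectation, not how Starlake evaluates it): count events with an invalid operation code and fail when any are found:

```python
# Illustrative equivalent of the expectation: every __op must be a
# valid Debezium operation code, else the pipeline fails.
VALID_OPS = {"c", "u", "d", "r"}

def check_cdc_ops(rows, fail_on_error=True):
    bad = sum(1 for r in rows if r["__op"] not in VALID_OPS)
    if bad != 0 and fail_on_error:
        raise ValueError(f"{bad} CDC events with invalid __op")
    return bad
```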
Choosing the right pattern
| Consideration | Push-based (A) | Pull-based (B) | File-based (C) |
|---|---|---|---|
| Latency | Seconds to minutes | Minutes to hours | Hours to daily |
| Infrastructure | Kafka + Debezium | JDBC connection only | File transfer |
| Captures deletes | Yes | Only with soft-delete | Yes |
| Captures all changes | Yes | Only latest per watermark | Depends on source |
| Source impact | Low (log-based) | Medium (queries source) | None |
| Complexity | High | Low | Medium |
Recommendations:
- Use Pattern A when you need near real-time replication with full change tracking including deletes
- Use Pattern B when you have simple requirements, no Kafka, and the source has an updated_at column
- Use Pattern C when changes are delivered as files from systems you do not control
Frequently Asked Questions
How does Starlake support Change Data Capture (CDC)?
Starlake supports CDC through three patterns: push-based CDC via Kafka (consuming Debezium events), pull-based CDC via incremental JDBC extraction, and file-based CDC. All patterns converge on Starlake's write strategies (UPSERT_BY_KEY_AND_TIMESTAMP, SCD2, DELETE_THEN_INSERT) to merge changes into target tables.
Which write strategy should I use for CDC?
Use UPSERT_BY_KEY_AND_TIMESTAMP for current-state tables that only need the latest version of each record. Use SCD2 for full change history with start and end timestamps. Use DELETE_THEN_INSERT when you need to handle hard deletes or replace entire key sets.
Can Starlake consume Debezium CDC events from Kafka?
Yes. Configure a Debezium connector with the ExtractNewRecordState SMT to flatten CDC events, then use Starlake's Kafka ingestion to consume them into a raw staging table. A transform task then applies write strategies to merge changes into the target.
How does Starlake handle CDC deletes?
For push-based CDC (Debezium), use the ExtractNewRecordState SMT with delete.handling.mode: rewrite, which adds a __deleted column. Use presql to delete matching keys from the target before the UPSERT runs. For pull-based CDC, use soft-delete columns (is_deleted, deleted_at) in the source.
What is the difference between push-based and pull-based CDC?
Push-based CDC uses Debezium and Kafka to stream changes in near real-time from the database transaction log. Pull-based CDC uses Starlake's incremental JDBC extraction (extract-data --incremental) to query the source for new rows on a schedule. Push-based captures all changes including deletes; pull-based only captures rows matching the watermark condition.
How does SOURCE_AND_TARGET deduplication work for CDC?
When writeStrategy.on is set to SOURCE_AND_TARGET, Starlake deduplicates the incoming batch by key before merging with the target table. Only the most recent event (by timestamp) is kept per key. This is essential for CDC because a single batch may contain multiple events for the same record.
Related
- Write Strategies -- detailed reference for APPEND, UPSERT, SCD2, DELETE_THEN_INSERT and ADAPTATIVE
- Incremental Extraction -- configure pull-based CDC with partitionColumn and SL_LAST_EXPORT
- Kafka Ingestion -- consume CDC events from Kafka topics
- Load Tutorial -- end-to-end walkthrough for loading files into your warehouse
- Expectations -- data quality checks for CDC event validation