Change Data Capture (CDC)

Change Data Capture (CDC) tracks row-level changes (inserts, updates, deletes) in a source database and propagates them to a target. Starlake supports CDC through three patterns that all converge on its write strategies for merging changes into target tables.

CDC patterns overview

| Pattern | Source | Latency | Deletes | Best for |
|---|---|---|---|---|
| Push-based (Debezium/Kafka) | Kafka topic with CDC events | Near real-time | Yes (via delete marker) | Low-latency replication, event-driven architectures |
| Pull-based (incremental extraction) | JDBC query with watermark | Minutes to hours | Only with soft-delete columns | Simpler setups, no Kafka infrastructure |
| File-based | Change files (CSV/JSON) with operation column | Hours to daily | Yes (via operation column) | Mainframe CDC, cloud export services |

All three patterns use the same write strategies to apply changes:

| Write strategy | CDC use case |
|---|---|
| UPSERT_BY_KEY_AND_TIMESTAMP | Current-state table: keep only the latest version of each record |
| SCD2 | Full history: preserve every version with start/end timestamps |
| DELETE_THEN_INSERT | Replace matching keys entirely on each batch |
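To make the first strategy concrete, here is a minimal Python sketch of the UPSERT_BY_KEY_AND_TIMESTAMP semantics (illustrative only, not Starlake internals): for each key, the row with the newest timestamp wins, whether it comes from the target or the incoming batch.

```python
# Illustrative sketch of UPSERT_BY_KEY_AND_TIMESTAMP semantics.
# Not Starlake code: field names and values are made up for the example.

def upsert_by_key_and_timestamp(target, incoming, key="order_id", ts="event_ts"):
    """Merge incoming rows into target, keeping the newest row per key."""
    merged = {row[key]: row for row in target}
    for row in incoming:
        current = merged.get(row[key])
        # Replace only if the incoming row is newer than what we hold.
        if current is None or row[ts] > current[ts]:
            merged[row[key]] = row
    return sorted(merged.values(), key=lambda r: r[key])

target = [{"order_id": 1, "amount": 10.0, "event_ts": 100}]
incoming = [
    {"order_id": 1, "amount": 12.5, "event_ts": 200},  # newer update wins
    {"order_id": 2, "amount": 30.0, "event_ts": 150},  # new key is inserted
]
result = upsert_by_key_and_timestamp(target, incoming)
# result holds order 1 at amount 12.5 and the new order 2
```

SCD2 differs in that the older version is closed out rather than overwritten, as shown later in this page.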

Pattern A: Push-based CDC with Debezium and Kafka

This pattern captures changes in near real-time using Debezium and Apache Kafka.

```
Source DB --> Debezium --> Kafka topic --> starlake load --> raw staging (APPEND)
                                                                   |
                                                                   v
                                                starlake transform --> target table
```

Step 1: Configure Debezium

Deploy a Debezium connector with the ExtractNewRecordState (unwrap) SMT. This flattens the Debezium envelope into a simple row with metadata fields.

Debezium connector configuration (external to Starlake):

```json
{
  "name": "pg-source-orders",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "source-db",
    "database.dbname": "sales",
    "table.include.list": "public.orders,public.customers",
    "topic.prefix": "cdc.sales",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.drop.tombstones": "true"
  }
}
```

Key settings:

  • ExtractNewRecordState flattens the before/after envelope into a flat row
  • add.fields appends __op (c=create, u=update, d=delete, r=snapshot) and __source_ts_ms (source timestamp)
  • delete.handling.mode: rewrite converts deletes into regular rows with __deleted = "true"
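With these settings, the topic carries flattened rows rather than the nested before/after envelope. The snippet below sketches what two such events look like (the field values are illustrative, not captured from a real topic):

```python
import json

# Illustrative examples of flattened events after the unwrap SMT.
# Values are invented for the sketch; only the field names follow the
# Debezium conventions described above.
update_event = json.loads(
    '{"order_id": 1001, "customer_id": 5, "amount": 29.99,'
    ' "__op": "u", "__source_ts_ms": 1705315260000, "__deleted": "false"}'
)
delete_event = json.loads(
    '{"order_id": 200, "customer_id": 3, "amount": null,'
    ' "__op": "d", "__source_ts_ms": 1705315320000, "__deleted": "true"}'
)

# Rewrite mode turns the delete into a regular row: data columns become
# null, but the key and the __deleted marker survive for downstream use.
assert delete_event["__deleted"] == "true" and delete_event["amount"] is None
```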

Step 2: Configure Kafka connection in Starlake

metadata/application.sl.yml

```yaml
application:
  connections:
    cdc-kafka:
      type: KAFKA
      options:
        bootstrapServers: "kafka-broker:9092"
  kafka:
    serverOptions:
      bootstrap.servers: "kafka-broker:9092"
    topics:
      cdc_orders:
        topicName: "cdc.sales.public.orders"
        maxRead: 100000
        fields:
          - "cast(value as STRING)"
        accessOptions:
          kafka.bootstrap.servers: "kafka-broker:9092"
          subscribe: "cdc.sales.public.orders"
          group.id: "starlake-cdc-orders"
```

Step 3: Land raw CDC events

Create a raw staging domain that APPENDs all CDC events as-is.

metadata/load/cdc_raw/_config.sl.yml

```yaml
version: 1
load:
  name: "cdc_raw"
  metadata:
    format: "JSON_FLAT"
    writeStrategy:
      type: "APPEND"
```

metadata/load/cdc_raw/orders.sl.yml

```yaml
version: 1
table:
  name: "cdc_orders"
  pattern: "cdc_orders"
  attributes:
    - name: "order_id"
      type: "long"
      required: true
    - name: "customer_id"
      type: "long"
    - name: "quantity"
      type: "integer"
    - name: "amount"
      type: "decimal"
    - name: "order_date"
      type: "date"
    - name: "updated_at"
      type: "timestamp"
    - name: "__op"
      type: "string"
      comment: "Debezium operation: c=create, u=update, d=delete, r=snapshot"
    - name: "__source_ts_ms"
      type: "long"
      comment: "Source database change timestamp in milliseconds"
    - name: "__deleted"
      type: "string"
      comment: "'true' for delete events (rewrite mode)"
```

Load CDC events from Kafka:

```bash
starlake load --domains cdc_raw --tables cdc_orders
```

Step 4: Apply changes to the target table

Create a transform task that reads from the raw staging table and merges into the target.

Current-state table (latest version of each record):

metadata/transform/sales/orders_current.sl.yml

```yaml
version: 1
task:
  name: "orders_current"
  domain: "sales"
  table: "orders"
  presql:
    - |
      DELETE FROM sales.orders
      WHERE order_id IN (
        SELECT order_id FROM cdc_raw.cdc_orders WHERE __deleted = 'true'
      )
  writeStrategy:
    type: "UPSERT_BY_KEY_AND_TIMESTAMP"
    key: ["order_id"]
    timestamp: "event_ts"
    on: "SOURCE_AND_TARGET"
```

metadata/transform/sales/orders_current.sql

```sql
SELECT
  order_id,
  customer_id,
  quantity,
  amount,
  order_date,
  updated_at,
  TIMESTAMP_MILLIS(__source_ts_ms) AS event_ts
FROM cdc_raw.cdc_orders
WHERE __deleted != 'true'
```

The presql statement handles deletes by removing target rows that match delete events. The main SELECT filters out delete events so only inserts and updates are upserted.

Setting on: SOURCE_AND_TARGET deduplicates the incoming batch by key before merging. This is essential for CDC because a single batch may contain multiple events for the same order_id.

Full history table (SCD2):

metadata/transform/sales_history/orders_history.sl.yml

```yaml
version: 1
task:
  name: "orders_history"
  domain: "sales_history"
  table: "orders"
  writeStrategy:
    type: "SCD2"
    key: ["order_id"]
    timestamp: "event_ts"
    startTs: "valid_from"
    endTs: "valid_to"
    on: "SOURCE_AND_TARGET"
```

metadata/transform/sales_history/orders_history.sql

```sql
SELECT
  order_id,
  customer_id,
  quantity,
  amount,
  order_date,
  updated_at,
  __op,
  TIMESTAMP_MILLIS(__source_ts_ms) AS event_ts
FROM cdc_raw.cdc_orders
```

SCD2 automatically manages valid_from and valid_to columns. When a new version arrives, the existing current record gets valid_to = CURRENT_TIMESTAMP() and the new record is inserted with valid_from = CURRENT_TIMESTAMP() and valid_to = NULL.
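The close-then-open behavior can be sketched in a few lines of Python (an illustration of the mechanism described above, not Starlake internals; field names and the timestamp are invented for the example):

```python
# Illustrative sketch of SCD2 versioning: a new version closes the current
# record (valid_to set) and is appended as the new current record.

def scd2_apply(history, new_row, key="order_id", now="2024-01-15T12:00:00"):
    """Close the open version for this key and append the new version."""
    for row in history:
        if row[key] == new_row[key] and row["valid_to"] is None:
            row["valid_to"] = now  # close the previously current version
    history.append({**new_row, "valid_from": now, "valid_to": None})
    return history

history = [
    {"order_id": 1, "amount": 10.0,
     "valid_from": "2024-01-01T00:00:00", "valid_to": None},
]
scd2_apply(history, {"order_id": 1, "amount": 12.5})
# history now has two versions of order 1: the old one closed,
# the new one current (valid_to is None)
```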


Pattern B: Pull-based CDC with incremental extraction

This pattern uses Starlake's built-in incremental extraction to query the source for new or modified rows. No Kafka infrastructure is needed.

starlake extract-data --incremental --> CSV/Parquet files --> starlake load --> target table

Step 1: Configure incremental extraction

metadata/extract/sales_extract.sl.yml

```yaml
version: 1
extract:
  connectionRef: "source_postgres"
  jdbcSchemas:
    - schema: "public"
      tables:
        - name: "orders"
          partitionColumn: "updated_at"
          numPartitions: 4
          fullExport: false
        - name: "customers"
          partitionColumn: "updated_at"
          fullExport: false
```

Step 2: Extract and load

```bash
# Extract only rows changed since last run
starlake extract-data --config sales_extract --outputDir $SL_ROOT/incoming/sales

# Load into target with UPSERT
starlake load --domains sales --tables orders
```

Step 3: Configure write strategy on the load table

For pull-based CDC, you can apply the merge directly during load (no separate transform needed):

metadata/load/sales/orders.sl.yml

```yaml
version: 1
table:
  name: "orders"
  pattern: "orders.*\\.csv"
  attributes:
    - name: "order_id"
      type: "long"
      required: true
    - name: "customer_id"
      type: "long"
    - name: "quantity"
      type: "integer"
    - name: "amount"
      type: "decimal"
    - name: "order_date"
      type: "date"
    - name: "updated_at"
      type: "timestamp"
  metadata:
    writeStrategy:
      type: "UPSERT_BY_KEY_AND_TIMESTAMP"
      key: ["order_id"]
      timestamp: "updated_at"
      on: "SOURCE_AND_TARGET"
```
Warning

Pull-based CDC cannot detect physical deletes. The query WHERE updated_at > last_watermark never returns deleted rows. If your source uses physical deletes, either:

  • Add a soft-delete column (is_deleted, deleted_at) to the source table
  • Use push-based CDC (Pattern A) instead
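A short sketch makes the limitation obvious: a physically deleted row no longer exists in the source, so no watermark predicate can ever match it (the rows and watermark below are invented for the illustration):

```python
# Illustrative sketch of why watermark extraction misses physical deletes.

last_watermark = "2024-01-14T00:00:00"

# State of the source table at extraction time. A previously extracted
# order (id 200) was physically deleted, so it is simply absent here.
source_rows = [
    {"order_id": 1001, "updated_at": "2024-01-15T12:00:00"},  # new row
    {"order_id": 500,  "updated_at": "2024-01-15T12:01:00"},  # updated row
    {"order_id": 42,   "updated_at": "2023-06-01T08:00:00"},  # unchanged row
]

# Equivalent of: SELECT * FROM orders WHERE updated_at > :last_watermark
extracted = [r for r in source_rows if r["updated_at"] > last_watermark]
extracted_ids = {r["order_id"] for r in extracted}
# extracted_ids contains 1001 and 500; nothing signals that 200 is gone,
# so the target keeps it until a soft-delete column or log-based CDC is used.
```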

Pattern C: File-based CDC

This pattern handles CDC events delivered as files from external systems (mainframe CDC tools, cloud database export services, or custom ETL).

```
External system drops change files --> landing --> starlake load (APPEND) --> raw staging
                                                          |
                                                          v
                                          starlake transform --> target
```

Step 1: Define the raw staging table

Change files include an operation column indicating the type of change.

Example file orders-20240115-120000.json:

```json
{"op":"I","order_id":1001,"customer_id":5,"quantity":3,"amount":29.99,"order_date":"2024-01-15","updated_at":"2024-01-15T12:00:00"}
{"op":"U","order_id":500,"customer_id":5,"quantity":5,"amount":49.99,"order_date":"2023-06-15","updated_at":"2024-01-15T12:01:00"}
{"op":"D","order_id":200,"customer_id":3,"quantity":null,"amount":null,"order_date":null,"updated_at":"2024-01-15T12:02:00"}
```
metadata/load/cdc_files/orders.sl.yml

```yaml
version: 1
table:
  name: "orders"
  pattern: "orders-.*\\.json"
  attributes:
    - name: "op"
      type: "string"
      comment: "I=insert, U=update, D=delete"
    - name: "order_id"
      type: "long"
      required: true
    - name: "customer_id"
      type: "long"
    - name: "quantity"
      type: "integer"
    - name: "amount"
      type: "decimal"
    - name: "order_date"
      type: "date"
    - name: "updated_at"
      type: "timestamp"
  metadata:
    writeStrategy:
      type: "APPEND"
```

Step 2: Transform to target

The transform task applies changes using the same approach as Pattern A:

metadata/transform/sales/orders_from_files.sl.yml

```yaml
version: 1
task:
  name: "orders_from_files"
  domain: "sales"
  table: "orders"
  presql:
    - |
      DELETE FROM sales.orders
      WHERE order_id IN (
        SELECT order_id FROM cdc_files.orders WHERE op = 'D'
      )
  writeStrategy:
    type: "UPSERT_BY_KEY_AND_TIMESTAMP"
    key: ["order_id"]
    timestamp: "updated_at"
    on: "SOURCE_AND_TARGET"
```

metadata/transform/sales/orders_from_files.sql

```sql
SELECT
  order_id,
  customer_id,
  quantity,
  amount,
  order_date,
  updated_at
FROM cdc_files.orders
WHERE op != 'D'
```
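The two-step apply (presql DELETE, then filtered upsert) can be sketched in Python. This is an illustration of the logic, not Starlake code; the target rows reuse the order ids from the example file above:

```python
# Illustrative sketch: apply a change file to a current-state table.

def apply_change_file(target, events, key="order_id", ts="updated_at"):
    # Step 1 (presql): drop target rows whose key appears in a 'D' event.
    deleted = {e[key] for e in events if e["op"] == "D"}
    current = {row[key]: row for row in target if row[key] not in deleted}
    # Step 2 (merge): upsert remaining I/U events; newest timestamp wins.
    for e in sorted((e for e in events if e["op"] != "D"), key=lambda e: e[ts]):
        row = {k: v for k, v in e.items() if k != "op"}
        current[row[key]] = row
    return current

target = [
    {"order_id": 500, "amount": 39.99, "updated_at": "2023-06-15T00:00:00"},
    {"order_id": 200, "amount": 15.00, "updated_at": "2023-01-10T00:00:00"},
]
events = [
    {"op": "I", "order_id": 1001, "amount": 29.99, "updated_at": "2024-01-15T12:00:00"},
    {"op": "U", "order_id": 500,  "amount": 49.99, "updated_at": "2024-01-15T12:01:00"},
    {"op": "D", "order_id": 200,  "amount": None,  "updated_at": "2024-01-15T12:02:00"},
]
result = apply_change_file(target, events)
# order 200 is deleted, 500 is updated to 49.99, 1001 is inserted
```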

Handling deletes

CDC delete handling depends on the pattern and write strategy:

| Pattern | Delete mechanism | How to handle |
|---|---|---|
| Push-based (Debezium) | __deleted = 'true' column (with rewrite mode) | Use presql to DELETE matching keys, filter from main SELECT |
| Pull-based (incremental) | Soft-delete column in source (is_deleted) | Include is_deleted in extraction, handle in transform |
| File-based | Operation column (op = 'D') | Use presql to DELETE matching keys, filter from main SELECT |

Delete with presql

The presql field runs SQL statements before the main merge. Use it to propagate deletes:

```yaml
task:
  presql:
    - |
      DELETE FROM target_schema.target_table
      WHERE primary_key IN (
        SELECT primary_key FROM staging_table WHERE delete_marker = 'true'
      )
  writeStrategy:
    type: "UPSERT_BY_KEY_AND_TIMESTAMP"
    key: ["primary_key"]
    timestamp: "event_timestamp"
```

Soft-delete alternative

Instead of physically deleting rows, mark them as deleted:

```yaml
task:
  presql:
    - |
      UPDATE target_schema.target_table
      SET is_active = false, deleted_at = CURRENT_TIMESTAMP()
      WHERE primary_key IN (
        SELECT primary_key FROM staging_table WHERE delete_marker = 'true'
      )
```

Deduplication with SOURCE_AND_TARGET

CDC batches often contain multiple events for the same key (e.g., a row created then updated within the same micro-batch). Setting on: SOURCE_AND_TARGET deduplicates the incoming data by key before merging:

```yaml
writeStrategy:
  type: "UPSERT_BY_KEY_AND_TIMESTAMP"
  key: ["order_id"]
  timestamp: "event_ts"
  on: "SOURCE_AND_TARGET"
```

With SOURCE_AND_TARGET, Starlake applies ROW_NUMBER() OVER (PARTITION BY key ORDER BY timestamp DESC) = 1 to the incoming batch, keeping only the most recent event per key. Without this setting (default TARGET), duplicate keys in the incoming batch may cause non-deterministic merge behavior.
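A Python equivalent of that ROW_NUMBER dedup makes the effect easy to see (an illustrative sketch of the semantics, not Starlake internals; the batch contents are invented):

```python
# Illustrative equivalent of:
#   ROW_NUMBER() OVER (PARTITION BY key ORDER BY timestamp DESC) = 1

def dedup_latest(batch, key="order_id", ts="event_ts"):
    """Keep only the most recent event per key within the incoming batch."""
    latest = {}
    for row in batch:
        kept = latest.get(row[key])
        if kept is None or row[ts] > kept[ts]:
            latest[row[key]] = row
    return list(latest.values())

batch = [
    {"order_id": 1, "quantity": 1, "event_ts": 100},  # created...
    {"order_id": 1, "quantity": 3, "event_ts": 200},  # ...then updated in the same batch
    {"order_id": 2, "quantity": 7, "event_ts": 150},
]
deduped = dedup_latest(batch)
# only the quantity=3 version of order 1 survives
```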

Tip

Always use on: SOURCE_AND_TARGET for CDC pipelines where a single batch may contain multiple events for the same key.


Monitoring

Freshness checks

Configure freshness thresholds on target tables to alert when data is stale:

In your table or task YAML:

```yaml
metadata:
  freshness:
    warn: "15 minutes"
    error: "1 hour"
```

Then run the check:

```bash
starlake freshness
```

Extraction audit (pull-based CDC)

The SL_LAST_EXPORT table tracks every incremental extraction run with watermark values, row counts, and success status. See Extraction Monitoring for details.

Data quality expectations

Add expectations to validate CDC event integrity:

metadata/expectations/cdc_quality.sl.yml

```yaml
expectations:
  - query: "SELECT COUNT(*) FROM {table} WHERE __op NOT IN ('c','u','d','r')"
    operator: "eq"
    value: "0"
    failOnError: true
    comment: "All CDC operations must be valid Debezium types"
```

Choosing the right pattern

| Consideration | Push-based (A) | Pull-based (B) | File-based (C) |
|---|---|---|---|
| Latency | Seconds to minutes | Minutes to hours | Hours to daily |
| Infrastructure | Kafka + Debezium | JDBC connection only | File transfer |
| Captures deletes | Yes | Only with soft-delete | Yes |
| Captures all changes | Yes | Only latest per watermark | Depends on source |
| Source impact | Low (log-based) | Medium (queries source) | None |
| Complexity | High | Low | Medium |

Recommendations:

  • Use Pattern A when you need near real-time replication with full change tracking including deletes
  • Use Pattern B when you have simple requirements, no Kafka, and the source has an updated_at column
  • Use Pattern C when changes are delivered as files from systems you do not control

Frequently Asked Questions

How does Starlake support Change Data Capture (CDC)?

Starlake supports CDC through three patterns: push-based CDC via Kafka (consuming Debezium events), pull-based CDC via incremental JDBC extraction, and file-based CDC. All patterns converge on Starlake's write strategies (UPSERT_BY_KEY_AND_TIMESTAMP, SCD2, DELETE_THEN_INSERT) to merge changes into target tables.

Which write strategy should I use for CDC?

Use UPSERT_BY_KEY_AND_TIMESTAMP for current-state tables that only need the latest version of each record. Use SCD2 for full change history with start and end timestamps. Use DELETE_THEN_INSERT when you need to handle hard deletes or replace entire key sets.

Can Starlake consume Debezium CDC events from Kafka?

Yes. Configure a Debezium connector with the ExtractNewRecordState SMT to flatten CDC events, then use Starlake's Kafka ingestion to consume them into a raw staging table. A transform task then applies write strategies to merge changes into the target.

How does Starlake handle CDC deletes?

For push-based CDC (Debezium), use the ExtractNewRecordState SMT with delete.handling.mode: rewrite, which adds a __deleted column. Use presql to delete matching keys from the target before the UPSERT runs. For pull-based CDC, use soft-delete columns (is_deleted, deleted_at) in the source.

What is the difference between push-based and pull-based CDC?

Push-based CDC uses Debezium and Kafka to stream changes in near real-time from the database transaction log. Pull-based CDC uses Starlake's incremental JDBC extraction (extract-data --incremental) to query the source for new rows on a schedule. Push-based captures all changes including deletes; pull-based only captures rows matching the watermark condition.

How does SOURCE_AND_TARGET deduplication work for CDC?

When writeStrategy.on is set to SOURCE_AND_TARGET, Starlake deduplicates the incoming batch by key before merging with the target table. Only the most recent event (by timestamp) is kept per key. This is essential for CDC because a single batch may contain multiple events for the same record.