Transformation Capabilities for Python

1. PySpark Runtime

Python transforms require a Spark runtime (Databricks, EMR, or standalone Spark). They do not work with DuckDB, BigQuery, or Snowflake standalone engines.

2. Output via SL_THIS View

The transform script must register its result as a temporary view named SL_THIS. Starlake reads this view to materialize the output into the target table.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

result = (
    spark.table("starbake.products")
    .groupBy("category")
    .agg(
        F.sum(F.col("quantity") * F.col("sale_price")).alias("total_revenue"),
        F.sum("quantity").alias("total_units_sold"),
    )
)

result.createOrReplaceTempView("SL_THIS")

3. Schema Matching

If the target table exists, the DataFrame schema must match the table schema. If the target table does not exist, Starlake infers the schema from the DataFrame.
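When the target table already exists, it can help to align the DataFrame explicitly before registering SL_THIS. A minimal sketch; the helper and the schema mapping below are illustrative, not part of Starlake:

```python
# Illustrative target schema (column name -> Spark SQL type). In a real job you
# could derive it from the existing table's schema instead of hard-coding it.
TARGET_SCHEMA = {
    "category": "string",
    "total_revenue": "double",
    "total_units_sold": "bigint",
}

def align_exprs(target_schema):
    """Build CAST expressions so the DataFrame matches the target table's
    column names, order, and types."""
    return [f"CAST({name} AS {dtype}) AS {name}" for name, dtype in target_schema.items()]

# Then, with a PySpark DataFrame named `result`:
# result.selectExpr(*align_exprs(TARGET_SCHEMA)).createOrReplaceTempView("SL_THIS")
```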

4. Parameter Passing

Arguments are passed via the --options flag and converted to command-line arguments for the script.

starlake transform --name domain.task --options key1=value1,key2=value2

This is converted to:

task.py --key1 value1 --key2 value2

Parse with argparse or sys.argv in the script.
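For example, with argparse (the argument names follow the --options example above):

```python
import argparse

def parse_opts(argv=None):
    # Starlake rewrites --options key1=value1,key2=value2 into
    # --key1 value1 --key2 value2 before invoking the script.
    parser = argparse.ArgumentParser()
    parser.add_argument("--key1")
    parser.add_argument("--key2")
    return parser.parse_args(argv)

args = parse_opts(["--key1", "value1", "--key2", "value2"])
# args.key1 == "value1", args.key2 == "value2"
```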

5. Source Table Access

Any table accessible to the Spark session can be read with spark.table("schema.table"). This includes tables loaded by Starlake ingestion and tables from external catalogs.

6. YAML Configuration

Python transforms use the same .sl.yml configuration file as SQL transforms, supporting all the same properties: write strategies, partitioning, clustering, ACL, RLS, expectations, export, freshness, and scheduling.

task:
  writeStrategy:
    type: OVERWRITE
  sink:
    partition:
      - category
    clustering:
      - product_id

7. Write Strategies

All write strategies available for SQL transforms are supported:

| Strategy | Description |
|---|---|
| APPEND | Insert all new rows (default) |
| OVERWRITE | Replace all existing rows |
| UPSERT_BY_KEY | Merge by key |
| UPSERT_BY_KEY_AND_TIMESTAMP | Merge by key with timestamp comparison |
| OVERWRITE_BY_PARTITION | Overwrite only affected partitions |
| DELETE_THEN_INSERT | Delete matching keys, then insert |
| SCD2 | Slowly Changing Dimension Type 2 |
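Key-based strategies also take the merge key (and, for timestamp comparison, the timestamp column) in the writeStrategy block, as for SQL transforms. A sketch with illustrative column names:

```yaml
task:
  writeStrategy:
    type: UPSERT_BY_KEY_AND_TIMESTAMP
    key: [product_id]
    timestamp: updated_at
```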

8. Pre/Post SQL Hooks

SQL statements can be executed before and after the Python transform via presql and postsql in the YAML configuration.

task:
  presql:
    - "TRUNCATE TABLE staging.ml_features"
  postsql:
    - "ANALYZE TABLE ml.predictions COMPUTE STATISTICS"

9. Cross-Database Writes

The sink.connectionRef property directs output to a specific database connection.

task:
  sink:
    connectionRef: target_db

10. Export to Files

Results can be exported to files (CSV, JSON, Parquet, Avro, ORC) with optional coalescing.

task:
  sink:
    format: parquet
    path: mnt/data/ml_output
    coalesce: true

11. Access Control

Table-level ACL and row-level security (RLS) are applied to the output table using the same syntax as SQL transforms and ingestion.

task:
  acl:
    - role: SELECT
      grants:
        - user:[email protected]
  rls:
    - name: "region filter"
      predicate: "region = 'EMEA'"
      grants:
        - "group:emea-team"

12. Post-Transform Expectations

Data quality assertions are evaluated after the transform completes. See the full Expectations reference for all 53 built-in macros covering completeness, validity, volume, schema, uniqueness, and numeric checks.

13. Scheduling and DAG Integration

Python transforms support cron scheduling and DAG triggering.

task:
  schedule: "0 3 * * *"
  dagRef: "ml_pipeline"

14. Task Timeout

A taskTimeoutMs property sets the maximum execution time in milliseconds.

task:
  taskTimeoutMs: 7200000

15. Freshness Monitoring

The freshness property defines staleness thresholds for the output data.

task:
  freshness:
    warn: "12h"
    error: "1d"

SQL vs Python Comparison

| Aspect | SQL | Python |
|---|---|---|
| Source file | .sql | .py |
| Runtime | DuckDB, BigQuery, Snowflake, Spark, Databricks | Spark, Databricks, EMR only |
| Use case | Standard queries, joins, aggregations | ML, text processing, API calls, complex logic |
| Output mechanism | SELECT result | SL_THIS temporary view |
| YAML configuration | Same format | Same format |
| Pre/Post SQL hooks | Supported | Supported |
| Write strategies | All | All |
| Expectations | Supported | Supported |

Summary

| Capability | Category |
|---|---|
| PySpark DataFrame processing | Core |
| Output via SL_THIS temporary view | Core |
| Schema inference or matching | Core |
| Parameter passing (--options) | Core |
| Pre/Post SQL hooks (presql / postsql) | Core |
| Write strategies (APPEND, OVERWRITE, UPSERT, SCD2, etc.) | Write |
| Cross-database writes (sink.connectionRef) | Write |
| Partitioning and clustering | Write |
| Export to files (CSV, JSON, Parquet, Avro, ORC) | Export |
| Access control (ACL, RLS) | Security |
| Post-transform expectations | Data Quality |
| Freshness monitoring | Data Quality |
| Scheduling and DAG integration | Orchestration |
| Task timeout | Orchestration |