
Customization

Starlake comes with built-in DAG templates that work out of the box. This page covers how to customize these templates, inject parameters at runtime, and manage task dependencies.

Customizing DAG generation requires only a working knowledge of Jinja2 templating and Python.

Architecture overview

The orchestration library is organized around a factory pattern:

```text
IStarlakeJob (generic interface)
├── StarlakeAirflowJob (Airflow base)
│   ├── StarlakeAirflowBashJob        ← shell execution
│   ├── StarlakeAirflowCloudRunJob    ← GCP Cloud Run
│   ├── StarlakeAirflowDataprocJob    ← GCP Dataproc
│   └── StarlakeAirflowFargateJob     ← AWS Fargate
├── StarlakeDagsterJob (Dagster base)
│   ├── StarlakeDagsterShellJob       ← shell execution
│   ├── StarlakeDagsterCloudRunJob    ← GCP Cloud Run
│   ├── StarlakeDagsterDataprocJob    ← GCP Dataproc
│   └── StarlakeDagsterFargateJob     ← AWS Fargate
└── StarlakeSnowflakeJob              ← Snowflake native SQL
```

The appropriate class is instantiated automatically based on the template you choose. The StarlakeJobFactory discovers and registers all implementations via a plugin-based registry.

Core methods

Every factory class provides these methods for generating orchestrator tasks:

| Method | Description | Starlake command |
| --- | --- | --- |
| sl_import(task_id, domain, tables) | Import/stage data from the landing area | starlake stage |
| sl_pre_load(domain, tables, pre_load_strategy) | Check pre-load conditions | starlake preload |
| sl_load(task_id, domain, table, spark_config, dataset) | Load a table | starlake load |
| sl_transform(task_id, transform_name, transform_options, spark_config, dataset) | Run a transformation | starlake transform |
| sl_job(task_id, arguments, spark_config, dataset, task_type) | Generic command (used internally by the methods above) | any starlake command |

Templates

Built-in templates

Templates are in the src/main/resources/templates/dags resource directory, organized by operation type.

Load templates

| Template | Execution environment | Factory class |
| --- | --- | --- |
| load/airflow_scheduled_table_bash.py.j2 | Shell/Bash | StarlakeAirflowBashJob |
| load/airflow_scheduled_table_cloud_run.py.j2 | GCP Cloud Run | StarlakeAirflowCloudRunJob |
| load/airflow_scheduled_table_dataproc.py.j2 | GCP Dataproc | StarlakeAirflowDataprocJob |
| load/airflow_scheduled_table_fargate.py.j2 | AWS Fargate | StarlakeAirflowFargateJob |

Transform templates

| Template | Execution environment | Factory class |
| --- | --- | --- |
| transform/airflow_scheduled_task_bash.py.j2 | Shell/Bash | StarlakeAirflowBashJob |
| transform/airflow_scheduled_task_cloud_run.py.j2 | GCP Cloud Run | StarlakeAirflowCloudRunJob |
| transform/airflow_scheduled_task_dataproc.py.j2 | GCP Dataproc | StarlakeAirflowDataprocJob |
| transform/airflow_scheduled_task_fargate.py.j2 | AWS Fargate | StarlakeAirflowFargateJob |

Custom templates

You can create custom templates that extend the built-in ones. Place them in ${SL_ROOT}/metadata/dags/templates/.

A custom template typically:

  1. Defines custom variables (e.g., jobs, user_defined_macros)
  2. Includes the built-in template via a Jinja2 {% include %} statement
metadata/dags/templates/custom_scheduled_task_cloud_run.py.j2
```jinja
{% include 'dags/templates/__custom_jobs.py.j2' %}
{% include 'templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2' %}
```

Then reference it in your DAG configuration:

metadata/dags/custom_transform_cloud_run.sl.yml
```yaml
dag:
  comment: "Custom transform DAG"
  template: "custom_scheduled_task_cloud_run.py.j2"
  filename: "{{domain}}_custom_tasks.py"
  options:
    cloud_run_project_id: "my-project"
    cloud_run_job_name: "starlake-transform"
```

Runtime customization

Transform parameters

Data transformations often require parameterized SQL queries whose parameters should be evaluated at runtime.

```sql
SELECT * FROM orders
WHERE extraction_date >= '{{date_param_min}}' AND extraction_date <= '{{date_param_max}}'
```

The jobs variable

All transform DAG templates support a dictionary variable named jobs. Each key is a transformation name and its value contains the parameters to pass.

metadata/dags/templates/__custom_jobs.py.j2
```python
jobs = {
    "Products.TopSellingProducts": {
        "options": "date_param_min=2024-01-01,date_param_max=2024-12-31"
    },
    "Products.MonthlySalesPerProduct": {
        "options": "start_day=1"
    }
}
```
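The options value is a comma-separated list of key=value pairs whose keys match the placeholders in the SQL. A simplified sketch of that effect, written with the standard library only: `parse_options` and `render_sql` are hypothetical helpers, not Starlake functions. Starlake performs the real rendering itself. Note that this naive split cannot handle values that contain commas.

```python
import re


def parse_options(options: str) -> dict[str, str]:
    """Split 'k1=v1,k2=v2' into {'k1': 'v1', 'k2': 'v2'} (hypothetical helper)."""
    params = {}
    for pair in filter(None, options.split(",")):
        key, _, value = pair.partition("=")
        params[key.strip()] = value.strip()
    return params


def render_sql(sql: str, params: dict[str, str]) -> str:
    """Replace each {{name}} placeholder, failing loudly on a missing parameter."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in params:
            raise KeyError(f"missing transform parameter: {name}")
        return params[name]
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, sql)


jobs = {
    "Products.TopSellingProducts": {
        "options": "date_param_min=2024-01-01,date_param_max=2024-12-31"
    },
}

sql = (
    "SELECT * FROM orders\n"
    "WHERE extraction_date >= '{{date_param_min}}' "
    "AND extraction_date <= '{{date_param_max}}'"
)
params = parse_options(jobs["Products.TopSellingProducts"]["options"])
print(render_sql(sql, params))
```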

The jobs dict is merged with options when the factory class is instantiated:

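```python
# Entries from the optional module-level `jobs` dict are added to the DAG
# options; on a key collision, the `jobs` entry wins.
sl_job = StarlakeAirflowCloudRunJob(
    options=dict(options, **sys.modules[__name__].__dict__.get('jobs', {}))
)
```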
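The merge itself is plain Python. In this sketch, the `options` and `jobs` values are hypothetical stand-ins for the DAG configuration and the included `__custom_jobs.py.j2`. It uses `globals()`, which is equivalent to `sys.modules[__name__].__dict__` for module-level code. The sketch shows that transform names simply become extra top-level keys, and that an absent `jobs` variable falls back to `{}`.

```python
# Hypothetical stand-ins: `options` would come from the .sl.yml DAG
# configuration and `jobs` from the included __custom_jobs.py.j2.
options = {
    "cloud_run_project_id": "my-project",
    "cloud_run_job_name": "starlake-transform",
}
jobs = {
    "Products.TopSellingProducts": {
        "options": "date_param_min=2024-01-01,date_param_max=2024-12-31"
    },
}

# dict(mapping, **kwargs): keyword entries override mapping entries, and
# string keys containing dots are accepted when unpacked with **.
merged = dict(options, **globals().get("jobs", {}))

# Transform names now sit next to the regular options.
print(sorted(merged))
```

Because keyword entries override, a transform named like an existing option would shadow that option. Transform names of the form domain.task make such collisions unlikely.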

Inside sl_transform, each transform looks up its own key to find additional options:

```python
# From IStarlakeJob.sl_transform():
additional_options = self.__class__.get_context_var(
    transform_name, {}, self.options
).get("options", "")
```
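The lookup reduces to a two-level dictionary access with defaults. In this sketch, `get_context_var` is a simplified stand-in named after the real helper. It reads only the options dict, whereas the real helper may consult other sources as well. `additional_options_for` is a hypothetical wrapper around the quoted expression.

```python
def get_context_var(var_name, default_value=None, options=None):
    """Simplified stand-in: reads only the options dict.

    The real IStarlakeJob helper may look in other places too; this sketch
    does not attempt to reproduce that behavior.
    """
    return (options or {}).get(var_name, default_value)


def additional_options_for(transform_name, options):
    # Unknown transform -> {} -> "" (no additional options).
    return get_context_var(transform_name, {}, options).get("options", "")


options = {
    "cloud_run_project_id": "my-project",
    "Products.TopSellingProducts": {
        "options": "date_param_min=2024-01-01,date_param_max=2024-12-31"
    },
}
print(additional_options_for("Products.TopSellingProducts", options))
print(repr(additional_options_for("Products.MonthlySalesPerProduct", options)))
```

A transform without an entry in `jobs` therefore runs with an empty options string instead of failing.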

Airflow user-defined macros

Because transform parameters may depend on Airflow context variables, you can define user-defined macros that are evaluated at runtime.

metadata/dags/templates/__custom_jobs.py.j2
```python
from custom import get_days_interval, get_month_period

user_defined_macros = {
    "days_interval": get_days_interval,
    "month_period": get_month_period
}

jobs = {
    "Products.TopSellingProducts": {
        "options": "{{ days_interval(data_interval_end | ds, var.value.get('TOP_SELLING_PRODUCTS_DELTA', '30')) }}"
    }
}
```
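The custom module itself is not shown here. The following is a hypothetical sketch of what `get_days_interval` in such a module might look like, assuming it should produce the same key=value options string as the static example. At render time, `data_interval_end | ds` yields a YYYY-MM-DD string, and Airflow Variables are strings, so the delta is converted with `int()`. A `get_month_period` macro would follow the same pattern.

```python
from datetime import date, timedelta


def get_days_interval(ds: str, delta_days: str) -> str:
    """Hypothetical macro: date window from delta_days before ds up to ds.

    ds is a YYYY-MM-DD string; delta_days arrives as a string because
    Airflow Variables (and the '30' default) are strings.
    """
    end = date.fromisoformat(ds)
    start = end - timedelta(days=int(delta_days))
    # Same key=value,key=value format as the static options in `jobs`.
    return f"date_param_min={start.isoformat()},date_param_max={end.isoformat()}"


# Airflow would call this while rendering the template; here it is called directly.
print(get_days_interval("2024-03-31", "30"))
```

The module must be importable from the environment that parses the DAG, for example by placing it alongside the generated DAG files.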

The user_defined_macros dict is picked up automatically by the generated DAG:

```python
with DAG(
    dag_id=...,
    user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
    ...
) as dag:
    ...
```

Terraform integration

A best practice is to inject jobs via Terraform variables, keeping your templates environment-agnostic.

metadata/dags/templates/__custom_jobs.py.j2
```python
import json
jobs = json.loads("""${jobs}""")
```
variables.tf
```hcl
variable "jobs" {
  type = list(object({
    name    = string
    options = string
  }))
  default = []
}
```
main.tf
```hcl
locals {
  jobs = tomap({
    for job in var.jobs :
    "${job.name}" => { options = job.options }
  })
}

resource "google_storage_bucket_object" "composer_dags" {
  for_each = local.dag_files
  name     = each.value
  content = templatefile(
    "${path.module}/${each.value}",
    merge(local.variables, { jobs = jsonencode(local.jobs) })
  )
  bucket = var.composer_bucket
}
```
vars_dev.tfvars
```hcl
jobs = [
  {
    name    = "Products.TopSellingProducts"
    options = "{{ days_interval(data_interval_end | ds, var.value.get('TOP_SELLING_PRODUCTS_DELTA', '30')) }}"
  },
  {
    name    = "Products.MonthlySalesPerProduct"
    options = "{{ month_period(data_interval_end | ds, var.value.get('SALES_START_DAY', '1')) }}"
  }
]
```
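To check what the generated `__custom_jobs` code ends up with, the Terraform pipeline can be mimicked in Python. A dict comprehension stands in for the HCL `for` expression, and `json.dumps` stands in for `jsonencode`. This is an approximation: Terraform's exact spacing and escaping may differ, but any valid JSON text parses to the same structure.

```python
import json

# Copy of vars_dev.tfvars above, as Python data.
tf_jobs = [
    {
        "name": "Products.TopSellingProducts",
        "options": "{{ days_interval(data_interval_end | ds, var.value.get('TOP_SELLING_PRODUCTS_DELTA', '30')) }}",
    },
    {
        "name": "Products.MonthlySalesPerProduct",
        "options": "{{ month_period(data_interval_end | ds, var.value.get('SALES_START_DAY', '1')) }}",
    },
]

# Equivalent of: for job in var.jobs : job.name => { options = job.options }
local_jobs = {job["name"]: {"options": job["options"]} for job in tf_jobs}

# Stand-in for jsonencode(local.jobs); templatefile() pastes this text
# in place of ${jobs}.
encoded = json.dumps(local_jobs)

# What the generated module does with the pasted text.
jobs = json.loads(encoded)
print(jobs["Products.TopSellingProducts"]["options"])
```

The result has the same shape as the hand-written `jobs` dict. One caveat: the JSON is pasted into a non-raw `"""..."""` literal, so Python processes backslash escapes before `json.loads` runs. An option value containing a double quote or a backslash would be mangled. Writing the literal as `r"""${jobs}"""` avoids this.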

Dependencies

Starlake analyzes SQL queries to compute the full dependency tree for every transformation. The run_dependencies_first option controls how these dependencies are handled.

All dependencies are available in the generated DAG via the task_deps Python variable:

```python
task_deps = json.loads("""[{
    "data": {
        "name": "Customers.HighValueCustomers",
        "typ": "task",
        "parent": "Customers.CustomerLifeTimeValue",
        "parentTyp": "task",
        "sink": "Customers.HighValueCustomers"
    },
    "children": [{
        "data": {
            "name": "Customers.CustomerLifeTimeValue",
            "typ": "task",
            "parent": "starbake.Customers",
            "parentTyp": "table",
            "sink": "Customers.CustomerLifeTimeValue"
        },
        "children": [
            {"data": {"name": "starbake.Customers", "typ": "table"}, "task": false},
            {"data": {"name": "starbake.Orders", "typ": "table"}, "task": false}
        ],
        "task": true
    }],
    "task": true
}]""")
```
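In this tree, a node's children are its upstream dependencies. A short sketch of a depth-first walk that flattens the tree into (upstream, downstream) edges and records each node's type is shown below; `walk` is a hypothetical helper, not part of the generated DAG. If the same node appeared under several parents, duplicate edges could be collected, and deduplicating them (for example with a set) would be advisable.

```python
import json

task_deps = json.loads("""[{
  "data": {"name": "Customers.HighValueCustomers", "typ": "task",
           "parent": "Customers.CustomerLifeTimeValue", "parentTyp": "task",
           "sink": "Customers.HighValueCustomers"},
  "children": [{
    "data": {"name": "Customers.CustomerLifeTimeValue", "typ": "task",
             "parent": "starbake.Customers", "parentTyp": "table",
             "sink": "Customers.CustomerLifeTimeValue"},
    "children": [
      {"data": {"name": "starbake.Customers", "typ": "table"}, "task": false},
      {"data": {"name": "starbake.Orders", "typ": "table"}, "task": false}
    ],
    "task": true
  }],
  "task": true
}]""")


def walk(nodes, edges, kinds):
    """Depth-first walk: each child is an upstream dependency of its node."""
    for node in nodes:
        name = node["data"]["name"]
        kinds[name] = node["data"]["typ"]  # "task" or "table"
        for child in node.get("children", []):
            edges.append((child["data"]["name"], name))
        walk(node.get("children", []), edges, kinds)


edges: list[tuple[str, str]] = []
kinds: dict[str, str] = {}
walk(task_deps, edges, kinds)
for upstream, downstream in edges:
    print(f"{upstream} -> {downstream}")
```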

Inline strategy

When run_dependencies_first = true, all dependencies are generated as tasks within the same DAG.

This creates a self-contained DAG that runs the entire pipeline (loads + transforms) in the correct order.
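With the inline strategy, task order must respect every edge of the dependency tree. A minimal sketch of that ordering is shown below, using the standard library's graphlib (Python 3.9+) on the edges implied by the task_deps example. This is a generic topological sort, not Starlake's own code. In a generated Airflow DAG, each edge would correspond to an `upstream >> downstream` dependency between the load and transform tasks.

```python
from graphlib import TopologicalSorter

# (upstream, downstream) pairs taken from the task_deps example.
edges = [
    ("starbake.Customers", "Customers.CustomerLifeTimeValue"),
    ("starbake.Orders", "Customers.CustomerLifeTimeValue"),
    ("Customers.CustomerLifeTimeValue", "Customers.HighValueCustomers"),
]

# TopologicalSorter expects: node -> set of its predecessors.
graph: dict[str, set[str]] = {}
for upstream, downstream in edges:
    graph.setdefault(downstream, set()).add(upstream)

# Loads of the source tables come first, then the dependent transforms.
# A cyclic dependency would raise graphlib.CycleError.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

The two table loads have no mutual dependency, so their relative order is unspecified and they could run in parallel.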

External state change strategy

When run_dependencies_first = false (the default), the orchestrator's native data-aware scheduling mechanism is used. The transform DAG only runs when its upstream dependencies have been satisfied.

The transform DAG is scheduled on Airflow Assets (called Datasets before Airflow 3). Each upstream load and transform updates its corresponding Asset, and the transform DAG is triggered once the required Assets have been updated, as determined by the dataset_triggering_strategy option.

Each starlake command records its outlets (output Assets). At the end of the DAG, all outlets are updated:

```python
end = sl_job.dummy_op(
    task_id="end",
    outlets=[Dataset(dag.dag_id)] + list(map(lambda x: Dataset(x.uri), sl_job.outlets))
)
```

The dataset_triggering_strategy option controls how multiple upstream Assets are combined:

  • any — Any single Asset update triggers the DAG
  • all — All Assets must be updated
  • Custom boolean expression (e.g., dataset1 & (dataset2 | dataset3))
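To make the three options concrete, here is a small evaluator of their semantics. It is a hypothetical sketch, not how Starlake or Airflow implement it; Airflow 2.9+ builds such conditions from Dataset objects combined with & and |. The sketch assumes asset names in a custom expression are valid Python identifiers such as dataset1. The expression is parsed with `ast` and never passed to `eval`.

```python
import ast


def should_trigger(strategy: str, assets: list[str], updated: set[str]) -> bool:
    """Hypothetical check: does the set of updated assets satisfy the strategy?"""
    if strategy == "any":
        return any(a in updated for a in assets)
    if strategy == "all":
        return all(a in updated for a in assets)

    # Custom expression: only names, & and | (with parentheses) are accepted.
    def evaluate(node: ast.AST) -> bool:
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.BitAnd):
            return evaluate(node.left) and evaluate(node.right)
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.BitOr):
            return evaluate(node.left) or evaluate(node.right)
        if isinstance(node, ast.Name):
            return node.id in updated
        raise ValueError(f"unsupported syntax: {ast.dump(node)}")

    return evaluate(ast.parse(strategy, mode="eval"))


assets = ["dataset1", "dataset2", "dataset3"]
print(should_trigger("dataset1 & (dataset2 | dataset3)", assets, {"dataset1", "dataset3"}))
```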