Customize Dagster DAGs

This guide covers Dagster-specific customization for Starlake DAG generation. It details Shell and Cloud Run factory classes, Jinja2 templates for data loading and transformation, and the Multi Asset Sensor mechanism for dependency-driven execution. It explains how Dagster asset materialization integrates with Starlake commands.

Quick start

  1. Install prerequisites -- Dagster 1.6.0+, starlake-dagster 0.1.2+, and starlake 1.0.1+.
  2. Choose a factory class -- Use StarlakeDagsterShellJob for local/on-premise or StarlakeDagsterCloudRunJob for GCP Cloud Run.
  3. Configure DAG YAML -- Create a configuration file in metadata/dags with the Dagster template, filename, and options (run_dependencies_first).
  4. Generate and deploy -- Run starlake dag-generate --clean, then deploy the generated files to your Dagster repository.

For general DAG configuration concepts (references, properties, options), see the Customizing DAG Generation hub page. For a hands-on introduction, see the Orchestration Tutorial.
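
As a rough illustration of step 3 in the quick start above, a DAG configuration file under metadata/dags could look like the sketch below. The template name matches the Dagster shell template described later on this page; the comment, filename, executable path, and the exact set of options are placeholders to adapt to your project (transformation DAGs follow the same shape, typically with a transform template and the run_dependencies_first option discussed further down).

dag:
  comment: "Dagster DAG for loading the starbake tables"
  template: "load/dagster_scheduled_table_shell.py.j2"
  filename: "dagster_starbake_tables.py"
  options:
    SL_STARLAKE_PATH: "/opt/starlake/starlake.sh"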

Prerequisites

  • Dagster: 1.6.0 or higher
  • starlake-dagster: 0.1.2 or higher
  • starlake: 1.0.1 or higher

Concrete factory classes

Each concrete factory class extends ai.starlake.dagster.StarlakeDagsterJob and implements the sl_job method to generate the Dagster op that runs the corresponding Starlake command.
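
For orientation only, here is a minimal sketch of that extension point: a hypothetical factory subclassing StarlakeDagsterJob whose sl_job returns a Dagster op. The class name and the op body are invented for illustration; only the sl_job signature mirrors the shell implementation shown later on this page.

from ai.starlake.dagster import StarlakeDagsterJob
from dagster import Out, Output, op

class StarlakeDagsterEchoJob(StarlakeDagsterJob):
    """Hypothetical factory that only logs the command it would run."""

    def sl_job(self, task_id: str, arguments: list, spark_config=None, **kwargs):
        @op(
            name=task_id,
            ins=kwargs.get("ins", {}),
            out={"result": Out(str)},
        )
        def job(context, **kwargs):
            # a real factory would execute the starlake command here
            context.log.info(f"would run: starlake {' '.join(arguments)}")
            yield Output(value="ok", output_name="result")

        return job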

StarlakeDagsterShellJob

ai.starlake.dagster.shell.StarlakeDagsterShellJob generates nodes using the dagster-shell library. Use it for on-premise or local execution.

An additional SL_STARLAKE_PATH option specifies the path to the Starlake executable.

Starlake Dagster job using shell operator for on-premise execution

StarlakeDagsterCloudRunJob

ai.starlake.dagster.gcp.StarlakeDagsterCloudRunJob overrides the sl_job method to run Starlake commands by executing Cloud Run jobs.

Additional options for the Cloud Run job:

Name | Type | Description
--- | --- | ---
cloud_run_project_id | str | The optional Cloud Run project id (defaults to the current GCP project id)
cloud_run_job_name | str | The required name of the Cloud Run job
cloud_run_region | str | The optional region (europe-west1 by default)
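
As a sketch, and following the same configuration shape as the example near the top of this page, a Cloud Run based DAG configuration might carry these options; the project id, job name, and region values below are placeholders:

dag:
  comment: "Dagster DAG for loading the starbake tables on Cloud Run"
  template: "load/dagster_scheduled_table_cloud_run.py.j2"
  filename: "dagster_starbake_tables.py"
  options:
    cloud_run_project_id: "my-gcp-project"
    cloud_run_job_name: "starlake"
    cloud_run_region: "europe-west1"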

Pre-load strategy visualizations

The pre-load strategy options (NONE, IMPORTED, PENDING, ACK) are described in the hub page. Below are the Dagster-specific visualizations for each strategy.

NONE

Dagster job visualization for NONE pre-load strategy

IMPORTED

Dagster job visualization for IMPORTED pre-load strategy

PENDING

Dagster job visualization for PENDING pre-load strategy

ACK

Dagster job visualization for ACK pre-load strategy

Templates

Data loading templates

__dagster_scheduled_table_tpl.py.j2 is the abstract template for Dagster data loading DAGs. It requires a concrete factory class implementing ai.starlake.dagster.StarlakeDagsterJob.

The following concrete Dagster templates extend it through include statements:

  • dagster_scheduled_table_shell.py.j2 -- Instantiates StarlakeDagsterShellJob:
src/main/resources/templates/dags/load/dagster_scheduled_table_shell.py.j2
# This template executes individual shell jobs and requires the following dag generation options set:
# - SL_STARLAKE_PATH: the path to the starlake executable [OPTIONAL]
# ...
{% include 'templates/dags/__starlake_dagster_shell_job.py.j2' %}
{% include 'templates/dags/load/__dagster_scheduled_table_tpl.py.j2' %}
  • dagster_scheduled_table_cloud_run.py.j2 -- Instantiates StarlakeDagsterCloudRunJob:
src/main/resources/templates/dags/load/dagster_scheduled_table_cloud_run.py.j2
# This template executes individual cloud run jobs and requires the following dag generation options set:
# - cloud_run_project_id: the project id where the job is located (if not set, the current GCP project id will be used) [OPTIONAL]
# - cloud_run_region: the region where the job is located (if not set, europe-west1 will be used) [OPTIONAL]
# - cloud_run_job_name: the name of the job to execute [REQUIRED]
# ...
{% include 'templates/dags/__starlake_dagster_cloud_run_job.py.j2' %}
{% include 'templates/dags/load/__dagster_scheduled_table_tpl.py.j2' %}

Data transformation templates

__dagster_scheduled_task_tpl.py.j2 is the abstract template for data transformation DAGs. It follows the same pattern.

The following concrete Dagster templates exist:

  • dagster_scheduled_task_shell.py.j2 -- Instantiates StarlakeDagsterShellJob:
src/main/resources/templates/dags/transform/dagster_scheduled_task_shell.py.j2
# ...
{% include 'templates/dags/__starlake_dagster_shell_job.py.j2' %}
{% include 'templates/dags/transform/__dagster_scheduled_task_tpl.py.j2' %}
  • dagster_scheduled_task_cloud_run.py.j2 -- Instantiates StarlakeDagsterCloudRunJob:
src/main/resources/templates/dags/transform/dagster_scheduled_task_cloud_run.py.j2
# ...
{% include 'templates/dags/__starlake_dagster_cloud_run_job.py.j2' %}
{% include 'templates/dags/transform/__dagster_scheduled_task_tpl.py.j2' %}

Dependencies

Starlake calculates all dependencies for each transformation by analyzing SQL queries.

The run_dependencies_first option controls whether dependencies are generated inline or handled through the Dagster Multi Asset Sensor mechanism (default: False).

All dependencies are available in the generated DAG via the Python dictionary variable task_deps:

task_deps=json.loads("""[ {
  "data" : {
    "name" : "Customers.HighValueCustomers",
    "typ" : "task",
    "parent" : "Customers.CustomerLifeTimeValue",
    "parentTyp" : "task",
    "parentRef" : "CustomerLifetimeValue",
    "sink" : "Customers.HighValueCustomers"
  },
  "children" : [ {
    "data" : {
      "name" : "Customers.CustomerLifeTimeValue",
      "typ" : "task",
      "parent" : "starbake.Customers",
      "parentTyp" : "table",
      "parentRef" : "starbake.Customers",
      "sink" : "Customers.CustomerLifeTimeValue"
    },
    "children" : [ {
      "data" : {
        "name" : "starbake.Customers",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    }, {
      "data" : {
        "name" : "starbake.Orders",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    } ],
    "task" : true
  } ],
  "task" : true
} ]""")
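
To make the shape of this structure concrete, the hypothetical helper below walks the task_deps tree from the example above and collects the name of every upstream node. It mirrors what the load_assets function in the sensor template further down does, except that the template also applies sanitize_id to each name.

# Hypothetical helper (not part of the generated DAG): collect upstream node names.
def upstream_names(node: dict, acc: set) -> set:
    for child in node.get("children", []):
        acc.add(child["data"]["name"])
        upstream_names(child, acc)
    return acc

names: set = set()
for root in task_deps:
    upstream_names(root, names)

# With the task_deps above, names contains:
# {'Customers.CustomerLifeTimeValue', 'starbake.Customers', 'starbake.Orders'}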

Inline dependencies

With run_dependencies_first = True, all dependencies are included in the same Dagster job.

Dagster job with inline transform dependencies in a single job

Multi Asset Sensor

With run_dependencies_first = False (default), a sensor checks whether all dependent assets have been materialized before triggering execution.

src/main/resources/templates/dags/transform/__dagster_scheduled_task_tpl.py.j2
#...
from typing import Set

from dagster import AssetKey, MultiAssetSensorDefinition, MultiAssetSensorEvaluationContext, RunRequest, SkipReason, Definitions

# if you want to load dependencies, set run_dependencies_first to True in the options
run_dependencies_first: bool = StarlakeDagsterJob.get_context_var(var_name='run_dependencies_first', default_value='False', options=options).lower() == 'true'

sensor = None

# if you choose to not load the dependencies, a sensor will be created to check if the dependencies are met
if not run_dependencies_first:
    assets: Set[str] = set()

    def load_assets(task: dict):
        if 'children' in task:
            for child in task['children']:
                assets.add(sanitize_id(child['data']['name']))
                load_assets(child)

    for task in task_deps:
        load_assets(task)

    def multi_asset_sensor_with_skip_reason(context: MultiAssetSensorEvaluationContext):
        asset_events = context.latest_materialization_records_by_key()
        if all(asset_events.values()):
            context.advance_all_cursors()
            return RunRequest()
        elif any(asset_events.values()):
            materialized_asset_key_strs = [
                key.to_user_string() for key, value in asset_events.items() if value
            ]
            not_materialized_asset_key_strs = [
                key.to_user_string() for key, value in asset_events.items() if not value
            ]
            return SkipReason(
                f"Observed materializations for {materialized_asset_key_strs}, "
                f"but not for {not_materialized_asset_key_strs}"
            )
        else:
            return SkipReason("No materializations observed")

    sensor = MultiAssetSensorDefinition(
        name=f'{job_name}_sensor',
        monitored_assets=list(map(lambda asset: AssetKey(asset), assets)),
        asset_materialization_fn=multi_asset_sensor_with_skip_reason,
        minimum_interval_seconds=60,
        description=f"Sensor for {job_name}",
        job_name=job_name,
    )
#...
defs = Definitions(
    jobs=[generate_job()],
    schedules=crons,
    sensors=[sensor] if sensor else [],
)

How assets are materialized

The ai.starlake.dagster.StarlakeDagsterJob class records assets for each Starlake command:

ai.starlake.dagster.StarlakeDagsterJob
def sl_import(self, task_id: str, domain: str, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(domain))})
    #...

def sl_load(self, task_id: str, domain: str, table: str, spark_config: StarlakeSparkConfig=None, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(f"{domain}.{table}"))})
    #...

def sl_transform(self, task_id: str, transform_name: str, transform_options: str = None, spark_config: StarlakeSparkConfig = None, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(transform_name))})
    #...

Each asset is materialized at runtime through the Dagster op defined in sl_job of the concrete factory class:

ai.starlake.dagster.shell.StarlakeDagsterShellJob
def sl_job(self, task_id: str, arguments: list, spark_config: StarlakeSparkConfig=None, **kwargs) -> NodeDefinition:
    """Overrides IStarlakeJob.sl_job()
    Generate the Dagster node that will run the starlake command.

    Args:
        task_id (str): The required task id.
        arguments (list): The required arguments of the starlake command to run.

    Returns:
        OpDefinition: The Dagster node.
    """
    command = self.__class__.get_context_var("SL_STARLAKE_PATH", "starlake", self.options) + f" {' '.join(arguments)}"

    asset_key: AssetKey = kwargs.get("asset", None)

    @op(
        name=task_id,
        ins=kwargs.get("ins", {}),
        out={kwargs.get("out", "result"): Out(str)},
    )
    def job(context, **kwargs):
        output, return_code = execute_shell_command(
            shell_command=command,
            output_logging="STREAM",
            log=context.log,
            cwd=self.sl_root,
            env=self.sl_env_vars,
            log_shell_command=True,
        )

        if return_code:
            raise Failure(description=f"Starlake command {command} execution failed with output: {output}")

        if asset_key:
            yield AssetMaterialization(asset_key=asset_key.path, description=kwargs.get("description", f"Starlake command {command} execution succeeded"))

        yield Output(value=output, output_name="result")

    return job

Dagster job with multi-asset sensor scheduling for transform triggers

Frequently Asked Questions

How does Starlake generate Dagster ops?

Starlake uses concrete factory classes that inherit from StarlakeDagsterJob. StarlakeDagsterShellJob generates ops via the dagster-shell library. StarlakeDagsterCloudRunJob generates ops that execute Cloud Run jobs.

What version of Dagster is required?

Dagster 1.6.0 or higher and starlake-dagster 0.1.2 or higher; the starlake CLI itself must be 1.0.1 or higher.

How does Starlake manage dependencies in Dagster?

With run_dependencies_first=True, all dependencies are included inline in the same job. With run_dependencies_first=False (default), a Multi Asset Sensor monitors the materialization of dependent assets before triggering the job.

How does the Starlake Multi Asset Sensor work?

The sensor checks that all dependent assets have been materialized. If so, it triggers a RunRequest. Otherwise, it returns a SkipReason indicating the missing assets.

How are assets materialized?

Each sl_import, sl_load, and sl_transform command registers a corresponding AssetKey. The materialization is performed at execution time via AssetMaterialization in the sl_job method.

What additional options are needed for Cloud Run?

cloud_run_project_id (optional), cloud_run_job_name (required), and cloud_run_region (optional, defaults to europe-west1).

Can I use Dagster locally with Starlake?

Yes. The StarlakeDagsterShellJob template executes Starlake commands locally via shell. You just need to specify the SL_STARLAKE_PATH option.