
Customizing DAG generation

Starlake comes with out-of-the-box DAG templates. These templates can be customized to fit your specific needs for any scheduler of your choice.

You just need to be comfortable with the Jinja2 templating language and the Python programming language.

Starlake is not an orchestration tool, but it can generate your DAGs from templates and run your loads and transforms in the right order on the tool of your choice for scheduling and monitoring batch-oriented workflows.


Starlake DAG generation relies on:

  • the starlake command line tool
  • DAG configuration(s) and their references within the loads and tasks
  • template(s) that may be customized
  • the starlake-orchestration framework to dynamically generate the tasks that will be run
  • dependency management between tasks to execute transforms in the correct order

Prerequisites

Before using Starlake DAG generation, ensure the following minimum versions are installed on your system:

  • starlake: 1.0.1 or higher

Additional requirements for Airflow

  • Apache Airflow: 2.4.0 or higher (2.6.0 or higher is recommended with cloud-run)
  • starlake-airflow: 0.1.2.1 or higher

Command

starlake dag-generate [options]

where options are:

parameter | cardinality | description
--outputDir <value> | optional | Path for saving the resulting DAG file(s) (${SL_ROOT}/metadata/dags/generated by default)
--clean | optional | Whether the existing DAG file(s) should be removed first (false by default)
--domains | optional | Whether to generate DAG file(s) to load schema(s) (true by default if the --tasks option has not been specified)
--tasks | optional | Whether to generate DAG file(s) for tasks (true by default if the --domains option has not been specified)
--tags <value> | optional | Whether to generate DAG file(s) for the specified tags only (no tags by default)

Configuration

All DAG configuration files are located in the ${SL_ROOT}/metadata/dags directory. The root element is dag.

References

A DAG configuration is referenced by its configuration file name without the extension. For example, the configuration defined in ${SL_ROOT}/metadata/dags/custom_transform_cloud_run.yml is referenced as custom_transform_cloud_run.

DAG configuration for loading data

The configuration files to use for loading data can be defined:

  • at the project level, in the application file ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.load property.
    In this case the same configuration file will be used as the default DAG configuration for all the tables in the project.
application:
  dagRef:
    load: load_cloud_run_domain
  #...
  • at the domain level, in the domain configuration file ${SL_ROOT}/metadata/load/{domain}/_config.sl.yml under the load.metadata.dagRef property.
    In this case the configuration file will be used as the default DAG configuration for all the tables in the domain.
load:
  metadata:
    dagRef: load_bash_domain
  #...
  • at the table level, in the table configuration file ${SL_ROOT}/metadata/load/{domain}/{table}.sl.yml under the table.metadata.dagRef property.
    In this case the configuration file will be used as the default DAG configuration for the table only.
table:
  metadata:
    dagRef: load_bash_domain
  #...

DAG configuration for transforming data

The configuration files to use for transforming data can be defined:

  • at the project level, in the application file ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.transform property.
    In this case the same configuration file will be used as the default DAG configuration for all the transformations in the project.
application:
  dagRef:
    transform: norm_cloud_run_domain
  #...
  • at the transformation level, in the transformation configuration file ${SL_ROOT}/metadata/transform/{domain}/{transformation}.sl.yml under the task.dagRef property.
    In this case the configuration file will be used as the default DAG configuration for the transformation only.
task:
  dagRef: agr_cloud_run_domain
  #...

Properties

A DAG configuration defines four properties: comment, template, filename and options.

dag:
  comment: "dag for transforming tables for domain {{domain}} with cloud run" # will appear as a description of the DAG
  template: "custom_scheduled_task_cloud_run.py.j2" # the dag template to use
  filename: "{{domain}}_norm_cloud_run.py" # the path, relative to the outputDir specified on the `dag-generate` command, where the generated DAG file will be copied
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...

Comment

A short description of the generated DAG.

Template

The path to the template that will generate the DAG(s), either:

  • an absolute path
  • a path relative to the ${SL_ROOT}/metadata/dags/templates directory
  • a path relative to the src/main/resources/templates/dags starlake resource directory

Filename

The filename defines the relative path to the DAG(s) that will be generated. The specified path is relative to the outputDir option that was specified on the command line (or its default value if not specified).

The value of this property may include special variables that directly affect the number of DAGs generated:

  • domain: a single DAG for all tables within the domain affected by this configuration
dag:
  filename: "{{domain}}_norm_cloud_run.py" # one DAG per domain
  #...
  • table: as many DAGs as there are tables in the domain affected by this configuration
dag:
  filename: "{{domain}}_{{table}}_norm_cloud_run.py" # one DAG per table
  #...

Otherwise, a single DAG will be generated for all tables affected by this configuration.

Options

This property allows you to pass a certain number of options to the template in the form of a dictionary.

Some of these options are common to all templates.

Starlake env vars

sl_env_var defines the Starlake environment variables, passed as a JSON-encoded string.

dag:
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...
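
Since sl_env_var is simply a JSON object serialized to a string, it can be convenient to build it programmatically rather than escaping quotes by hand. A minimal sketch (the paths and values below are purely illustrative):

import json

# Illustrative values only; adapt the variables to your project.
sl_env = {
    "SL_ROOT": "/opt/starlake/project",
    "SL_DATASETS": "/opt/starlake/project/datasets",
    "SL_TIMEZONE": "Europe/Paris",
}

# json.dumps produces the escaped string expected by the sl_env_var option.
print(json.dumps(sl_env))
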
Pre-load strategy

pre_load_strategy defines the strategy that can be used to conditionally load the tables of a domain within the DAG.

Four possible strategies:

NONE

The load of the domain will not be conditioned and no pre-load tasks will be executed (this is the default strategy).

IMPORTED

This strategy implies that at least one file is present in the landing area (${SL_ROOT}/incoming/{domain} by default, if the incoming_path option has not been specified). If one or more files are present, the sl_import method will be called to import the domain before loading it; otherwise the load of the domain will be skipped.

dag:
  options:
    pre_load_strategy: "imported"
  #...

PENDING

This strategy implies that at least one file is present in the pending datasets area of the domain (${SL_ROOT}/datasets/pending/{domain} by default, if the pending_path option has not been specified); otherwise the load of the domain will be skipped.

dag:
  options:
    pre_load_strategy: "pending"
  #...

ACK

This strategy implies that an ack file is present at the specified path (${SL_ROOT}/datasets/pending/{domain}/{{ds}}.ack by default, if the global_ack_file_path option has not been specified); otherwise the load of the domain will be skipped.

dag:
  options:
    pre_load_strategy: "ack"
  #...

Load dependencies

load_dependencies defines whether or not to recursively generate all the dependencies associated with each task for which the transformation DAG is generated (False by default).

dag:
  options:
    load_dependencies: True
  #...

Additional options

Depending on the template chosen, a specific concrete factory class extending ai.starlake.job.IStarlakeJob will be instantiated for which additional options may be required.

IStarlakeJob

ai.starlake.job.IStarlakeJob is the generic factory interface responsible for generating the tasks that run starlake's stage, load and transform commands:

  • sl_import will generate the task that will run the starlake stage command.
def sl_import(
    self,
    task_id: str,
    domain: str,
    **kwargs) -> BaseOperator:
    #...
name | type | description
task_id | str | the optional task id ({domain}_import by default)
domain | str | the required domain to import
  • sl_load will generate the task that will run the starlake load command.
def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description
task_id | str | the optional task id ({domain}_{table}_load by default)
domain | str | the required domain of the table to load
table | str | the required table to load
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig
  • sl_transform will generate the task that will run the starlake transform command.
def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description
task_id | str | the optional task id ({transform_name} by default)
transform_name | str | the transform to run
transform_options | str | the optional transform options
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig

Ultimately, all of these methods call the sl_job method, which must be implemented in every concrete factory class.

def sl_job(
    self,
    task_id: str,
    arguments: list,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description
task_id | str | the required task id
arguments | list | the required arguments of the starlake command to run
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig

Concrete factory classes

Apache Airflow Concrete factory classes

Each concrete factory class extends ai.starlake.airflow.StarlakeAirflowJob and implements the sl_job method, which generates the Airflow task that runs the corresponding starlake command.

Default pool

For all templates instantiating StarlakeAirflowJob class, the default_pool option defines the Airflow pool to use for all tasks executed within the DAG.

dag:
  options:
    default_pool: "custom_default_pool"
  #...
Bash

ai.starlake.airflow.bash.StarlakeAirflowBashJob is a concrete implementation of StarlakeAirflowJob that generates tasks using airflow.operators.bash.BashOperator. It is useful for on-premise execution.

An additional SL_STARLAKE_PATH option is required to specify the path to the starlake executable.
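
To make the factory pattern more tangible, here is a simplified, hypothetical sketch of how a bash-based sl_job could turn the starlake CLI arguments into an airflow.operators.bash.BashOperator. It is not the actual StarlakeAirflowBashJob implementation; the class name and the SL_STARLAKE_PATH lookup are assumptions for illustration only.

from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator

class BashJobSketch:
    """Hypothetical factory sketch: builds a BashOperator that runs a starlake command."""

    def __init__(self, options: dict = None):
        self.options = options or {}

    def sl_job(self, task_id: str, arguments: list, **kwargs) -> BaseOperator:
        # Resolve the starlake executable from the DAG generation options (assumed lookup).
        starlake = self.options.get("SL_STARLAKE_PATH", "starlake")
        # Assemble the full shell command, e.g. "starlake transform --name Products.TopSellingProducts".
        return BashOperator(task_id=task_id, bash_command=" ".join([starlake] + arguments), **kwargs)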

Cloud run

The ai.starlake.airflow.gcp.StarlakeAirflowCloudRunJob class is a concrete implementation of StarlakeAirflowJob that overrides the sl_job method to run the starlake command by executing a Cloud Run job.

Below is the list of additional options used to configure the Cloud Run job:

name | type | description
cloud_run_project_id | str | the optional Cloud Run project id (by default, the project id of the Composer environment)
cloud_run_job_name | str | the required name of the Cloud Run job
cloud_run_region | str | the optional region (europe-west1 by default)
cloud_run_async | bool | the optional flag to run the Cloud Run job asynchronously (True by default)
retry_on_failure | bool | the optional flag to retry the Cloud Run job on failure (False by default)
retry_delay_in_seconds | int | the optional delay in seconds to wait before retrying the Cloud Run job (10 by default)

If the execution is configured to run asynchronously, an ai.starlake.airflow.gcp.CloudRunJobCompletionSensor, which extends airflow.sensors.bash.BashSensor, will be instantiated to wait for the completion of the Cloud Run job execution.

Templates

Starlake templates

Starlake templates are located under the src/main/resources/templates/dags resource directory. There are two types of templates: those for loading data and those for transforming data.

Data loading

Starlake templates for data loading are listed under the load subdirectory.

Apache Airflow Templates for data loading

__airflow_scheduled_table_tpl.py.j2 is the abstract template used to generate Airflow DAGs for data loading. It requires the instantiation of a concrete factory class that implements ai.starlake.airflow.StarlakeAirflowJob.

Currently, there are three Airflow concrete templates for data loading.

All extend this abstract template by instantiating the corresponding concrete factory class using include statements.

  • airflow_scheduled_table_bash.py.j2 instantiates a StarlakeAirflowBashJob class.
src/main/resources/templates/dags/load/airflow_scheduled_table_bash.py.j2
# This template executes individual bash jobs and requires the following dag generation options set:
# - SL_STARLAKE_PATH: the path to the starlake executable [OPTIONAL]
# ...
{% include 'templates/dags/__starlake_airflow_bash_job.py.j2' %}
{% include 'templates/dags/load/__airflow_scheduled_table_tpl.py.j2' %}
  • airflow_scheduled_table_cloud_run.py.j2 instantiates a StarlakeAirflowCloudRunJob class.
src/main/resources/templates/dags/load/airflow_scheduled_table_cloud_run.py.j2
# This template executes individual cloud run jobs and requires the following dag generation options set:
# - cloud_run_project_id: the project id where the job is located (if not set, the project id of the composer environment will be used) [OPTIONAL]
# - cloud_run_job_region: the region where the job is located (if not set, europe-west1 will be used) [OPTIONAL]
# - cloud_run_job_name: the name of the job to execute [REQUIRED]
# ...
{% include 'templates/dags/__starlake_airflow_cloud_run_job.py.j2' %}
{% include 'templates/dags/load/__airflow_scheduled_table_tpl.py.j2' %}

Data transformation

Starlake templates for data transformation are listed under the transform subdirectory.

Apache Airflow Templates for data transformation

__airflow_scheduled_task_tpl.py.j2 is the abstract template used to generate Airflow DAGs for data transformation. In the same way, it requires the instantiation of a concrete factory class that implements ai.starlake.airflow.StarlakeAirflowJob.

Currently, there are three Airflow concrete templates for data transformation.

All extend this abstract template by instantiating the corresponding concrete factory class using include statements.

  • airflow_scheduled_task_bash.py.j2 instantiates a StarlakeAirflowBashJob class.
src/main/resources/templates/dags/transform/airflow_scheduled_task_bash.py.j2
# ...
{% include 'templates/dags/__starlake_airflow_bash_job.py.j2' %}
{% include 'templates/dags/transform/__airflow_scheduled_task_tpl.py.j2' %}
  • airflow_scheduled_task_cloud_run.py.j2 instantiates a StarlakeAirflowCloudRunJob class.
src/main/resources/templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2
# ...
{% include 'templates/dags/__starlake_airflow_cloud_run_job.py.j2' %}
{% include 'templates/dags/transform/__airflow_scheduled_task_tpl.py.j2' %}

Customize existing templates

Although the options are useful for customizing the generated DAGs, there are situations where we need to be able to dynamically apply some of them at runtime.

Transform parameters

Often data transformation requires parameterized SQL queries whose parameters should be evaluated at runtime.

-- ...
step1 as (
  SELECT * FROM step0
  WHERE DAT_EXTRACTION >= '{{date_param_min}}' and DAT_EXTRACTION <= '{{date_param_max}}'
)
-- ...
jobs variable

All Starlake DAG templates for data transformation allow parameter values to be injected through the optional definition of a Python dictionary variable named jobs, where each key is the name of a transformation and each value holds the parameters to pass to that transformation. Each entry of this dictionary will be added to the options of the corresponding DAG.

src/main/resources/templates/dags/__starlake_airflow_cloud_run_job.py.j2
#...
#optional variable jobs as a dict of all parameters to apply by job
#eg jobs = {"task1 domain.task1 name": {"options": "task1 transform options"}, "task2 domain.task2 name": {"options": "task2 transform options"}}
sl_job = StarlakeAirflowCloudRunJob(options=dict(options, **sys.modules[__name__].__dict__.get('jobs', {})))
ai.starlake.job.IStarlakeJob
#...
def sl_transform(self, task_id: str, transform_name: str, transform_options: str=None, spark_config: StarlakeSparkConfig=None, **kwargs) -> T:
    """Transform job.
    Generate the scheduler task that will run the starlake `transform` command.

    Args:
        task_id (str): The optional task id.
        transform_name (str): The transform to run.
        transform_options (str): The optional transform options to use.
        spark_config (StarlakeSparkConfig): The optional spark configuration to use.

    Returns:
        T: The scheduler task.
    """
    task_id = f"{transform_name}" if not task_id else task_id
    arguments = ["transform", "--name", transform_name]
    transform_options = transform_options if transform_options else self.__class__.get_context_var(transform_name, {}, self.options).get("options", "")
    if transform_options:
        arguments.extend(["--options", transform_options])
    return self.sl_job(task_id=task_id, arguments=arguments, spark_config=spark_config, **kwargs)
#...

Because this variable has to be defined in the same module as the generated DAG (options=dict(options, **sys.modules[__name__].__dict__.get('jobs', {}))), we need to create a customized DAG template that extends the existing one(s) and includes our specific code.

metadata/dags/templates/__custom_jobs.py.j2
#...

jobs = #...
metadata/dags/templates/custom_scheduled_task_cloud_run.py.j2
#...
{% include 'dags/templates/__custom_jobs.py.j2' %} # our custom code
{% include 'templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2' %} # the template to extend
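
For illustration only, a hypothetical __custom_jobs.py.j2 could hard-code the jobs dictionary as follows; the transformation names are taken from the examples later on this page and the option strings are placeholders following the shape shown in the template comment above.

# Hypothetical content for metadata/dags/templates/__custom_jobs.py.j2.
# Keys are transformation names; each value carries the transform options string
# that sl_transform will forward to `starlake transform --options`.
jobs = {
    "Products.TopSellingProducts": {"options": "top selling products transform options"},
    "Products.MonthlySalesPerProduct": {"options": "monthly sales per product transform options"},
}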

Airflow user defined macros

Because the SQL parameters may be closely related to Airflow context variable(s), their evaluation may rely on some Airflow user defined macros.

All Starlake DAG templates for data transformation offer the ability to specify user-defined macros through the optional definition of a Python dictionary variable named user_defined_macros.

#...
# [START instantiate_dag]
with DAG(dag_id=os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower(),
         schedule_interval=None if cron == "None" else cron,
         schedule=schedule,
         default_args=sys.modules[__name__].__dict__.get('default_dag_args', DEFAULT_DAG_ARGS),
         catchup=False,
         user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
         user_defined_filters=sys.modules[__name__].__dict__.get('user_defined_filters', None),
         tags=set([tag.upper() for tag in tags]),
         description=description) as dag:
    #...

Again, because this variable has to be defined in the same module as the generated DAG (user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None)), we need to create a customized DAG template that extends the existing one(s) and includes our specific code.

metadata/dags/templates/__custom_jobs.py.j2
from custom import get_days_interval, get_month_periode_depending_on_start_day_params

user_defined_macros = {
    "days_interval": get_days_interval,
    "month_periode_depending_on_start_day": get_month_periode_depending_on_start_day_params
}
metadata/dags/templates/custom_scheduled_task_cloud_run.py.j2
#...
{% include 'dags/templates/__custom_jobs.py.j2' %} # relative to the project metadata folder
{% include 'templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2' %} # relative to src/main/resources starlake resource directory
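
Neither get_days_interval nor get_month_periode_depending_on_start_day_params is shown in this documentation. As a purely hypothetical illustration, such a macro can be a plain Python function that derives the transform parameters from the DAG's logical date; the returned option string format below is made up and must match whatever your transform expects.

from datetime import datetime, timedelta

# Hypothetical helper living in a `custom` module importable by Airflow.
def get_days_interval(ds: str, delta: str) -> str:
    """Return a date range ending at the logical date `ds` (YYYY-MM-DD) and spanning `delta` days."""
    end = datetime.strptime(ds, "%Y-%m-%d").date()
    start = end - timedelta(days=int(delta))
    # Illustrative option string only; align it with the parameters used in your SQL.
    return f"date_param_min={start.isoformat()},date_param_max={end.isoformat()}"

In the vars_dev.tfvars example below, such a macro receives the rendered data_interval_end | ds value as its first argument and an Airflow Variable as its second.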

In addition, a good practice is to inject those variables using Terraform variables:

metadata/dags/templates/__custom_jobs.py.j2
#...

import json

jobs = json.loads("""${jobs}""")
  • variables.tf
variable "jobs" {
  type = list(object({
    name    = string
    options = string
  }))
  default = []
}
  • main.tf
locals {
  jobs = tomap({
    for job in var.jobs :
    "${job.name}" => { options = job.options }
  })

  #...
}

resource "google_storage_bucket_object" "composer_storage_objects" {
  for_each = local.composer_storage_objects
  name     = each.value
  content  = templatefile(
    "${path.module}/${each.value}",
    merge(local.composer_storage_variables, {jobs=jsonencode(local.jobs)}, {clusters=jsonencode(var.clusters)})
  )
  bucket   = var.composer_bucket
}
  • vars_dev.tfvars
jobs = [
  {
    name    = "Products.TopSellingProducts"
    options = "{{ days_interval(data_interval_end | ds, var.value.get('TOP_SELLING_PRODUCTS_DELTA', '30')) }}"
  },
  ...
  {
    name    = "Products.MonthlySalesPerProduct"
    options = "{{ month_periode_depending_on_start_day(data_interval_end | ds, var.value.get('SALES_PER_PRODUCT_START_DAY', '1')) }}"
  }
]

Finally, we will have to define a specific DAG configuration that will make use of our customized DAG template.

metadata/dags/custom_transform_cloud_run.yml
---
dag:
  comment: "aggregation dag for domain {{domain}} with cloud run" # will appear as a description of the dag
  template: "custom_scheduled_task_cloud_run.py.j2" # the dag template to use
  filename: "{{domain}}_agr_cloud_run.py" # the path, relative to the outputDir specified on the `dag-generate` command, where the generated DAG file will be copied
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
    cloud_run_project_id: "${project_id}"
    cloud_run_job_name: "${job_name}-transform" # cloud run job name for auto jobs
    cloud_run_job_region: "${region}"
    cloud_run_async: False # whether or not to use asynchronous cloud run job execution
    # retry_on_failure: True # when asynchronous job execution has been selected, it specifies whether or not we want to use a bash sensor with automatic retry for a specific exit code (implies airflow v2.6+)
    tags: "{{domain}} {{domain}}_CLOUD_RUN" # tags that will be added to the dag
    load_dependencies: False # whether or not to add all dependencies as airflow tasks within the resulting dag
    default_pool: "custom_default_pool" # pool to use for all tasks defined within the dag
  • main.tf
resource "google_storage_bucket_object" "composer_storage_objects" {
  for_each = local.composer_storage_objects
  name     = each.value
  content  = templatefile(
    "${path.module}/${each.value}",
    merge(local.composer_storage_variables, {jobs=jsonencode(local.jobs)}, {clusters=jsonencode(var.clusters)})
  )
  bucket   = var.composer_bucket
}

Dependencies

For any transformation, Starlake is able to compute all of its dependencies on other tasks and loads by analyzing the SQL queries.

As seen previously, the load_dependencies option defines whether or not we wish to recursively generate all the dependencies associated with each task for which the transformation DAG must be generated (False by default). If we choose not to generate those dependencies, the corresponding DAG will be scheduled using Airflow's data-aware scheduling mechanism.

All dependencies for data transformation are available in the generated DAG via the task_deps Python variable.

task_deps=json.loads("""[ {
  "data" : {
    "name" : "Customers.HighValueCustomers",
    "typ" : "task",
    "parent" : "Customers.CustomerLifeTimeValue",
    "parentTyp" : "task",
    "parentRef" : "CustomerLifetimeValue",
    "sink" : "Customers.HighValueCustomers"
  },
  "children" : [ {
    "data" : {
      "name" : "Customers.CustomerLifeTimeValue",
      "typ" : "task",
      "parent" : "starbake.Customers",
      "parentTyp" : "table",
      "parentRef" : "starbake.Customers",
      "sink" : "Customers.CustomerLifeTimeValue"
    },
    "children" : [ {
      "data" : {
        "name" : "starbake.Customers",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    }, {
      "data" : {
        "name" : "starbake.Orders",
        "typ" : "table",
        "parentTyp" : "unknown"
      },
      "task" : false
    } ],
    "task" : true
  } ],
  "task" : true
} ]""")

Inline

In this strategy (load_dependencies = True), all the dependencies related to the transformation will be generated.

External state change

In this strategy (load_dependencies = False, the default), the scheduler will launch a run of the corresponding transform DAG once its dependencies are met.

Airflow Data-aware scheduling

In this strategy, a schedule will be created to check if the dependencies are met via the use of Airflow Datasets.

src/main/resources/templates/dags/transform/__airflow_scheduled_task_tpl.py.j2
#...
schedule = None

datasets: Set[str] = []

_extra_dataset: Union[dict, None] = sys.modules[__name__].__dict__.get('extra_dataset', None)

_extra_dataset_parameters = '?' + '&'.join(list(f'{k}={v}' for (k,v) in _extra_dataset.items())) if _extra_dataset else ''

# if you choose to not load the dependencies, a schedule will be created to check if the dependencies are met
def _load_datasets(task: dict):
    if 'children' in task:
        for child in task['children']:
            datasets.append(keep_ascii_only(child['data']['name']).lower())
            _load_datasets(child)

if load_dependencies.lower() != 'true':
    for task in task_deps:
        _load_datasets(task)
    schedule = list(map(lambda dataset: Dataset(dataset + _extra_dataset_parameters), datasets))

#...

with DAG(dag_id=os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower(),
         schedule_interval=None if cron == "None" else cron,
         schedule=schedule,
         default_args=sys.modules[__name__].__dict__.get('default_dag_args', DEFAULT_DAG_ARGS),
         catchup=False,
         user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
         user_defined_filters=sys.modules[__name__].__dict__.get('user_defined_filters', None),
         tags=set([tag.upper() for tag in tags]),
         description=description) as dag:
    #...

The required Datasets are updated by each load and task that has been executed.

The ai.starlake.airflow.StarlakeAirflowJob class is responsible for recording the outlets related to the execution of each starlake command.

ai.starlake.airflow.StarlakeAirflowJob
def __init__(
    self,
    pre_load_strategy: Union[StarlakePreLoadStrategy, str, None],
    options: dict=None,
    **kwargs) -> None:
    #...
    self.outlets: List[Dataset] = kwargs.get('outlets', [])

def sl_import(self, task_id: str, domain: str, **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(domain).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(f'{domain}.{table}').lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(transform_name).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

All the outlets that have been recorded are available in the outlets property of the starlake concrete factory class instance and are used at the very last step of the corresponding DAG to update the Datasets.

src/main/resources/templates/dags/transform/__airflow_scheduled_task_tpl.py.j2
    end = sl_job.dummy_op(task_id="end", outlets=[Dataset(keep_ascii_only(dag.dag_id))]+list(map(lambda x: Dataset(x.uri + _extra_dataset_parameters), sl_job.outlets)))

In conjunction with Starlake DAG generation, the outlets property can be used to effortlessly schedule the DAGs that will run the transform commands.
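
As a hedged illustration of this mechanism (not part of the generated templates), a downstream Airflow DAG can simply declare the relevant Datasets as its schedule and will be triggered whenever the corresponding outlets are updated. The dataset URI below reuses the lowercased transform name from the task_deps example and is illustrative only.

from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.empty import EmptyOperator

# Outlet produced by the upstream transform (lowercased, ASCII-only name).
customer_lifetime_value = Dataset("customers.customerlifetimevalue")

with DAG(
    dag_id="downstream_on_dataset_example",
    start_date=datetime(2024, 1, 1),
    schedule=[customer_lifetime_value],  # data-aware scheduling: run when the Dataset is updated
    catchup=False,
) as dag:
    EmptyOperator(task_id="start")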