Customizing DAG generation
Starlake comes with out-of-the-box DAG templates. These templates can be customized to fit your specific needs for any scheduler of your choice.
You just need to be comfortable with the Jinja2 templating language and the Python programming language.
Starlake is not an orchestration tool, but it can generate your DAGs from templates and run your transforms in the right order on the tools of your choice for scheduling and monitoring batch-oriented workflows.
Starlake DAG generation relies on:
- starlake command line tool
- DAG configuration(s) and their references within the loads and tasks
- template(s) that may be customized
- starlake-orchestration framework to dynamically generate the tasks that will be run
- managing dependencies between tasks to execute transforms in the correct order
Prerequisites
Before using Starlake DAG generation, ensure the following minimum versions are installed on your system:
- starlake: 1.0.1 or higher
Additional requirements for Airflow
- Apache Airflow: 2.4.0 or higher (2.6.0 or higher is recommended with cloud-run)
- starlake-airflow: 0.1.2.1 or higher
Additional requirements for Dagster
- Dagster: 1.6.0 or higher
- starlake-dagster: 0.1.2 or higher
Command
starlake dag-generate [options]
where options are:
parameter | cardinality | description |
---|---|---|
--outputDir <value> | optional | Path for saving the resulting DAG file(s) (${SL_ROOT}/metadata/dags/generated by default). |
--clean | optional | Should the existing DAG file(s) be removed first (false by default) |
--domains | optional | Whether to generate DAG file(s) to load schema(s) or not (true by default if --tasks option has not been specified) |
--tasks | optional | Whether to generate DAG file(s) for tasks or not (true by default if --domains option has not been specified) |
--tags <value> | optional | Whether to generate DAG file(s) for the specified tags only (no tags by default) |
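As a hedged illustration, the options above can be assembled into an argument list from Python before handing it to a process runner. The helper name dag_generate_args is hypothetical, and passing several tags as a single comma-separated value is an assumption that the table above does not confirm.

```python
from typing import List, Optional


def dag_generate_args(output_dir: Optional[str] = None,
                      clean: bool = False,
                      domains: bool = False,
                      tasks: bool = False,
                      tags: Optional[List[str]] = None) -> List[str]:
    """Build the argument list for `starlake dag-generate` (hypothetical helper)."""
    args = ["starlake", "dag-generate"]
    if output_dir:
        args += ["--outputDir", output_dir]
    if clean:
        args.append("--clean")
    if domains:
        args.append("--domains")
    if tasks:
        args.append("--tasks")
    if tags:
        # assumption: several tags are passed as one comma-separated value
        args += ["--tags", ",".join(tags)]
    return args


print(dag_generate_args(output_dir="/tmp/dags", clean=True, tasks=True))
```

The resulting list can be passed to subprocess.run once the starlake executable is available on the PATH.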
Configuration
All DAG configuration files are located in ${SL_ROOT}/metadata/dags directory. The root element is dag.
References
We reference a DAG configuration by using the configuration file name without its extension
DAG configuration for loading data
The configuration files to use for loading data can be defined:
- at the project level, in the application file ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.load property.
In this case the same configuration file will be used as the default DAG configuration for all the tables in the project.
application:
  dagRef:
    load: load_cloud_run_domain
  #...
- at the domain level, in the domain configuration file ${SL_ROOT}/metadata/load/{domain}/_config.sl.yml under the load.metadata.dagRef property.
In this case the configuration file will be used as the default DAG configuration for all the tables in the domain.
load:
  metadata:
    dagRef: load_bash_domain
  #...
- at the table level, in the table configuration file ${SL_ROOT}/metadata/load/{domain}/{table}.sl.yml under the table.metadata.dagRef property.
In this case the configuration file will be used as the default DAG configuration for the table only.
table:
  metadata:
    dagRef: load_bash_domain
  #...
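The three levels above suggest a "most specific wins" lookup. The sketch below illustrates that reading; the function name resolve_load_dag_ref is hypothetical, the dictionaries stand for the content found under the application, load and table root elements, and the precedence order (table, then domain, then project) is an assumption drawn from the word "default" in the descriptions above.

```python
from typing import Optional


def resolve_load_dag_ref(application: dict, domain: dict, table: dict) -> Optional[str]:
    """Return the first dagRef found, from the most to the least specific level."""
    return (
        table.get("metadata", {}).get("dagRef")        # table.metadata.dagRef
        or domain.get("metadata", {}).get("dagRef")    # load.metadata.dagRef
        or application.get("dagRef", {}).get("load")   # application.dagRef.load
    )


application = {"dagRef": {"load": "load_cloud_run_domain"}}
domain = {"metadata": {"dagRef": "load_bash_domain"}}

print(resolve_load_dag_ref(application, domain, table={}))
print(resolve_load_dag_ref(application, domain={}, table={}))
```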
DAG configuration for transforming data
The configuration files to use for transforming data can be defined
- at the project level, in the application file ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.transform property.
In this case the same configuration file will be used as the default DAG configuration for all the transformations in the project.
application:
  dagRef:
    transform: norm_cloud_run_domain
  #...
- at the transformation level, in the transformation configuration file ${SL_ROOT}/metadata/transform/{domain}/{transformation}.sl.yml under the task.dagRef property.
In this case the configuration file will be used as the default DAG configuration for the transformation only.
task:
  dagRef: agr_cloud_run_domain
  #...
Properties
A DAG configuration defines four properties: comment, template, filename and options.
dag:
  comment: "dag for transforming tables for domain {{domain}} with cloud run" # will appear as a description of the dag
  template: "custom_scheduled_task_cloud_run.py.j2" # the dag template to use
  filename: "{{domain}}_norm_cloud_run.py" # the relative path to the outputDir specified as a parameter of the `dag-generate` command where the generated dag file will be copied
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...
Comment
A short description to describe the generated DAG.
Template
The path to the template that will generate the DAG(s), either:
- an absolute path
- a relative path name to the ${SL_ROOT}/metadata/dags/template directory
- a relative path name to the src/main/resources/templates/dags starlake resource directory
Filename
The filename defines the relative path to the DAG(s) that will be generated. The specified path is relative to the outputDir option that was specified on the command line (or its default value if not specified).
The value of this property may include special variables that directly determine the number of DAGs that will be generated:
- domain: a single DAG for all tables within the domain affected by this configuration
dag:
  filename: "{{domain}}_norm_cloud_run.py" # one DAG per domain
  #...
- table: as many DAGs as there are tables in the domain affected by this configuration
dag:
  filename: "{{domain}}_{{table}}_norm_cloud_run.py" # one DAG per table
  #...
Otherwise, a single DAG will be generated for all tables affected by this configuration.
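To make the effect of these variables concrete, here is a minimal sketch that counts the distinct files a filename pattern yields. It uses plain string substitution rather than Starlake's own rendering, and the function name generated_filenames and the sample domains and tables are hypothetical.

```python
from typing import Iterable, Set, Tuple


def generated_filenames(pattern: str, tables: Iterable[Tuple[str, str]]) -> Set[str]:
    """Expand the pattern for every (domain, table) pair and keep distinct names."""
    names = set()
    for domain, table in tables:
        names.add(pattern.replace("{{domain}}", domain).replace("{{table}}", table))
    return names


tables = [("sales", "orders"), ("sales", "customers"), ("hr", "employees")]

print(len(generated_filenames("all_norm_cloud_run.py", tables)))                   # no variable
print(len(generated_filenames("{{domain}}_norm_cloud_run.py", tables)))            # per domain
print(len(generated_filenames("{{domain}}_{{table}}_norm_cloud_run.py", tables)))  # per table
```

Each distinct filename corresponds to one generated DAG file, which is why the pattern alone controls the granularity.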
Options
This property allows you to pass a certain number of options to the template in the form of a dictionary.
Some of these options are common to all templates.
Starlake env vars
sl_env_var defines starlake environment variables passed as an encoded JSON string
dag:
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...
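Because the value is a JSON document stored in a string, it can be decoded with the standard json module. The snippet below is only an illustration of that encoding; the /opt/starlake paths are placeholder values standing in for ${root_path}.

```python
import json

# the option value as it would appear once ${root_path} has been substituted (placeholder paths)
sl_env_var = "{\"SL_ROOT\": \"/opt/starlake\", \"SL_DATASETS\": \"/opt/starlake/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"

# decode the string into a dictionary of environment variables
env = json.loads(sl_env_var)

for name, value in env.items():
    print(f"{name}={value}")
```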
Pre-load strategy
pre_load_strategy defines the strategy that can be used to conditionally load the tables of a domain within the DAG.
Four possible strategies:
NONE
The load of the domain will not be conditioned and no pre-load tasks will be executed (the default strategy).
IMPORTED
This strategy implies that at least one file is present in the landing area (${SL_ROOT}/incoming/{domain} by default, if option incoming_path has not been specified). If there is one or more files to load, the method sl_import will be called to import the domain before loading it, otherwise the loading of the domain will be skipped.
dag:
  options:
    pre_load_strategy: "imported"
  #...
PENDING
This strategy implies that at least one file is present in the pending datasets area of the domain (${SL_ROOT}/datasets/pending/{domain} by default if option pending_path has not been specified), otherwise the loading of the domain will be skipped.
dag:
  options:
    pre_load_strategy: "pending"
  #...
ACK
This strategy implies that an ack file is present at the specified path (${SL_ROOT}/datasets/pending/{domain}/{{ds}}.ack by default if option global_ack_file_path has not been specified), otherwise the loading of the domain will be skipped.
dag:
  options:
    pre_load_strategy: "ack"
  #...
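The four strategies boil down to a yes/no decision taken before the load. The sketch below reproduces that decision with local file checks only, using the default paths quoted above; the function names should_load and _has_files are hypothetical, and the sketch does not call sl_import nor use any scheduler API, so it is an illustration rather than the logic of the actual templates.

```python
from pathlib import Path


def _has_files(directory: Path) -> bool:
    """True when the directory exists and contains at least one regular file."""
    return directory.is_dir() and any(p.is_file() for p in directory.iterdir())


def should_load(strategy: str, domain: str, sl_root: str, ds: str = "") -> bool:
    """Decide whether the domain should be loaded, using the default paths."""
    root = Path(sl_root)
    strategy = strategy.lower()
    if strategy == "none":
        return True  # the load is never conditioned
    if strategy == "imported":
        return _has_files(root / "incoming" / domain)
    if strategy == "pending":
        return _has_files(root / "datasets" / "pending" / domain)
    if strategy == "ack":
        return (root / "datasets" / "pending" / domain / f"{ds}.ack").is_file()
    raise ValueError(f"unknown pre-load strategy: {strategy}")


print(should_load("none", "sales", "/nonexistent-sl-root"))
print(should_load("pending", "sales", "/nonexistent-sl-root"))
```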
Load dependencies
load_dependencies defines whether or not we want to recursively generate all the dependencies associated with each task for which the transformation DAG was generated (False by default).
dag:
  options:
    load_dependencies: True
  #...
Additional options
Depending on the template chosen, a specific concrete factory class extending ai.starlake.job.IStarlakeJob will be instantiated, for which additional options may be required.
IStarlakeJob
ai.starlake.job.IStarlakeJob is the generic factory interface responsible for generating the tasks that will run the starlake stage, load and transform commands:
- sl_import will generate the task that will run the starlake stage command.
def sl_import(
    self,
    task_id: str,
    domain: str,
    **kwargs) -> BaseOperator:
    #...
name | type | description |
---|---|---|
task_id | str | the optional task id ({domain}_import by default) |
domain | str | the required domain to import |
- sl_load will generate the task that will run the starlake load command.
def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description |
---|---|---|
task_id | str | the optional task id ({domain}_{table}_load by default) |
domain | str | the required domain of the table to load |
table | str | the required table to load |
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig |
- sl_transform will generate the task that will run the starlake transform command.
def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description |
---|---|---|
task_id | str | the optional task id ({transform_name} by default) |
transform_name | str | the transform to run |
transform_options | str | the optional transform options |
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig |
Ultimately, all of these methods will call the sl_job method that needs to be implemented in all concrete factory classes.
def sl_job(
    self,
    task_id: str,
    arguments: list,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
name | type | description |
---|---|---|
task_id | str | the required task id |
arguments | list | The required arguments of the starlake command to run |
spark_config | StarlakeSparkConfig | the optional ai.starlake.job.StarlakeSparkConfig |
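The relationship between the three generic methods and sl_job can be sketched without any scheduler installed. The classes SketchStarlakeJob and EchoJob below are hypothetical stand-ins, not the starlake-orchestration API; the default task ids follow the tables above, while the command-line flags used for stage and load are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")  # type of the scheduler task produced by the factory


class SketchStarlakeJob(ABC, Generic[T]):
    """Hypothetical stand-in for IStarlakeJob, reduced to command building."""

    def sl_import(self, task_id: Optional[str], domain: str, **kwargs) -> T:
        task_id = task_id or f"{domain}_import"
        # assumption: the exact CLI flags of the stage command may differ
        return self.sl_job(task_id, ["stage", "--domains", domain], **kwargs)

    def sl_load(self, task_id: Optional[str], domain: str, table: str, **kwargs) -> T:
        task_id = task_id or f"{domain}_{table}_load"
        # assumption: the exact CLI flags of the load command may differ
        return self.sl_job(task_id, ["load", "--domains", domain, "--tables", table], **kwargs)

    def sl_transform(self, task_id: Optional[str], transform_name: str, **kwargs) -> T:
        task_id = task_id or transform_name
        return self.sl_job(task_id, ["transform", "--name", transform_name], **kwargs)

    @abstractmethod
    def sl_job(self, task_id: str, arguments: List[str], **kwargs) -> T:
        """Turn a starlake command into a scheduler-specific task."""


class EchoJob(SketchStarlakeJob[str]):
    """Toy factory whose 'task' is simply the command line as a string."""

    def sl_job(self, task_id: str, arguments: List[str], **kwargs) -> str:
        return f"[{task_id}] starlake {' '.join(arguments)}"


print(EchoJob().sl_transform(None, "Products.TopSellingProducts"))
```

A concrete factory only has to decide what a "task" is (an Airflow operator, a Dagster op, here a string); the generic methods stay unchanged.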
Concrete factory classes
Apache Airflow Concrete factory classes
Each concrete factory class extends ai.starlake.airflow.StarlakeAirflowJob and implements the sl_job method that will generate the Airflow task that will run the corresponding starlake command.
For all templates instantiating a StarlakeAirflowJob class, the default_pool option defines the Airflow pool to use for all tasks executed within the DAG.
dag:
  options:
    default_pool: "custom_default_pool"
  #...
ai.starlake.airflow.bash.StarlakeAirflowBashJob is a concrete implementation of StarlakeAirflowJob that generates tasks using airflow.operators.bash.BashOperator. Useful for on-premise execution.
An additional SL_STARLAKE_PATH option is required to specify the path to the starlake executable.
The ai.starlake.airflow.gcp.StarlakeAirflowCloudRunJob class is a concrete implementation of StarlakeAirflowJob that overrides the sl_job method to run the starlake command by executing a Cloud Run job.
Below is the list of additional options used to configure the Cloud Run job:
name | type | description |
---|---|---|
cloud_run_project_id | str | the optional cloud run project id (the project id on which the composer has been instantiated by default) |
cloud_run_job_name | str | the required name of the cloud run job |
cloud_run_job_region | str | the optional region (europe-west1 by default) |
cloud_run_async | bool | the optional flag to run the cloud run job asynchronously (True by default) |
retry_on_failure | bool | the optional flag to retry the cloud run job on failure (False by default) |
retry_delay_in_seconds | int | the optional delay in seconds to wait before retrying the cloud run job (10 by default) |
If the execution has been parameterized to be asynchronous, an ai.starlake.airflow.gcp.CloudRunJobCompletionSensor, which extends airflow.sensors.bash.BashSensor, will be instantiated to wait for the completion of the Cloud Run job execution.
Dagster Concrete factory classes
Each concrete factory class extends ai.starlake.dagster.StarlakeDagsterJob and implements the sl_job method that will generate the Dagster op that will run the corresponding starlake command.
StarlakeDagsterShellJob
ai.starlake.dagster.shell.StarlakeDagsterShellJob is a concrete implementation of StarlakeDagsterJob that generates nodes using the dagster-shell library. Useful for on-premise execution.
An additional SL_STARLAKE_PATH option is required to specify the path to the starlake executable.
StarlakeDagsterCloudRunJob
The ai.starlake.dagster.gcp.StarlakeDagsterCloudRunJob class is a concrete implementation of StarlakeDagsterJob that overrides the sl_job method to run the starlake command by executing a Cloud Run job.
Below is the list of additional options used to configure the Cloud Run job:
name | type | description |
---|---|---|
cloud_run_project_id | str | the optional cloud run project id (the project id on which the composer has been instantiated by default) |
cloud_run_job_name | str | the required name of the cloud run job |
cloud_run_job_region | str | the optional region (europe-west1 by default) |
Templates
Starlake templates
Starlake templates are listed under the src/main/resources/templates/dags resource directory. There are two types of templates: those for loading data and those for transforming data.
Data loading
Starlake templates for data loading are listed under the load subdirectory.
Apache Airflow Templates for data loading
__airflow_scheduled_table_tpl.py.j2 is the abstract template to generate Airflow DAGs for data loading which requires the instantiation of a concrete factory class that implements ai.starlake.airflow.StarlakeAirflowJob
Currently, there are three Airflow concrete templates for data loading.
All extend this abstract template by instantiating the corresponding concrete factory class using include statements.
- airflow_scheduled_table_bash.py.j2 instantiates a StarlakeAirflowBashJob class.
# This template executes individual bash jobs and requires the following dag generation options set:
# - SL_STARLAKE_PATH: the path to the starlake executable [OPTIONAL]
# ...
{% include 'templates/dags/__starlake_airflow_bash_job.py.j2' %}
{% include 'templates/dags/load/__airflow_scheduled_table_tpl.py.j2' %}
- airflow_scheduled_table_cloud_run.py.j2 instantiates a StarlakeAirflowCloudRunJob class.
# This template executes individual cloud run jobs and requires the following dag generation options set:
# - cloud_run_project_id: the project id where the job is located (if not set, the project id of the composer environment will be used) [OPTIONAL]
# - cloud_run_job_region: the region where the job is located (if not set, europe-west1 will be used) [OPTIONAL]
# - cloud_run_job_name: the name of the job to execute [REQUIRED]
# ...
{% include 'templates/dags/__starlake_airflow_cloud_run_job.py.j2' %}
{% include 'templates/dags/load/__airflow_scheduled_table_tpl.py.j2' %}
Dagster Templates for data loading
__dagster_scheduled_table_tpl.py.j2 is the abstract template to generate Dagster DAGs for data loading which requires the instantiation of a concrete factory class that implements ai.starlake.dagster.StarlakeDagsterJob
Currently, there are three Dagster concrete templates for data loading.
All extend this abstract template by instantiating the corresponding concrete factory class using include statements.
- dagster_scheduled_table_shell.py.j2 instantiates a StarlakeDagsterShellJob class.
# This template executes individual shell jobs and requires the following dag generation options set:
# - SL_STARLAKE_PATH: the path to the starlake executable [OPTIONAL]
# ...
{% include 'templates/dags/__starlake_dagster_shell_job.py.j2' %}
{% include 'templates/dags/load/__dagster_scheduled_table_tpl.py.j2' %}
- dagster_scheduled_table_cloud_run.py.j2 instantiates a StarlakeDagsterCloudRunJob class.
# This template executes individual cloud run jobs and requires the following dag generation options set:
# - cloud_run_project_id: the project id where the job is located (if not set, the project id of the composer environment will be used) [OPTIONAL]
# - cloud_run_job_region: the region where the job is located (if not set, europe-west1 will be used) [OPTIONAL]
# - cloud_run_job_name: the name of the job to execute [REQUIRED]
# ...
{% include 'templates/dags/__starlake_dagster_cloud_run_job.py.j2' %}
{% include 'templates/dags/load/__dagster_scheduled_table_tpl.py.j2' %}
Data transformation
Starlake templates for data transformation are listed under the transform subdirectory.
Apache Airflow Templates for data transformation
__airflow_scheduled_task_tpl.py.j2 is the abstract template to generate Airflow DAGs for data transformation which requires, in the same way, the instantiation of a concrete factory class that implements ai.starlake.airflow.StarlakeAirflowJob
Currently, there are three Airflow concrete templates for data transformation.
All extend this abstract template by instantiating the corresponding concrete factory class using include statements.
- airflow_scheduled_task_bash.py.j2 instantiates a StarlakeAirflowBashJob class.
# ...
{% include 'templates/dags/__starlake_airflow_bash_job.py.j2' %}
{% include 'templates/dags/transform/__airflow_scheduled_task_tpl.py.j2' %}
- airflow_scheduled_task_cloud_run.py.j2 instantiates a StarlakeAirflowCloudRunJob class.
# ...
{% include 'templates/dags/__starlake_airflow_cloud_run_job.py.j2' %}
{% include 'templates/dags/transform/__airflow_scheduled_task_tpl.py.j2' %}
Dagster Templates for data transformation
__dagster_scheduled_task_tpl.py.j2 is the abstract template to generate Dagster DAGs for data transformation which requires, in the same way, the instantiation of a concrete factory class that implements ai.starlake.dagster.StarlakeDagsterJob
Currently, there are three Dagster concrete templates for data transformation.
All extend this abstract template by instantiating the corresponding concrete factory class using include statements.
- dagster_scheduled_task_shell.py.j2 instantiates a StarlakeDagsterShellJob class.
# ...
{% include 'templates/dags/__starlake_dagster_shell_job.py.j2' %}
{% include 'templates/dags/transform/__dagster_scheduled_task_tpl.py.j2' %}
- dagster_scheduled_task_cloud_run.py.j2 instantiates a StarlakeDagsterCloudRunJob class.
# ...
{% include 'templates/dags/__starlake_dagster_cloud_run_job.py.j2' %}
{% include 'templates/dags/transform/__dagster_scheduled_task_tpl.py.j2' %}
Customize existing templates
Although the options are useful for customizing the generated DAGs, there are situations where we need to be able to dynamically apply some of them at runtime.
Transform parameters
Often data transformation requires parameterized SQL queries whose parameters should be evaluated at runtime.
-- ...
step1 as(
SELECT * FROM step0
WHERE DAT_EXTRACTION >= '{{date_param_min}}' and DAT_EXTRACTION <= '{{date_param_max}}'
)
-- ...
jobs variable
All Starlake DAG templates for data transformation offer the ability of injecting parameter values via the optional definition of a dictionary-like Python variable named jobs where each key represents the name of a transformation and its value the parameters to be passed to the transformation. Each entry of this dictionary will be added to the options of the corresponding DAG.
#...
#optional variable jobs as a dict of all parameters to apply by job
#eg jobs = {"task1 domain.task1 name": {"options": "task1 transform options"}, "task2 domain.task2 name": {"options": "task2 transform options"}}
sl_job = StarlakeAirflowCloudRunJob(options=dict(options, **sys.modules[__name__].__dict__.get('jobs', {})))
#...
def sl_transform(self, task_id: str, transform_name: str, transform_options: str=None, spark_config: StarlakeSparkConfig=None, **kwargs) -> T:
    """Transform job.
    Generate the scheduler task that will run the starlake `transform` command.

    Args:
        task_id (str): The optional task id.
        transform_name (str): The transform to run.
        transform_options (str): The optional transform options to use.
        spark_config (StarlakeSparkConfig): The optional spark configuration to use.

    Returns:
        T: The scheduler task.
    """
    task_id = f"{transform_name}" if not task_id else task_id
    arguments = ["transform", "--name", transform_name]
    transform_options = transform_options if transform_options else self.__class__.get_context_var(transform_name, {}, self.options).get("options", "")
    if transform_options:
        arguments.extend(["--options", transform_options])
    return self.sl_job(task_id=task_id, arguments=arguments, spark_config=spark_config, **kwargs)
#...
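The lookup performed above can be reproduced in isolation. In this sketch, get_context_var is a simplified stand-in that only reads the options dictionary (the real method may consult other sources), and the transform name and option string are illustrative values.

```python
from typing import List, Optional


def get_context_var(var_name: str, default_value, options: dict):
    """Simplified stand-in: read the variable from the options only."""
    return options.get(var_name, default_value)


# options coming from the DAG configuration
options = {"load_dependencies": "False"}
# optional module-level variable: one entry per transformation (illustrative values)
jobs = {"Products.TopSellingProducts": {"options": "date_param_min='2024-01-01'"}}

# same merge as in the template: every entry of jobs becomes an option
merged_options = dict(options, **jobs)


def transform_arguments(transform_name: str, transform_options: Optional[str] = None) -> List[str]:
    """Build the starlake transform arguments, falling back to the jobs entry."""
    arguments = ["transform", "--name", transform_name]
    transform_options = transform_options or get_context_var(transform_name, {}, merged_options).get("options", "")
    if transform_options:
        arguments.extend(["--options", transform_options])
    return arguments


print(transform_arguments("Products.TopSellingProducts"))
print(transform_arguments("Products.MonthlySalesPerProduct"))
```

An explicit transform_options argument takes priority; the jobs entry is only used as a fallback, and a transformation without an entry runs without the --options flag.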
Because this variable has to be defined in the same module as that of the generated DAG (options=dict(options, **sys.modules[__name__].__dict__.get('jobs', {}))), we need to create a customized DAG template that should extend the existing one(s), including our specific code.
#...
jobs = #...
#...
{% include 'dags/templates/__custom_jobs.py.j2' %} # our custom code
{% include 'templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2' %} # the template to extend
Airflow user defined macros
Because the SQL parameters may be closely related to Airflow context variable(s), their evaluation may rely on some Airflow user defined macros.
All starlake DAG templates for data transformation offer the ability to specify User defined macros through the optional definition of a dictionary-like Python variable named user_defined_macros.
#...
# [START instantiate_dag]
with DAG(dag_id=os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower(),
         schedule_interval=None if cron == "None" else cron,
         schedule=schedule,
         default_args=sys.modules[__name__].__dict__.get('default_dag_args', DEFAULT_DAG_ARGS),
         catchup=False,
         user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
         user_defined_filters=sys.modules[__name__].__dict__.get('user_defined_filters', None),
         tags=set([tag.upper() for tag in tags]),
         description=description) as dag:
    #...
Again, because this variable has to be defined in the same module as that of the generated DAG (user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None)), we need to create a customized DAG template that should extend the existing one(s), including our specific code.
from custom import get_days_interval, get_month_periode_depending_on_start_day_params
user_defined_macros = {
    "days_interval": get_days_interval,
    "month_periode_depending_on_start_day": get_month_periode_depending_on_start_day_params
}
#...
{% include 'dags/templates/__custom_jobs.py.j2' %} # relative to the project metadata folder
{% include 'templates/dags/transform/airflow_scheduled_task_cloud_run.py.j2' %} # relative to src/main/resources starlake resource directory
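The custom module imported above is project-specific and its content is not shown here. As a purely hypothetical sketch, a macro such as get_days_interval could turn an end date and a number of days into the two date parameters used by the SQL query; the format of the returned option string is an assumption.

```python
from datetime import date, timedelta


def get_days_interval(end_date: str, delta: str = "30") -> str:
    """Hypothetical macro: return the date parameters covering the last `delta` days.

    end_date is an ISO date (what the `ds` filter produces) and delta is kept as a
    string because Airflow variables are returned as strings.
    """
    end = date.fromisoformat(end_date)
    start = end - timedelta(days=int(delta))
    # assumption: transform options are passed as comma-separated key='value' pairs
    return f"date_param_min='{start.isoformat()}',date_param_max='{end.isoformat()}'"


print(get_days_interval("2024-01-31", "30"))
```

Registered under the days_interval key, such a function can then be called from a templated string that Airflow renders at run time.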
In addition, a good practice is to inject those variables using Terraform variables:
#...
import json
jobs = json.loads("""${jobs}""")
- variables.tf
variable "jobs" {
  type = list(object({
    name    = string
    options = string
  }))
  default = []
}
- main.tf
locals {
  jobs = tomap({
    for job in var.jobs :
    "${job.name}" => {options=job.options}
  })
  #...
}
resource "google_storage_bucket_object" "composer_storage_objects" {
  for_each = local.composer_storage_objects
  name     = each.value
  content  = templatefile(
    "${path.module}/${each.value}",
    merge(local.composer_storage_variables, {jobs=jsonencode(local.jobs)}, {clusters=jsonencode(var.clusters)})
  )
  bucket   = var.composer_bucket
}
- vars_dev.tfvars
jobs = [
  {
    name    = "Products.TopSellingProducts"
    options = "{{ days_interval(data_interval_end | ds, var.value.get('TOP_SELLING_PRODUCTS_DELTA', '30')) }}"
  },
  ...
  {
    name    = "Products.MonthlySalesPerProduct"
    options = "{{ month_periode_depending_on_start_day(data_interval_end | ds, var.value.get('SALES_PER_PRODUCT_START_DAY', '1')) }}"
  }
]
Finally, we will have to define a specific DAG configuration that will make use of our customized DAG template.
---
dag:
  comment: "aggregation dag for domain {{domain}} with cloud run" # will appear as a description of the dag
  template: "custom_scheduled_task_cloud_run.py.j2" # the dag template to use
  filename: "{{domain}}_agr_cloud_run.py" # the relative path to the outputDir specified as a parameter of the `dag-generate` command where the generated dag file will be copied
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
    cloud_run_project_id: "${project_id}"
    cloud_run_job_name: "${job_name}-transform" # cloud run job name for auto jobs
    cloud_run_job_region: "${region}"
    cloud_run_async: False # whether or not to use asynchronous cloud run job execution
    # retry_on_failure: True # when asynchronous job execution has been selected, it specifies whether or not we want to use a bash sensor with automatic retry for a specific exit code (implies airflow v2.6+)
    tags: "{{domain}} {{domain}}_CLOUD_RUN" # tags that will be added to the dag
    load_dependencies: False # whether or not to add all dependencies as airflow tasks within the resulting dag
    default_pool: "custom_default_pool" # pool to use for all tasks defined within the dag
Dependencies
For any transformation, Starlake is able to calculate all its dependencies towards other tasks or loads thanks to the analysis of SQL queries.
As seen previously, the load_dependencies option defines whether or not we wish to recursively generate all the dependencies associated with each task for which the transformation DAG must be generated (False by default). If we choose not to generate those dependencies, the corresponding DAG will be scheduled using Airflow's data-aware scheduling mechanism.
All dependencies for data transformation are available in the generated DAG via the Python dictionary variable task_deps.
task_deps=json.loads("""[ {
"data" : {
"name" : "Customers.HighValueCustomers",
"typ" : "task",
"parent" : "Customers.CustomerLifeTimeValue",
"parentTyp" : "task",
"parentRef" : "CustomerLifetimeValue",
"sink" : "Customers.HighValueCustomers"
},
"children" : [ {
"data" : {
"name" : "Customers.CustomerLifeTimeValue",
"typ" : "task",
"parent" : "starbake.Customers",
"parentTyp" : "table",
"parentRef" : "starbake.Customers",
"sink" : "Customers.CustomerLifeTimeValue"
},
"children" : [ {
"data" : {
"name" : "starbake.Customers",
"typ" : "table",
"parentTyp" : "unknown"
},
"task" : false
}, {
"data" : {
"name" : "starbake.Orders",
"typ" : "table",
"parentTyp" : "unknown"
},
"task" : false
} ],
"task" : true
} ],
"task" : true
} ]""")
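Since task_deps is a tree in which each node lists its upstream dependencies under children, a short recursive walk is enough to flatten it. The sketch below uses a trimmed copy of the structure above; the function name collect_dependencies is hypothetical.

```python
import json
from typing import List, Optional

# trimmed copy of the structure shown above
task_deps = json.loads("""[ {
  "data" : { "name" : "Customers.HighValueCustomers", "typ" : "task" },
  "children" : [ {
    "data" : { "name" : "Customers.CustomerLifeTimeValue", "typ" : "task" },
    "children" : [
      { "data" : { "name" : "starbake.Customers", "typ" : "table" }, "task" : false },
      { "data" : { "name" : "starbake.Orders", "typ" : "table" }, "task" : false }
    ],
    "task" : true
  } ],
  "task" : true
} ]""")


def collect_dependencies(task: dict, found: Optional[List[str]] = None) -> List[str]:
    """Return the names of all direct and transitive dependencies of a task."""
    found = [] if found is None else found
    for child in task.get("children", []):
        name = child["data"]["name"]
        if name not in found:
            found.append(name)
        collect_dependencies(child, found)
    return found


for root in task_deps:
    print(root["data"]["name"], "depends on", collect_dependencies(root))
```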
Inline
In this strategy (load_dependencies = True), all the dependencies related to the transformation will be generated.
External state change
In this strategy (load_dependencies = False), the default strategy, the scheduler will launch a run for the corresponding transform DAG if its dependencies are met.
Airflow Data-aware scheduling
In this strategy, a schedule will be created to check if the dependencies are met via the use of Airflow Datasets.
#...
schedule = None
datasets: Set[str] = []
_extra_dataset: Union[dict, None] = sys.modules[__name__].__dict__.get('extra_dataset', None)
_extra_dataset_parameters = '?' + '&'.join(list(f'{k}={v}' for (k,v) in _extra_dataset.items())) if _extra_dataset else ''

# if you choose to not load the dependencies, a schedule will be created to check if the dependencies are met
def _load_datasets(task: dict):
    if 'children' in task:
        for child in task['children']:
            datasets.append(keep_ascii_only(child['data']['name']).lower())
            _load_datasets(child)

if load_dependencies.lower() != 'true':
    for task in task_deps:
        _load_datasets(task)
    schedule = list(map(lambda dataset: Dataset(dataset + _extra_dataset_parameters), datasets))
#...
with DAG(dag_id=os.path.basename(__file__).replace(".py", "").replace(".pyc", "").lower(),
         schedule_interval=None if cron == "None" else cron,
         schedule=schedule,
         default_args=sys.modules[__name__].__dict__.get('default_dag_args', DEFAULT_DAG_ARGS),
         catchup=False,
         user_defined_macros=sys.modules[__name__].__dict__.get('user_defined_macros', None),
         user_defined_filters=sys.modules[__name__].__dict__.get('user_defined_filters', None),
         tags=set([tag.upper() for tag in tags]),
         description=description) as dag:
    #...
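The way the optional extra_dataset dictionary ends up in the Dataset URIs can be checked without Airflow. The sketch below only reproduces the string handling; it skips keep_ascii_only, whose implementation is not shown here, and the function names and the env parameter are hypothetical.

```python
from typing import Dict, List, Optional


def extra_parameters(extra_dataset: Optional[Dict[str, str]]) -> str:
    """Build the query string appended to every dataset URI ('' when nothing is set)."""
    return "?" + "&".join(f"{k}={v}" for k, v in extra_dataset.items()) if extra_dataset else ""


def dataset_uris(names: List[str], extra_dataset: Optional[Dict[str, str]] = None) -> List[str]:
    """Lower-case each dependency name and append the extra parameters."""
    suffix = extra_parameters(extra_dataset)
    return [name.lower() + suffix for name in names]


print(dataset_uris(["starbake.Customers", "starbake.Orders"]))
print(dataset_uris(["starbake.Orders"], {"env": "dev"}))
```

The producing DAG must apply the same suffix to its outlets, otherwise the URIs do not match and the consuming DAG is never triggered.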
Those required Datasets are updated for each load and task that have been executed.
The ai.starlake.airflow.StarlakeAirflowJob class is responsible for recording the outlets related to the execution of each starlake command.
def __init__(
    self,
    pre_load_strategy: Union[StarlakePreLoadStrategy, str, None],
    options: dict=None,
    **kwargs) -> None:
    #...
    self.outlets: List[Dataset] = kwargs.get('outlets', [])

def sl_import(self, task_id: str, domain: str, **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(domain).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(f'{domain}.{table}').lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...

def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...
    dataset = Dataset(keep_ascii_only(transform_name).lower())
    self.outlets += kwargs.get('outlets', []) + [dataset]
    #...
All the outlets that have been recorded are available in the outlets property of the starlake concrete factory class instance and are used at the very last step of the corresponding DAG to update the Datasets.
end = sl_job.dummy_op(task_id="end", outlets=[Dataset(keep_ascii_only(dag.dag_id))]+list(map(lambda x: Dataset(x.uri + _extra_dataset_parameters), sl_job.outlets)))
In conjunction with the Starlake DAG generation, the outlets property can be used to effortlessly schedule DAGs that will run the transform commands.
Dagster Multi Asset Sensor
In this strategy, a sensor will be created to check if the dependencies are met via the use of Dagster Assets.
#...
from dagster import AssetKey, MultiAssetSensorDefinition, MultiAssetSensorEvaluationContext, RunRequest, SkipReason, Definitions

# if you want to load dependencies, set load_dependencies to True in the options
load_dependencies: bool = StarlakeDagsterJob.get_context_var(var_name='load_dependencies', default_value='False', options=options).lower() == 'true'

sensor = None

# if you choose to not load the dependencies, a sensor will be created to check if the dependencies are met
if not load_dependencies:
    assets: Set[str] = []

    def load_assets(task: dict):
        if 'children' in task:
            for child in task['children']:
                assets.append(sanitize_id(child['data']['name']))
                load_assets(child)

    for task in task_deps:
        load_assets(task)

    def multi_asset_sensor_with_skip_reason(context: MultiAssetSensorEvaluationContext):
        asset_events = context.latest_materialization_records_by_key()
        if all(asset_events.values()):
            context.advance_all_cursors()
            return RunRequest()
        elif any(asset_events.values()):
            materialized_asset_key_strs = [
                key.to_user_string() for key, value in asset_events.items() if value
            ]
            not_materialized_asset_key_strs = [
                key.to_user_string() for key, value in asset_events.items() if not value
            ]
            return SkipReason(
                f"Observed materializations for {materialized_asset_key_strs}, "
                f"but not for {not_materialized_asset_key_strs}"
            )
        else:
            return SkipReason("No materializations observed")

    sensor = MultiAssetSensorDefinition(
        name = f'{job_name}_sensor',
        monitored_assets = list(map(lambda asset: AssetKey(asset), assets)),
        asset_materialization_fn = multi_asset_sensor_with_skip_reason,
        minimum_interval_seconds = 60,
        description = f"Sensor for {job_name}",
        job_name = job_name,
    )
#...
defs = Definitions(
    jobs=[generate_job()],
    schedules=crons,
    sensors=[sensor] if sensor else [],
)
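The three-way decision taken by the sensor can be isolated from Dagster for illustration. In this sketch, a dictionary maps each monitored asset name to its latest materialization record (None when there is none); the function name evaluate and the returned tuples are hypothetical simplifications of RunRequest and SkipReason.

```python
from typing import Dict, Optional, Tuple


def evaluate(latest: Dict[str, Optional[object]]) -> Tuple[str, str]:
    """Return ('run', '') when every asset was materialized, else ('skip', reason)."""
    if all(latest.values()):
        return ("run", "")
    if any(latest.values()):
        seen = [key for key, value in latest.items() if value]
        missing = [key for key, value in latest.items() if not value]
        return ("skip", f"Observed materializations for {seen}, but not for {missing}")
    return ("skip", "No materializations observed")


print(evaluate({"starbake_customers": "record", "starbake_orders": "record"}))
print(evaluate({"starbake_customers": "record", "starbake_orders": None}))
print(evaluate({"starbake_customers": None, "starbake_orders": None}))
```

A run is requested only when all dependencies have been materialized since the last evaluation, which is why the real sensor advances every cursor before returning the run request.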
Those required Assets are materialized for each load and task that have been executed.
The ai.starlake.dagster.StarlakeDagsterJob class is responsible for recording the assets related to the execution of each starlake command.
def sl_import(self, task_id: str, domain: str, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(domain))})
    #...

def sl_load(self, task_id: str, domain: str, table: str, spark_config: StarlakeSparkConfig=None, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(f"{domain}.{table}"))})
    #...

def sl_transform(self, task_id: str, transform_name: str, transform_options: str = None, spark_config: StarlakeSparkConfig = None, **kwargs) -> NodeDefinition:
    kwargs.update({'asset': AssetKey(sanitize_id(transform_name))})
    #...
Each corresponding asset will then be materialized at run time through the execution of the Dagster op defined within the sl_job function of the concrete factory class that has been instantiated by the template.
def sl_job(self, task_id: str, arguments: list, spark_config: StarlakeSparkConfig=None, **kwargs) -> NodeDefinition:
    """Overrides IStarlakeJob.sl_job()
    Generate the Dagster node that will run the starlake command.

    Args:
        task_id (str): The required task id.
        arguments (list): The required arguments of the starlake command to run.

    Returns:
        OpDefinition: The Dagster node.
    """
    command = self.__class__.get_context_var("SL_STARLAKE_PATH", "starlake", self.options) + f" {' '.join(arguments)}"

    asset_key: AssetKey = kwargs.get("asset", None)

    @op(
        name=task_id,
        ins=kwargs.get("ins", {}),
        out={kwargs.get("out", "result"): Out(str)},
    )
    def job(context, **kwargs):
        output, return_code = execute_shell_command(
            shell_command=command,
            output_logging="STREAM",
            log=context.log,
            cwd=self.sl_root,
            env=self.sl_env_vars,
            log_shell_command=True,
        )

        if return_code:
            raise Failure(description=f"Starlake command {command} execution failed with output: {output}")

        if asset_key:
            yield AssetMaterialization(asset_key=asset_key.path, description=kwargs.get("description", f"Starlake command {command} execution succeeded"))

        yield Output(value=output, output_name="result")

    return job