
Dagster & dlt with components

Info: dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-dlt library provides a DltLoadCollectionComponent, which represents a collection of dlt sources and pipelines as assets in Dagster.

Preparing a Dagster project

To begin, you'll need a Dagster project. You can use an existing project that is ready for components, or scaffold a new one:

uvx create-dagster project my-project && cd my-project/src

Next, you will need to add the dagster-dlt library to the project:

uv add dagster-dlt

Scaffolding a dlt component

Now that you have a Dagster project, you can scaffold a dlt component. You may optionally provide the source and destination types, which will pull in the appropriate dlt source:

dg scaffold defs dagster_dlt.DltLoadCollectionComponent github_snowflake_ingest \
--source github --destination snowflake

The scaffold call will generate a basic defs.yaml file and a loads.py file:

tree my_project/defs
my_project/defs
├── __init__.py
└── github_snowflake_ingest
    ├── defs.yaml
    ├── github
    │   ├── __init__.py
    │   ├── helpers.py
    │   ├── queries.py
    │   ├── README.md
    │   └── settings.py
    └── loads.py

3 directories, 8 files

The loads.py file contains a skeleton dlt source and pipeline. Dagster references these objects, but they can also be run directly using dlt:

my_project/defs/github_snowflake_ingest/loads.py
import dlt


@dlt.source
def my_source():
    @dlt.resource
    def hello_world():
        yield "hello, world!"

    return hello_world


my_load_source = my_source()
my_load_pipeline = dlt.pipeline(destination="snowflake")

Each of these sources and pipelines is referenced by a fully scoped Python identifier in the defs.yaml file, where they are paired into a set of loads:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.my_load_source
      pipeline: .loads.my_load_pipeline
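
The leading dot in a reference such as .loads.my_load_source suggests that it is resolved relative to the component's own package. How Dagster performs this resolution internally is not shown here; the following is a minimal sketch of the idea using only the standard library. The helper split_reference is hypothetical, and the package name is an assumption based on the directory layout above.

```python
from importlib.util import resolve_name


def split_reference(reference: str, package: str) -> tuple[str, str]:
    """Split a relative reference such as ".loads.my_load_source" into
    an absolute module name and an attribute name.

    Hypothetical helper for illustration; not part of dagster-dlt.
    """
    module_part, _, attribute = reference.rpartition(".")
    # resolve_name turns the relative ".loads" into "<package>.loads".
    return resolve_name(module_part, package), attribute


# Package name assumed from the scaffolded directory layout.
component_package = "my_project.defs.github_snowflake_ingest"

module_name, attribute = split_reference(".loads.my_load_source", component_package)
print(module_name)  # my_project.defs.github_snowflake_ingest.loads
print(attribute)    # my_load_source
```

With the module name and attribute in hand, importlib.import_module followed by getattr would load the actual object.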

Configuring dlt loads

Next, you can fill in the template loads.py file with your own dlt sources and pipelines:

my_project/defs/github_snowflake_ingest/loads.py
import dlt
from .github import github_reactions, github_repo_events, github_stargazers

dlthub_dlt_stargazers_source = github_stargazers("dlt-hub", "dlt")
dlthub_dlt_stargazers_pipeline = dlt.pipeline(
    "github_stargazers", destination="snowflake", dataset_name="dlthub_stargazers"
)

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline

You can use dg list defs to list the assets produced by the load:

dg list defs

┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                         ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group   ┃ Deps                         ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ default │ github_stargazers_stargazers │ dlt       │             │ │
│         │ │                              │         │                              │ snowflake │             │ │
│         │ ├──────────────────────────────┼─────────┼──────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_stargazers_stargazers │ default │                              │           │             │ │
│         │ └──────────────────────────────┴─────────┴──────────────────────────────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────┘
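
The listing shows two assets for the single dlt resource: the loaded table, and an upstream asset that it depends on. The sketch below reproduces the naming pattern visible in this output. It assumes that the loaded asset key is <dataset_name>/<resource name> and that the upstream key is <source name>_<resource name>; both rules are inferred from the table rather than taken from the dagster-dlt source, and LoadNames is a hypothetical stand-in.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LoadNames:
    """Hypothetical stand-in for the names involved in one dlt load."""

    dataset_name: str   # from dlt.pipeline(..., dataset_name=...)
    source_name: str    # assumed to be the name of the dlt source
    resource_name: str  # name of the dlt resource inside the source

    def asset_key(self) -> str:
        # Assumed rule: the loaded table is keyed by dataset and resource.
        return f"{self.dataset_name}/{self.resource_name}"

    def upstream_key(self) -> str:
        # Assumed rule: the upstream asset joins source and resource names.
        return f"{self.source_name}_{self.resource_name}"


names = LoadNames("dlthub_stargazers", "github_stargazers", "stargazers")
print(names.asset_key())     # dlthub_stargazers/stargazers
print(names.upstream_key())  # github_stargazers_stargazers
```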

Customizing Dagster assets

Properties of the assets emitted by each load can be customized in the defs.yaml file using the translation key:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        group_name: github_data
        description: "Loads all users who have starred the dlt-hub/dlt repo"

dg list defs

┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group       ┃ Deps                  ┃ Kinds     ┃ Description           ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ github_data │ github_stargazers_st… │ dlt       │ Loads all users who   │ │
│         │ │                              │             │                       │ snowflake │ have starred the      │ │
│         │ │                              │             │                       │           │ dlt-hub/dlt repo      │ │
│         │ ├──────────────────────────────┼─────────────┼───────────────────────┼───────────┼───────────────────────┤ │
│         │ │ github_stargazers_stargazers │ default     │                       │           │                       │ │
│         │ └──────────────────────────────┴─────────────┴───────────────────────┴───────────┴───────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
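
In the listing above, only the loaded asset picks up the new group and description, while the upstream asset stays in the default group. A minimal sketch of that override behavior follows. AssetProps and apply_translation are hypothetical stand-ins, not Dagster's own classes or functions; the sketch assumes that each key under translation simply replaces the corresponding default property.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class AssetProps:
    """Hypothetical, simplified stand-in for an asset's properties."""

    key: str
    group_name: str = "default"
    description: str = ""


def apply_translation(props: AssetProps, translation: dict) -> AssetProps:
    # Only the fields named under `translation` change; the rest keep
    # their default values.
    return replace(props, **translation)


translation = {
    "group_name": "github_data",
    "description": "Loads all users who have starred the dlt-hub/dlt repo",
}

loaded = apply_translation(AssetProps("dlthub_stargazers/stargazers"), translation)
# The upstream asset is left untouched, as in the listing.
upstream = AssetProps("github_stargazers_stargazers")

print(loaded.group_name)    # github_data
print(upstream.group_name)  # default
```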

Both the DltResource and Pipeline objects are available in the template scope, as resource and pipeline respectively, and can be used for dynamic customization:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        metadata:
          resource_name: "{{ resource.name }}"
          pipeline_name: "{{ pipeline.pipeline_name }}"
          is_transformer: "{{ resource.is_transformer }}"
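
To make the effect of these expressions concrete, the sketch below renders the same three templates against stand-in objects. Dagster evaluates the expressions with its own template engine; the regex-based render function here is a hypothetical simplification that only handles the {{ object.attribute }} form, and the attribute values are placeholders rather than output from a real run. Values are rendered to strings for simplicity, which may differ from how Dagster types the result.

```python
import re
from types import SimpleNamespace

# Matches "{{ name.attribute }}" with optional inner whitespace.
_EXPRESSION = re.compile(r"\{\{\s*(\w+)\.(\w+)\s*\}\}")


def render(template: str, scope: dict) -> str:
    """Replace each {{ obj.attr }} with str(getattr(scope[obj], attr))."""

    def substitute(match: re.Match) -> str:
        obj_name, attr = match.group(1), match.group(2)
        return str(getattr(scope[obj_name], attr))

    return _EXPRESSION.sub(substitute, template)


# Stand-ins for the dlt objects; the values are placeholders.
scope = {
    "resource": SimpleNamespace(name="stargazers", is_transformer=False),
    "pipeline": SimpleNamespace(pipeline_name="github_stargazers"),
}

metadata_templates = {
    "resource_name": "{{ resource.name }}",
    "pipeline_name": "{{ pipeline.pipeline_name }}",
    "is_transformer": "{{ resource.is_transformer }}",
}

metadata = {key: render(value, scope) for key, value in metadata_templates.items()}
print(metadata)
```

Because the template is evaluated once per resource, each asset in a multi-resource source would receive its own metadata values.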