Components ETL pipeline tutorial

warning

This feature is considered in a preview stage and is under active development. It can change significantly, or be removed completely. It is not considered ready for production use.

Setup

1. Install project dependencies

Prerequisites

To complete this tutorial, you must install uv and dg.

First, install duckdb for a local database and tree to visualize project structure:

brew install duckdb tree
note

tree is optional and is only used to produce a nicely formatted representation of the project structure on the command line. You can also use find, ls, dir, or any other directory listing command.

2. Scaffold a new project

After installing dependencies, scaffold a components-ready project:

dg scaffold project jaffle-platform
Creating a Dagster project at /.../jaffle-platform.
Scaffolded files for Dagster project at /.../jaffle-platform.
...

The dg scaffold project command builds a project at jaffle-platform and initializes a new Python virtual environment inside it. When you use dg's default environment management behavior, you won't need to worry about activating this virtual environment yourself.

To learn more about the files, directories, and default settings in a project scaffolded with dg scaffold project, see "Creating a project with components".

Ingest data

1. Add the Sling component type to your environment

To ingest data, you must set up Sling. We can list the component types available to our project with dg list plugins. If we run this now, the Sling component won't appear, since the dagster package doesn't contain components for specific integrations (like Sling):

dg list plugins
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-target] │ │
│ │ │ │ definition for how │ │ │
│ │ │ │ to compute an │ │ │
│ │ │ │ asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼────────────────────┼─────────────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary set │ [component, │ │
│ │ │ │ of dagster │ scaffold-target] │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼────────────────────┼─────────────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder which may │ [component, │ │
│ │ │ │ contain multiple │ scaffold-target] │ │
│ │ │ │ submodules, each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼────────────────────┼─────────────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap │ [component, │ │
│ │ │ │ Python scripts │ scaffold-target] │ │
│ │ │ │ executed with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubprocessCl… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼────────────────────┼─────────────────────┤ │
│ │ │ dagster.schedule │ Creates a schedule │ [scaffold-target] │ │
│ │ │ │ following the │ │ │
│ │ │ │ provided cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests runs for │ │ │
│ │ │ │ the provided job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼────────────────────┼─────────────────────┤ │
│ │ │ dagster.sensor │ Creates a sensor │ [scaffold-target] │ │
│ │ │ │ where the │ │ │
│ │ │ │ decorated function │ │ │
│ │ │ │ is used as the │ │ │
│ │ │ │ sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴────────────────────┴─────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

To make the Sling component available in your environment, install the dagster-sling package:

uv add dagster-sling
note

When you run commands like dg list plugins, dg obtains the results by resolving a project environment and querying it. In this case, the project environment was set up as part of the dg scaffold project command. The pyproject.toml in newly scaffolded projects contains a setting tool.dg.project.python_environment = "persistent_uv". This tells dg to expect a uv-managed virtual environment to be present in the project root directory. (This can be confirmed by the presence of a uv.lock file.)

2. Confirm availability of the Sling component type

To confirm that the dagster_sling.SlingReplicationCollectionComponent component type is now available, run the dg list plugins command again:

dg list plugins
Using /.../jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-targ… │ │
│ │ │ │ definition for │ │ │
│ │ │ │ how to compute │ │ │
│ │ │ │ an asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary set │ [component, │ │
│ │ │ │ of dagster │ scaffold-targe… │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder which │ [component, │ │
│ │ │ │ may contain │ scaffold-targe… │ │
│ │ │ │ multiple │ │ │
│ │ │ │ submodules, each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap │ [component, │ │
│ │ │ │ Python scripts │ scaffold-targe… │ │
│ │ │ │ executed with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubprocess… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.schedule │ Creates a │ [scaffold-targ… │ │
│ │ │ │ schedule │ │ │
│ │ │ │ following the │ │ │
│ │ │ │ provided cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests runs │ │ │
│ │ │ │ for the provided │ │ │
│ │ │ │ job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.sensor │ Creates a sensor │ [scaffold-targ… │ │
│ │ │ │ where the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ function is used │ │ │
│ │ │ │ as the sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴──────────────────┴─────────────────┘ │
│ dagster_sling │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster_sling.SlingReplicationCollectionComponent │ Expose one or more │ [component, │ │
│ │ │ │ Sling replications │ scaffold-target] │ │
│ │ │ │ to Dagster as │ │ │
│ │ │ │ assets. │ │ │
│ │ └───────────────────────────────────────────────────┴──────────────────────┴───────────────────────┘ │
└───────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘
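dg list plugins is the way this tutorial checks for component types. As a rough cross-check only, the sketch below tests whether a package is importable at all, using the standard-library importlib. It assumes it is run with the project's own interpreter (the one in the project's .venv), and it says nothing about which component types a package registers. The helper name is_importable is hypothetical.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module can be found in the current environment."""
    return importlib.util.find_spec(module_name) is not None


# Results depend on the environment this is run in (assumption: the project's .venv).
for name in ("dagster", "dagster_sling"):
    status = "available" if is_importable(name) else "not installed"
    print(f"{name}: {status}")
```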

3. Create a new instance of the Sling component

Next, create a new instance of this component type:

dg scaffold 'dagster_sling.SlingReplicationCollectionComponent' ingest_files
Using /.../jaffle-platform/.venv/bin/dagster-components

This adds a component instance to the project at jaffle_platform/defs/ingest_files:

tree src/jaffle_platform
src/jaffle_platform
├── __init__.py
├── definitions.py
├── defs
│   ├── __init__.py
│   └── ingest_files
│       ├── component.yaml
│       └── replication.yaml
└── lib
    └── __init__.py

4 directories, 6 files

A single file, component.yaml, was created in the component folder. The component.yaml file is common to all Dagster components, and specifies the component type and any parameters used to scaffold definitions from the component at runtime.

jaffle-platform/src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent

attributes:
  replications:
    - path: replication.yaml

Right now the parameters define a single "replication", a Sling concept that specifies how data should be replicated from a source to a target. The details are specified in a replication.yaml file that is read by Sling. This file does not yet exist; we are going to create it shortly.

note

The path parameter for a replication is relative to the same folder containing component.yaml. This is a convention for components.

4. Download files for Sling source

Next, download some files locally to use as your Sling source, since Sling doesn't support reading from the public internet:

curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_customers.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_orders.csv &&
curl -O https://raw.githubusercontent.com/dbt-labs/jaffle-shop-classic/refs/heads/main/seeds/raw_payments.csv

5. Set up the Sling to DuckDB replication

Create a replication.yaml file that references the downloaded files:

jaffle-platform/src/jaffle_platform/defs/ingest_files/replication.yaml
source: LOCAL
target: DUCKDB

defaults:
  mode: full-refresh
  object: "{stream_table}"

streams:
  file://raw_customers.csv:
    object: "main.raw_customers"
  file://raw_orders.csv:
    object: "main.raw_orders"
  file://raw_payments.csv:
    object: "main.raw_payments"
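To make the streams section easier to read, here is a small sketch that writes the same stream-to-object pairs as a Python dict and splits each target object into its schema and table parts. It is only an illustration of the naming (main is DuckDB's default schema); it does not reproduce how Sling itself interprets the file, and split_object is a hypothetical helper.

```python
# The stream-to-object pairs from replication.yaml, written out as a Python dict.
STREAMS = {
    "file://raw_customers.csv": "main.raw_customers",
    "file://raw_orders.csv": "main.raw_orders",
    "file://raw_payments.csv": "main.raw_payments",
}


def split_object(target_object: str) -> tuple[str, str]:
    """Split a 'schema.table' target object into (schema, table)."""
    schema, table = target_object.split(".", 1)
    return schema, table


for source, target in STREAMS.items():
    schema, table = split_object(target)
    print(f"{source} -> schema={schema}, table={table}")
```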

Finally, modify the component.yaml file to tell the Sling component where replicated data with the DUCKDB target should be written:

jaffle-platform/src/jaffle_platform/defs/ingest_files/component.yaml
type: dagster_sling.SlingReplicationCollectionComponent

attributes:
  sling:
    connections:
      - name: DUCKDB
        type: duckdb
        instance: /tmp/jaffle_platform.duckdb
  replications:
    - path: replication.yaml

6. View and materialize assets in the Dagster UI

Load your project in the Dagster UI with dg dev to see what you've built so far. Once the UI is running, click Materialize All to materialize the assets and load the tables into the DuckDB instance:

dg dev

Verify the DuckDB tables on the command line:

duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM raw_customers LIMIT 5;"
┌───────┬────────────┬───────────┬──────────────────┐
│  id   │ first_name │ last_name │ _sling_loaded_at │
│ int32 │  varchar   │  varchar  │      int64       │
├───────┼────────────┼───────────┼──────────────────┤
│     1 │ Michael    │ P.        │ ...              │
│     2 │ Shawn      │ M.        │ ...              │
│     3 │ Kathleen   │ P.        │ ...              │
│     4 │ Jimmy      │ C.        │ ...              │
│     5 │ Katherine  │ R.        │ ...              │
└───────┴────────────┴───────────┴──────────────────┘

Transform data

To transform the data, you will need to download a sample dbt project from GitHub and use the data ingested with Sling as an input for the dbt project.

1. Clone a sample dbt project from GitHub

First, clone the project and delete the embedded git repo:

git clone --depth=1 https://github.com/dagster-io/jaffle-platform.git dbt && rm -rf dbt/.git

2. Install the dbt project component type

To interface with the dbt project, you will need to instantiate a Dagster dbt project component. To access the dbt project component type, install the dagster-dbt integration and the dbt-duckdb adapter:

uv add dagster-dbt dbt-duckdb

To confirm that the dagster_dbt.DbtProjectComponent component type is now available, run dg list plugins:

dg list plugins
Using /.../jaffle-platform/.venv/bin/dagster-components
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Plugin ┃ Objects ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ dagster │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster.asset │ Create a │ [scaffold-targ… │ │
│ │ │ │ definition for │ │ │
│ │ │ │ how to compute │ │ │
│ │ │ │ an asset. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefinitionsComponent │ An arbitrary set │ [component, │ │
│ │ │ │ of dagster │ scaffold-targe… │ │
│ │ │ │ definitions. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.DefsFolderComponent │ A folder which │ [component, │ │
│ │ │ │ may contain │ scaffold-targe… │ │
│ │ │ │ multiple │ │ │
│ │ │ │ submodules, each │ │ │
│ │ │ │ which define │ │ │
│ │ │ │ components. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.components.PipesSubprocessScriptCollectionComponent │ Assets that wrap │ [component, │ │
│ │ │ │ Python scripts │ scaffold-targe… │ │
│ │ │ │ executed with │ │ │
│ │ │ │ Dagster's │ │ │
│ │ │ │ PipesSubprocess… │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.schedule │ Creates a │ [scaffold-targ… │ │
│ │ │ │ schedule │ │ │
│ │ │ │ following the │ │ │
│ │ │ │ provided cron │ │ │
│ │ │ │ schedule and │ │ │
│ │ │ │ requests runs │ │ │
│ │ │ │ for the provided │ │ │
│ │ │ │ job. │ │ │
│ │ ├─────────────────────────────────────────────────────────────┼──────────────────┼─────────────────┤ │
│ │ │ dagster.sensor │ Creates a sensor │ [scaffold-targ… │ │
│ │ │ │ where the │ │ │
│ │ │ │ decorated │ │ │
│ │ │ │ function is used │ │ │
│ │ │ │ as the sensor's │ │ │
│ │ │ │ evaluation │ │ │
│ │ │ │ function. │ │ │
│ │ └─────────────────────────────────────────────────────────────┴──────────────────┴─────────────────┘ │
│ dagster_dbt │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster_dbt.DbtProjectComponent │ Expose a DBT project to Dagster │ [component, scaffold-target] │ │
│ │ │ │ as a set of assets. │ │ │
│ │ └─────────────────────────────────┴─────────────────────────────────┴──────────────────────────────┘ │
│ dagster_sling │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Symbol ┃ Summary ┃ Features ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ dagster_sling.SlingReplicationCollectionComponent │ Expose one or more │ [component, │ │
│ │ │ │ Sling replications │ scaffold-target] │ │
│ │ │ │ to Dagster as │ │ │
│ │ │ │ assets. │ │ │
│ │ └───────────────────────────────────────────────────┴──────────────────────┴───────────────────────┘ │
└───────────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────┘

3. Scaffold a new instance of the dbt project component

Next, scaffold a new instance of the dagster_dbt.DbtProjectComponent component, providing the path to the dbt project you cloned earlier as the project_path scaffold parameter:

dg scaffold dagster_dbt.DbtProjectComponent jdbt --project-path dbt/jdbt
Using /.../jaffle-platform/.venv/bin/dagster-components

This creates a new component instance in the project at jaffle_platform/defs/jdbt. To see the component configuration, open component.yaml in that directory:

jaffle-platform/src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../dbt/jdbt

4. Update the dbt project component configuration

note

A bug in the component scaffolding for DbtProjectComponent currently causes the project_dir in jaffle_platform/defs/jdbt/component.yaml to be generated as ../../../dbt/jdbt when it should be ../../../../dbt/jdbt. Update project_dir to ../../../../dbt/jdbt before proceeding. This will be fixed in the next release.
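To see why the extra ../ matters, the sketch below normalizes both relative paths against the component folder using the standard-library posixpath module. It assumes that project_dir is resolved relative to the folder containing component.yaml and that the dbt project was cloned into a dbt directory at the project root; resolve is a hypothetical helper, not part of dg.

```python
import posixpath

# Folder that contains the jdbt component.yaml, as shown in this tutorial.
COMPONENT_DIR = "jaffle-platform/src/jaffle_platform/defs/jdbt"


def resolve(relative: str) -> str:
    """Join a relative path onto the component folder and collapse the '..' segments."""
    return posixpath.normpath(posixpath.join(COMPONENT_DIR, relative))


# Three levels up lands inside src/, where no dbt project exists.
print(resolve("../../../dbt/jdbt"))     # jaffle-platform/src/dbt/jdbt

# Four levels up reaches the project root, where the dbt directory was cloned.
print(resolve("../../../../dbt/jdbt"))  # jaffle-platform/dbt/jdbt
```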

Let’s see the project in the Dagster UI:

dg dev

You can see that there appear to be two copies of the raw_customers, raw_orders, and raw_payments tables. If you click on the assets, you can see their full asset keys. The keys generated by the dbt component are of the form main/*, whereas the keys generated by the Sling component are of the form target/main/*.
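The mismatch can be illustrated with plain strings. This sketch builds the two sets of keys described above and shows that they share no entries until the dbt-side keys gain a target/ prefix. The key strings follow the forms given in this section; add_prefix is a hypothetical helper and not part of Dagster.

```python
TABLES = ["raw_customers", "raw_orders", "raw_payments"]

# Keys in the forms described above: target/main/* from Sling, main/* from dbt.
sling_keys = {f"target/main/{table}" for table in TABLES}
dbt_keys = {f"main/{table}" for table in TABLES}

# No key is shared, so the UI shows two separate copies of each table.
print(sorted(sling_keys & dbt_keys))  # []


def add_prefix(key: str, prefix: str = "target") -> str:
    """Prepend a path segment to an asset key string."""
    return f"{prefix}/{key}"


# Prefixing the dbt-side keys makes both sets identical.
aligned_keys = {add_prefix(key) for key in dbt_keys}
print(aligned_keys == sling_keys)  # True
```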

We need to update the configuration of the dagster_dbt.DbtProjectComponent component to match the keys generated by the Sling component. Update defs/jdbt/component.yaml with the configuration below:

jaffle-platform/src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}

You might notice the typo in the above file. After updating a component file, it's useful to validate that the changes match the component's schema. You can do this by running dg check yaml:

dg check yaml
/.../jaffle-platform/src/jaffle_platform/defs/jdbt/component.yaml:7 - Unable to parse YAML: while scanning a quoted scalar, found unexpected end of stream
     |
   6 |   asset_attributes:
   7 |     key: "target/main/{{ node.name }}
     |                                      ^ Unable to parse YAML: while scanning a quoted scalar, found unexpected end of stream
     |

You can see that the error message includes the filename, line number, and a code snippet showing the exact nature of the error. Let's fix the typo:

jaffle-platform/src/jaffle_platform/defs/jdbt/component.yaml
type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}"

Finally, run dg check yaml again to validate the fix:

dg check yaml
All components validated successfully.
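The error reported earlier in this step, an unterminated quoted scalar, comes down to a double quote that is never closed. As a deliberately crude illustration (not how dg check yaml works, and no substitute for it), the sketch below flags lines with an odd number of double quotes. It ignores escaped quotes, single-quoted strings, and multi-line scalars; lines_with_unbalanced_quotes is a hypothetical helper.

```python
# The broken component.yaml from this step, with the closing quote missing on line 7.
BROKEN_YAML = """type: dagster_dbt.DbtProjectComponent

attributes:
  dbt:
    project_dir: ../../../../dbt/jdbt
  asset_attributes:
    key: "target/main/{{ node.name }}
"""


def lines_with_unbalanced_quotes(text: str) -> list[int]:
    """Return 1-based numbers of lines containing an odd count of double quotes."""
    return [
        number
        for number, line in enumerate(text.splitlines(), start=1)
        if line.count('"') % 2 == 1
    ]


print(lines_with_unbalanced_quotes(BROKEN_YAML))  # [7]
```

The reported line number matches the :7 suffix in the dg check yaml output.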

Reload the project in the Dagster UI to verify that the keys load properly.

Now the keys generated by the Sling and dbt project components match, and the asset graph is correct. To materialize the new assets defined via the dbt project component, click Materialize All.

To verify the fix, you can view a sample of the newly materialized assets in DuckDB from the command line:

duckdb /tmp/jaffle_platform.duckdb -c "SELECT * FROM orders LIMIT 5;"
┌──────────┬─────────────┬────────────┬───┬───────────────┬──────────────────────┬──────────────────┬────────┐
│ order_id │ customer_id │ order_date │ … │ coupon_amount │ bank_transfer_amount │ gift_card_amount │ amount │
│  int32   │    int32    │    date    │   │    double     │        double        │      double      │ double │
├──────────┼─────────────┼────────────┼───┼───────────────┼──────────────────────┼──────────────────┼────────┤
│        1 │           1 │ 2018-01-01 │ … │           0.0 │                  0.0 │              0.0 │   10.0 │
│        2 │           3 │ 2018-01-02 │ … │           0.0 │                  0.0 │              0.0 │   20.0 │
│        3 │          94 │ 2018-01-04 │ … │           1.0 │                  0.0 │              0.0 │    1.0 │
│        4 │          50 │ 2018-01-05 │ … │          25.0 │                  0.0 │              0.0 │   25.0 │
│        5 │          64 │ 2018-01-05 │ … │           0.0 │                 17.0 │              0.0 │   17.0 │
├──────────┴─────────────┴────────────┴───┴───────────────┴──────────────────────┴──────────────────┴────────┤
│ 5 rows                                                                                 9 columns (7 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Automate the pipeline

Now that you've defined some assets, let's schedule them.

First, scaffold a schedule:

dg scaffold dagster.schedule daily_jaffle.py
Using /.../jaffle-platform/.venv/bin/dagster-components

Next, set the schedule to target all assets (*) and run on the @daily cron schedule:

jaffle-platform/src/jaffle_platform/defs/daily_jaffle.py
import dagster as dg


@dg.schedule(cron_schedule="@daily", target="*")
def daily_jaffle(context: dg.ScheduleEvaluationContext):
    return dg.RunRequest()
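The @daily shorthand is equivalent to the cron expression 0 0 * * *, that is, one tick at midnight each day. As a minimal sketch of what that means, the snippet below computes the next midnight after a given moment with the standard library. It ignores time zones and is not how Dagster evaluates schedules; next_daily_tick is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_daily_tick(now: datetime) -> datetime:
    """Return the first midnight strictly after `now` (time zones ignored)."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)


print(next_daily_tick(datetime(2018, 1, 1, 15, 30)))  # 2018-01-02 00:00:00
```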

Next steps

To continue your journey with components, you can add more components to your project or learn how to manage multiple components-ready projects with dg.