Loading dbt models into Dagster as assets
Now is the moment that we’ve been building up to since the beginning of this module. Let’s see your dbt models in a Dagster asset graph!
Turning dbt models into assets with @dbt_assets
The star of the show here is the @dbt_assets decorator. This is a specialized asset decorator that wraps around a dbt project to tell Dagster what dbt models exist. In the body of the @dbt_assets definition, you write exactly how you want Dagster to run your dbt models.
Many Dagster projects may only need one @dbt_assets-decorated function that manages the entire dbt project. However, you may need to create multiple definitions for various reasons, such as:
- You have multiple dbt projects
- You want to exclude certain dbt models
- You want to run different dbt commands on specific model groups (for example,
dbt buildfor most models anddbt run --no-version-checkfor others) - You want to customize what happens after certain models finish, such as sending a notification
- You need to configure some sets of models differently (for example, partitioned vs. non-partitioned)
We’ll only create one @dbt_assets definition for now, but in a later lesson, we’ll encounter a use case for needing another @dbt_assets definition.
Decorator arguments vs function parameters
When writing a @dbt_assets definition, configuration appears in two places. Understanding why helps you decide where new config belongs.
Decorator arguments (manifest, select, exclude, partitions_def, dagster_dbt_translator) are static configuration. Dagster reads them at load time to build the asset graph — before any run starts. They define which assets exist and how they relate to each other.
Function parameters (context: dg.AssetExecutionContext, dbt: DbtCliResource) are injected at runtime, once per execution. context carries run-specific state (the active partition key, run ID, logger). dbt is the resource used to actually invoke dbt commands.
A useful rule of thumb: if Dagster needs to know it before a run starts, it’s a decorator argument. If it’s only meaningful during a run, it’s a function parameter.
This also explains command flexibility. The dbt command (dbt run, dbt build, etc.) lives inside the function body — not the decorator. A single @dbt_assets function issues one command for all its models. To run different commands for different model groups, you create multiple @dbt_assets definitions using select and exclude to split the models, each with its own command. Lesson 6 demonstrates this pattern.
Loading the models as assets
Create a new file
dbt.pyin thedefs/assets/directory. You can create this file manually or usedg:dg scaffold defs dagster.asset assets/dbt.py⚠️ Note: The scaffold command creates a basic
@dg.assettemplate as a starting point. We'll be replacing the scaffolded content entirely with dbt-specific code below, since we need to use the specialized@dbt_assetsdecorator instead of the standard@dg.assetdecorator.Replace the contents of
dbt.pywith the following imports:# src/dagster_and_dbt/defs/assets/dbt.py import dagster as dg from dagster_dbt import dbt_assets, DbtCliResource from dagster_and_dbt.defs.project import dbt_projectNext, we'll use the
@dbt_assetsdecorator to create a new asset function and provide it with a reference to the project's manifest file:@dbt_assets( manifest=dbt_project.manifest_path, ) def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):Here, we used
dbt_project.manifest_pathto provide the reference to the project's manifest file. This is possible because thedbt_projectrepresentation we created earlier contains the manifest path, accessible by using themanifest_pathattribute.Notice we provided two arguments here. The first argument is the
context, which indicates which dbt models to run and any related configurations. The second refers to the dbt resource you’ll be using to run dbt.Finally, add the following to the body of
dbt_analyticsfunction:yield from dbt.cli(["run"], context=context).stream()Let’s review what’s happening in this line in a bit more detail:
- We use the
dbtresource (defined in the asset's second argument, which is aDbtCliResource) to execute a dbt command through its.climethod. - The
.stream()method fetches the events and results of this dbt execution.- This is one of multiple ways to get the Dagster events, such as what models materialized or tests passed. We recommend starting with this and exploring other methods in the future as your use cases grow (such as fetching the run artifacts after a run). In this case, the above line will execute
dbt run.
- This is one of multiple ways to get the Dagster events, such as what models materialized or tests passed. We recommend starting with this and exploring other methods in the future as your use cases grow (such as fetching the run artifacts after a run). In this case, the above line will execute
- The results of the
streamare a Python generator of what Dagster events happened. We usedyield from(not justyield!) to have Dagster track asset materializations.
- We use the
At this point, defs/assets/dbt.py should look like this:
# src/dagster_and_dbt/defs/assets/dbt.py
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_and_dbt.defs.project import dbt_project
@dbt_assets(
manifest=dbt_project.manifest_path,
)
def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["run"], context=context).stream()