Connecting dbt models to Dagster assets
Picking up where we left off, you may have noticed that the sources for your dbt project are not just tables that exist in DuckDB, but also assets that Dagster created. However, the staging models (stg_trips, stg_zones) that use those sources aren’t linked to the Dagster assets (taxi_trips, taxi_zones) that produced them.
Let’s fix that by telling Dagster that the dbt sources are the tables that the taxi_trips and taxi_zones asset definitions produce. To match up these assets, we'll override the dbt assets' keys. By having the asset keys line up, Dagster will know that these assets are the same and should merge them.
This is accomplished by changing the dbt source’s asset keys to be the same as the matching assets that Dagster makes. In this case, the dbt source’s default asset key is raw_taxis/trips, and the table that we’re making with Dagster has an asset key of taxi_trips.
To adjust how Dagster names the asset keys for your project’s dbt models, we’ll need to override the dagster-dbt integration’s default logic for how to interpret the dbt project. This mapping is contained in the DagsterDbtTranslator class.
Customizing how Dagster understands dbt projects
The DagsterDbtTranslator class is the default mapping for how Dagster interprets and maps your dbt project. As Dagster loops through each of your dbt models, it will execute each of the translator’s functions and use the return value to configure your new Dagster asset.
However, you can override its methods by making a new class that inherits from it and provides your own logic for a dbt model. Refer to the dagster-dbt package’s API Reference for more info on the different functions you can override in the DagsterDbtTranslator class.
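The override pattern described above can be sketched without Dagster installed. In the sketch below, BaseTranslator is a hypothetical stand-in for DagsterDbtTranslator, its default key logic is simplified for illustration, asset keys are represented as plain lists of path segments, and the property dictionaries are hand-written rather than read from a real manifest.

```python
class BaseTranslator:
    """Hypothetical stand-in for DagsterDbtTranslator (not the real class)."""

    def get_asset_key(self, dbt_resource_props):
        # Simplified default (assumption): sources map to [source_name, name],
        # everything else maps to [name].
        if dbt_resource_props["resource_type"] == "source":
            return [dbt_resource_props["source_name"], dbt_resource_props["name"]]
        return [dbt_resource_props["name"]]


class PrefixedSourceTranslator(BaseTranslator):
    """Overrides one method and defers to the parent for everything else."""

    def get_asset_key(self, dbt_resource_props):
        if dbt_resource_props["resource_type"] == "source":
            return ["taxi_" + dbt_resource_props["name"]]
        return super().get_asset_key(dbt_resource_props)


def translate_all(translator, resources):
    """Call the translator once per resource, as the loop in the text describes."""
    return {r["name"]: translator.get_asset_key(r) for r in resources}


# Hand-written sample properties (hypothetical, heavily trimmed).
SAMPLE_RESOURCES = [
    {"resource_type": "source", "source_name": "raw_taxis", "name": "trips"},
    {"resource_type": "model", "name": "stg_trips"},
]

print(translate_all(BaseTranslator(), SAMPLE_RESOURCES))
print(translate_all(PrefixedSourceTranslator(), SAMPLE_RESOURCES))
```

Only the source entry changes between the two printed mappings, which is the behavior the rest of this lesson builds with the real class.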
For now, we’ll customize how asset keys are defined by overriding the translator’s get_asset_key method.
Open the assets/dbt.py file and do the following:
Update the imports to include:
- From the dagster_dbt module, import DagsterDbtTranslator
- From the dagster module, import AssetKey
Create a new class called CustomizedDagsterDbtTranslator that inherits from DagsterDbtTranslator. Add this code after the imports in assets/dbt.py:

    class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
In this class, create a method called get_asset_key.
This is a method of the DagsterDbtTranslator class that we'll override and customize as needed. It is an instance method, so its first argument is self, following Python convention. The second argument is a dictionary/JSON object of the dbt model’s properties, which is based on the manifest file from earlier. Let’s call that second argument dbt_resource_props. The return value of this function is an object of the AssetKey class.

    class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
        def get_asset_key(self, dbt_resource_props):
Now, let’s fill in the get_asset_key method with our own logic for defining asset keys.
There are two properties that we’ll want from dbt_resource_props: the resource_type (e.g., model, source, seed, snapshot) and the name, such as trips or stg_trips. Access both of those properties from the dbt_resource_props argument and store them in their own variables (resource_type and name):

    def get_asset_key(self, dbt_resource_props):
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]
As mentioned above, the asset keys of our existing Dagster assets used by our dbt project are named taxi_trips and taxi_zones. If you were to print out the name, you’d see that the dbt sources are named trips and zones. Therefore, to match our asset keys up, we can prefix our keys with the string taxi_.
Copy and paste the following code to return an AssetKey of AssetKey(f"taxi_{name}"):

    def get_asset_key(self, dbt_resource_props):
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]
        return AssetKey(f"taxi_{name}")
You have full control over how each asset can be named, as you can define how asset keys are created. In our case we only want to rename the dbt sources, but we can keep the asset keys of the models the same.
The object-oriented pattern of the DagsterDbtTranslator means that we can leverage the existing implementations of the parent class by calling super(). We’ll use this pattern to customize how the sources are defined but default to the original logic for deciding the model asset keys. Copy and paste the code below to complete the get_asset_key function:

    def get_asset_key(self, dbt_resource_props):
        resource_type = dbt_resource_props["resource_type"]
        name = dbt_resource_props["name"]
        if resource_type == "source":
            return AssetKey(f"taxi_{name}")
        else:
            return super().get_asset_key(dbt_resource_props)
You’ve successfully written your first translator!
💡 Important! dbt models and Dagster asset keys must be unique. If you're receiving a DuplicateKeyError, add some logging to verify that the logic in get_asset_key doesn't return two of the same key for different values!
Now, update the definition that uses @dbt_assets to be configured with an instance of the CustomizedDagsterDbtTranslator. The @dbt_assets decorator has a dagster_dbt_translator argument that you can pass this instance into. Don’t forget to instantiate the class!
Your code should look something like this:

    @dbt_assets(
        manifest=dbt_project.manifest_path,
        dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    )
    def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(["build"], context=context).stream()
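The uniqueness note above can be checked offline before loading the project in Dagster. The following is a minimal sketch, not part of dagster-dbt: key_for mirrors the prefixing logic from this lesson but returns plain strings instead of AssetKey objects, find_duplicate_keys is a hypothetical helper, and the model named taxi_trips in the sample data is invented purely to trigger a collision.

```python
from collections import defaultdict


def key_for(dbt_resource_props):
    """Illustrative key logic: prefix sources with taxi_, keep other names as-is."""
    name = dbt_resource_props["name"]
    if dbt_resource_props["resource_type"] == "source":
        return f"taxi_{name}"
    return name


def find_duplicate_keys(resources):
    """Return {key: [resources that produced it]} for keys produced more than once."""
    names_by_key = defaultdict(list)
    for props in resources:
        label = f'{props["resource_type"]}:{props["name"]}'
        names_by_key[key_for(props)].append(label)
    return {key: labels for key, labels in names_by_key.items() if len(labels) > 1}


# Hypothetical sample: a model called taxi_trips would collide with the
# renamed trips source.
resources = [
    {"resource_type": "source", "name": "trips"},
    {"resource_type": "model", "name": "taxi_trips"},
    {"resource_type": "model", "name": "stg_trips"},
]

for key, labels in find_duplicate_keys(resources).items():
    print(f"duplicate key {key!r} produced by {labels}")
```

Grouping by the computed key makes it clear which dbt resources collide, which is the information the suggested logging is meant to surface.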
At this point, your dbt.py file should match the following:
    from dagster import AssetExecutionContext, AssetKey
    from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

    from ..project import dbt_project


    class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
        def get_asset_key(self, dbt_resource_props):
            resource_type = dbt_resource_props["resource_type"]
            name = dbt_resource_props["name"]
            if resource_type == "source":
                return AssetKey(f"taxi_{name}")
            else:
                return super().get_asset_key(dbt_resource_props)


    @dbt_assets(
        manifest=dbt_project.manifest_path,
        dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    )
    def dbt_analytics(context: AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(["build"], context=context).stream()
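To see where dbt_resource_props comes from, it can help to look at the source entries in a manifest directly. The sketch below uses only the standard library and an inline, heavily trimmed manifest. The project name analytics in the node IDs is an assumption, a real manifest.json carries many more fields per entry, and the default key path shown is derived from the raw_taxis/trips example in this lesson rather than from the dagster-dbt source code.

```python
import json

# Hypothetical, trimmed manifest for illustration only.
SAMPLE_MANIFEST = """
{
  "nodes": {
    "model.analytics.stg_trips": {"resource_type": "model", "name": "stg_trips"}
  },
  "sources": {
    "source.analytics.raw_taxis.trips": {
      "resource_type": "source", "source_name": "raw_taxis", "name": "trips"
    },
    "source.analytics.raw_taxis.zones": {
      "resource_type": "source", "source_name": "raw_taxis", "name": "zones"
    }
  }
}
"""


def source_key_mapping(manifest):
    """Map each source's default-style key path to the taxi_-prefixed key."""
    mapping = {}
    for props in manifest["sources"].values():
        default_path = f'{props["source_name"]}/{props["name"]}'
        mapping[default_path] = f'taxi_{props["name"]}'
    return mapping


manifest = json.loads(SAMPLE_MANIFEST)
mapping = source_key_mapping(manifest)
for default_path, new_key in mapping.items():
    print(f"{default_path} -> {new_key}")
```

Running the same function against your project's own manifest (loaded with json.load) would list every source the translator renames, which is a quick way to confirm that the new keys match taxi_trips and taxi_zones.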