Creating an asset triggered by a sensor
Now that you’ve defined how the asset can be materialized, let’s create the ad hoc report asset.
Add the following imports to
requests.py
:from dagster import asset, Config from dagster_duckdb import DuckDBResource import plotly.express as px import plotly.io as pio from . import constants
Create a new asset named
adhoc_request
with the following arguments:config
, type annotated toAdhocRequestConfig
- the
taxi_zones
andtaxi_trips
assets as dependencies database
, type annotated toDuckDBResource
to be able to query DuckDB.
The asset should look like this at this point:
@asset( deps=["taxi_zones", "taxi_trips"] ) def adhoc_request(config: AdhocRequestConfig, database: DuckDBResource) -> None:
When the report is written to a file, it should have a similar name to the request. A template has been provided in
assets/constants.py
that contains a template namedREQUEST_DESTINATION_TEMPLATE_FILE_PATH
.Generate the file name by using the template and stripping the
.json
extension from the request’s file name:file_path = constants.REQUEST_DESTINATION_TEMPLATE_FILE_PATH.format(config.filename.split('.')[0])
Next, write a SQL query that:
Filters out trips that didn’t start in the named borough during the provided date range
Aggregates the data by the day of week and hour the trip started
For example:
query = f""" select date_part('hour', pickup_datetime) as hour_of_day, date_part('dayofweek', pickup_datetime) as day_of_week_num, case date_part('dayofweek', pickup_datetime) when 0 then 'Sunday' when 1 then 'Monday' when 2 then 'Tuesday' when 3 then 'Wednesday' when 4 then 'Thursday' when 5 then 'Friday' when 6 then 'Saturday' end as day_of_week, count(*) as num_trips from trips left join zones on trips.pickup_zone_id = zones.zone_id where pickup_datetime >= '{config.start_date}' and pickup_datetime < '{config.end_date}' and pickup_zone_id in ( select zone_id from zones where borough = '{config.borough}' ) group by 1, 2 order by 1, 2 asc """
Next, run the query in DuckDB and store the results as a DataFrame:
with database.get_connection() as conn: results = conn.execute(query).fetch_df()
Now that the query data is stored as a DataFrame in the
results
variable, use the Plotly library to visualize the frequency of trips at each time of the day for the borough, stratified by the day of the week:fig = px.bar( results, x="hour_of_day", y="num_trips", color="day_of_week", barmode="stack", title=f"Number of trips by hour of day in {config.borough}, from {config.start_date} to {config.end_date}", labels={ "hour_of_day": "Hour of Day", "day_of_week": "Day of Week", "num_trips": "Number of Trips" } )
Lastly, save the report to the destination file specified by the
file_path
variable you declared earlier:pio.write_image(fig, file_path)
Verify that requests.py
file looks like the code below:
from dagster import Config, asset
from dagster_duckdb import DuckDBResource
import plotly.express as px
import plotly.io as pio
from . import constants
class AdhocRequestConfig(Config):
filename: str
borough: str
start_date: str
end_date: str
@asset(
deps=["taxi_zones", "taxi_trips"]
)
def adhoc_request(config: AdhocRequestConfig, database: DuckDBResource) -> None:
"""
The response to an request made in the `requests` directory.
See `requests/README.md` for more information.
"""
# strip the file extension from the filename, and use it as the output filename
file_path = constants.REQUEST_DESTINATION_TEMPLATE_FILE_PATH.format(config.filename.split('.')[0])
# count the number of trips that picked up in a given borough, aggregated by time of day and hour of day
query = f"""
select
date_part('hour', pickup_datetime) as hour_of_day,
date_part('dayofweek', pickup_datetime) as day_of_week_num,
case date_part('dayofweek', pickup_datetime)
when 0 then 'Sunday'
when 1 then 'Monday'
when 2 then 'Tuesday'
when 3 then 'Wednesday'
when 4 then 'Thursday'
when 5 then 'Friday'
when 6 then 'Saturday'
end as day_of_week,
count(*) as num_trips
from trips
left join zones on trips.pickup_zone_id = zones.zone_id
where pickup_datetime >= '{config.start_date}'
and pickup_datetime < '{config.end_date}'
and pickup_zone_id in (
select zone_id
from zones
where borough = '{config.borough}'
)
group by 1, 2
order by 1, 2 asc
"""
with database.get_connection() as conn:
results = conn.execute(query).fetch_df()
fig = px.bar(
results,
x="hour_of_day",
y="num_trips",
color="day_of_week",
barmode="stack",
title=f"Number of trips by hour of day in {config.borough}, from {config.start_date} to {config.end_date}",
labels={
"hour_of_day": "Hour of Day",
"day_of_week": "Day of Week",
"num_trips": "Number of Trips"
}
)
pio.write_image(fig, file_path)