Run status sensors

If you want to act on the status of a run, Dagster provides a way to create a sensor that reacts to run statuses. You can use run_status_sensor with a specified DagsterRunStatus to decorate a function that will run when the given status occurs. This can be used to launch other runs, send alerts to a monitoring service on run failure, or report a run success.
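
For example, to send an alert whenever any run fails, you can react to DagsterRunStatus.FAILURE. A minimal sketch (the log call stands in for a real monitoring integration, and assumes the log property is available on the sensor context):

import dagster as dg


@dg.run_status_sensor(run_status=dg.DagsterRunStatus.FAILURE)
def my_run_failure_sensor(context: dg.RunStatusSensorContext):
    # Replace this log call with a call to your monitoring service
    context.log.info(f'Run of "{context.dagster_run.job_name}" failed.')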

Launching a job run if a run is successful

Here is an example of a run status sensor that launches a run of status_reporting_job if a run is successful:

src/<project_name>/defs/sensors.py
import dagster as dg

# status_reporting_job is assumed to be defined elsewhere in this project


@dg.run_status_sensor(
    run_status=dg.DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,
)
def report_status_sensor(context: dg.RunStatusSensorContext):
    # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
    if context.dagster_run.job_name != status_reporting_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return dg.RunRequest(run_key=None, run_config=run_config)
    else:
        return dg.SkipReason("Don't report status of status_reporting_job")

request_job is the job that will be run when the RunRequest is returned.
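
The run_config in the example assumes that status_reporting_job contains an op named status_report that accepts a job_name config field. A minimal sketch of what such a job could look like (the op body is illustrative):

import dagster as dg


class StatusReportConfig(dg.Config):
    job_name: str


@dg.op
def status_report(context: dg.OpExecutionContext, config: StatusReportConfig):
    # Illustrative: record or forward the upstream job's status
    context.log.info(f"Reporting status for {config.job_name}")


@dg.job
def status_reporting_job():
    status_report()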

Note that in report_status_sensor we conditionally return a RunRequest. This ensures that when report_status_sensor runs status_reporting_job it doesn't enter an infinite loop where the success of status_reporting_job triggers another run of status_reporting_job, which triggers another run, and so on.
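
An alternative to checking the job name inside the sensor body is to scope the sensor with the monitored_jobs argument so it only reacts to the jobs you list. A sketch, where other_job stands in for whatever job(s) you want to monitor:

@dg.run_status_sensor(
    run_status=dg.DagsterRunStatus.SUCCESS,
    monitored_jobs=[other_job],  # other_job is a placeholder for the job(s) to watch
    request_job=status_reporting_job,
)
def scoped_report_status_sensor(context: dg.RunStatusSensorContext):
    run_config = {
        "ops": {
            "status_report": {"config": {"job_name": context.dagster_run.job_name}}
        }
    }
    return dg.RunRequest(run_key=None, run_config=run_config)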

Reporting job success in a Slack message

Here is an example of a sensor that reports job success in a Slack message:

src/<project_name>/defs/sensors.py
import os

import dagster as dg
from slack_sdk import WebClient


@dg.run_status_sensor(run_status=dg.DagsterRunStatus.SUCCESS)
def my_slack_on_run_success(context: dg.RunStatusSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Job "{context.dagster_run.job_name}" succeeded.',
    )

When a run status sensor is triggered by a run but doesn't return anything, Dagster will report an event back to the run to indicate that the sensor ran.

Coordinating multiple independent jobs

Run status sensors can be used to coordinate the execution of multiple independent jobs. This approach is useful when you need to trigger a downstream job only after several upstream jobs have completed successfully, particularly when those upstream jobs run on different schedules or are triggered independently.

To coordinate multiple jobs, use a run status sensor that monitors completion status and tracks processed runs using a cursor. This ensures the downstream job triggers exactly once per batch of upstream completions.

src/<project_name>/defs/assets.py
import dagster as dg


@dg.asset
def first_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("First asset")


@dg.asset
def second_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("Second asset")


@dg.asset
def third_asset(context: dg.AssetExecutionContext) -> None:
    context.log.info("Third asset")


# Define the upstream jobs
upstream_job_1 = dg.define_asset_job(name="upstream_job_1", selection="first_asset")

upstream_job_2 = dg.define_asset_job(name="upstream_job_2", selection="second_asset")

# Define the downstream job
downstream_job = dg.define_asset_job(name="downstream_job", selection="third_asset")
src/<project_name>/defs/sensors.py
import json
from datetime import datetime, timedelta

import dagster as dg

# Define the job names for the upstream jobs
UPSTREAM_JOB_A = "upstream_job_1"
UPSTREAM_JOB_B = "upstream_job_2"

# Define the downstream job
DOWNSTREAM_JOB = "downstream_job"


@dg.sensor(job_name=DOWNSTREAM_JOB)
def compare_completion_times_sensor(context: dg.SensorEvaluationContext):
    instance = context.instance
    now = datetime.now()
    one_day_ago = now - timedelta(days=1)

    # Find the most recent successful run of each upstream job
    filter_a = dg.RunsFilter(
        job_name=UPSTREAM_JOB_A, statuses=[dg.DagsterRunStatus.SUCCESS]
    )
    filter_b = dg.RunsFilter(
        job_name=UPSTREAM_JOB_B, statuses=[dg.DagsterRunStatus.SUCCESS]
    )

    run_records_a = instance.get_run_records(
        filters=filter_a, limit=1, order_by="update_timestamp", ascending=False
    )
    run_records_b = instance.get_run_records(
        filters=filter_b, limit=1, order_by="update_timestamp", ascending=False
    )
    if not run_records_a or not run_records_b:
        return dg.SkipReason(
            "One or both upstream jobs have not completed successfully."
        )

    completion_time_a = run_records_a[0].end_time
    completion_time_b = run_records_b[0].end_time

    # Load the completion times recorded the last time this sensor triggered
    previous_cursor = json.loads(context.cursor) if context.cursor else {}
    previous_completion_time_a = previous_cursor.get("completion_time_a")
    previous_completion_time_b = previous_cursor.get("completion_time_b")

    if isinstance(previous_completion_time_a, float):
        previous_completion_time_a = datetime.fromtimestamp(previous_completion_time_a)
    if isinstance(previous_completion_time_b, float):
        previous_completion_time_b = datetime.fromtimestamp(previous_completion_time_b)

    # end_time is a Unix timestamp; convert to datetime for comparison
    completion_time_a = (
        datetime.fromtimestamp(completion_time_a)
        if completion_time_a is not None
        else None
    )
    completion_time_b = (
        datetime.fromtimestamp(completion_time_b)
        if completion_time_b is not None
        else None
    )

    # Trigger only when both jobs finished within the past day and both have
    # a new completion since the cursor was last updated
    if (
        completion_time_a
        and completion_time_b
        and completion_time_a > one_day_ago
        and completion_time_b > one_day_ago
        and (
            not previous_completion_time_a
            or completion_time_a > previous_completion_time_a
        )
        and (
            not previous_completion_time_b
            or completion_time_b > previous_completion_time_b
        )
    ):
        new_cursor = json.dumps(
            {
                "completion_time_a": completion_time_a.timestamp(),
                "completion_time_b": completion_time_b.timestamp(),
            }
        )
        context.update_cursor(new_cursor)

        return dg.RunRequest(run_key=None, run_config={})
    else:
        return dg.SkipReason(
            "One or both upstream jobs did not complete within the past day."
        )

This sensor monitors two upstream jobs (upstream_job_1 and upstream_job_2) and triggers a downstream job (downstream_job) only after both have completed successfully.
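
To iterate on a cursor-based sensor like this locally, you can evaluate it directly against a test instance. A minimal sketch using Dagster's testing utilities (assuming compare_completion_times_sensor is importable from your sensors module):

import dagster as dg

with dg.instance_for_test() as instance:
    context = dg.build_sensor_context(instance=instance, cursor=None)
    # With no successful upstream runs recorded, the sensor returns a SkipReason
    result = compare_completion_times_sensor(context)
    print(result)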