To send messages to MS Teams channel, an incoming webhook has to be created. The incoming webhook url must be given as a part of the resource config to the msteams_resource in dagster.
Default Value: 60
This resource is for connecting to Microsoft Teams.
The resource object is a dagster_msteams.TeamsClient.
By configuring this resource, you can post messages to MS Teams from any Dagster solid:
Examples:
import os
from dagster import ModeDefinition, execute_pipeline
from dagster._legacy import pipeline, solid
from dagster_msteams import Card, msteams_resource
@solid(required_resource_keys={"msteams"})
def teams_solid(context):
    card = Card()
    card.add_attachment(text_message="Hello There !!")
    context.resources.msteams.post_message(payload=card.payload)
@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"msteams": msteams_resource})],
)
def teams_pipeline():
    teams_solid()
execute_pipeline(
    teams_pipeline,
    {"resources": {"msteams": {"config": {"hook_url": os.getenv("TEAMS_WEBHOOK_URL")}}}},
)
Create a hook on step failure events that will message the given MS Teams webhook URL.
message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext outputs the message you want to send.
dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.
Examples
@teams_on_failure(dagit_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} failed!"
@op
def a_op(context):
    pass
@job(...)
def my_job():
    a_op.with_hooks(hook_defs={teams_on_failure("#foo", my_message_fn)})
Create a hook on step success events that will message the given MS Teams webhook URL.
message_fn (Optional(Callable[[HookContext], str])) – Function which takes in the HookContext outputs the message you want to send.
dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the specific run that triggered the hook.
Examples
@teams_on_success(dagit_base_url="http://localhost:3000")
@job(...)
def my_job():
    pass
def my_message_fn(context: HookContext) -> str:
    return f"Op {context.op.name} failed!"
@op
def a_op(context):
    pass
@job(...)
def my_job():
    a_op.with_hooks(hook_defs={teams_on_success("#foo", my_message_fn)})
Create a sensor on run failures that will message the given MS Teams webhook URL.
hook_url (str) – MS Teams incoming webhook URL.
message_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function which
takes in the RunFailureSensorContext and outputs the message you want to send.
Defaults to a text message that contains error message, job name, and run ID.
http_proxy – (Optional[str]): Proxy for requests using http protocol.
https_proxy – (Optional[str]): Proxy for requests using https protocol.
timeout – (Optional[float]): Connection timeout in seconds. Defaults to 60.
verify – (Optional[bool]): Whether to verify the servers TLS certificate.
name – (Optional[str]): The name of the sensor. Defaults to “teams_on_run_failure”.
dagit_base_url – (Optional[str]): The base url of your Dagit instance. Specify this to allow messages to include deeplinks to the failed run.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Examples
teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL")
)
@repository
def my_repo():
    return [my_job + teams_on_run_failure]
def my_message_fn(context: RunFailureSensorContext) -> str:
    return "Job {job_name} failed! Error: {error}".format(
        job_name=context.dagster_run.job_name,
        error=context.failure_event.message,
    )
teams_on_run_failure = make_teams_on_run_failure_sensor(
    hook_url=os.getenv("TEAMS_WEBHOOK_URL"),
    message_fn=my_message_fn,
    dagit_base_url="http://localhost:3000",
)