凌晨两点,你的数据导出任务显示全部成功。数据库里却空空如也。这种"静默失败"比报错更致命——Airflow的内置告警根本不会触发。
本文基于官方技术文档,拆解两种外部监控方案:死亡开关(死信监控)+ 输出断言。核心目标只有一个:让"任务完成但零产出"这类故障能被及时发现。
为什么Airflow内置告警不够用
Airflow提供五种回调钩子:
• on_failure_callback — 任务或DAG运行失败
• on_success_callback — 任务或DAG运行成功
• on_retry_callback — 任务进入重试队列
• on_execute_callback — 任务即将开始执行
• on_skipped_callback — 任务触发跳过异常
这些钩子覆盖执行状态,但不覆盖业务结果。一个典型场景:导出任务连接了陈旧的数据库副本,超时后记录日志,干净退出。Airflow标记为绿色,on_success_callback正常触发,但数据从未落地。
你需要的是独立检查——一个Airflow外部的监控器,每次调度触发时追问两个问题:DAG完成了吗?产出了非零结果吗?
方案核心:死亡开关 + 输出断言
死亡开关(Dead Man's Switch)的工作逻辑:
• 设置预期上报间隔,例如"此DAG每24小时需上报一次"
• DAG完成时向监控器发送心跳
• 监控器在窗口期内未收到心跳即触发告警
这能捕获漏跑、暂停的DAG、调度器故障和执行漂移。但更强的是输出断言:上报时携带计数,监控器在计数为零时告警——即使任务成功完成并正常上报。
本文示例使用DeadManCheck,目前唯一支持输出断言的定时任务监控工具,免费版支持5个监控项。
正方:DAG级回调是最干净的方案
若需监控整个DAG运行而非单个任务,在DAG级别配置on_success_callback和on_failure_callback。
代码结构如下:
def ping_start(context):
"""标记DAG启动——启用耗时监控"""
try:
requests.get(f"{BASE_URL}/start", timeout=5)
except Exception:
pass # 绝不让监控逻辑中断主任务
def ping_success(context):
"""标记成功。从XCom拉取行数用于输出断言"""
try:
rows = context["ti"].xcom_pull(task_ids="export_data", key="rows_exported")
优势很明显:配置集中,一次设置覆盖整个DAG;逻辑与业务代码解耦;失败时通过on_failure_callback上报,成功时通过on_success_callback带上产出数据。
关键设计原则:监控代码必须包裹在try-except中,超时设为5秒。监控失效不能导致主任务失败——这是监控系统的基本底线。
反方:DAG级回调的盲区与替代方案
DAG级回调并非万能。它的盲区在于:
• 无法区分"部分任务成功"与"全部任务成功"
• 若DAG包含多个并行导出分支,单一回调难以聚合多源产出
• context["ti"]在DAG级别访问的是最后执行的任务实例,可能不是你想监控的那个
替代方案是任务级监控:在关键任务(如export_data)上使用PythonOperator的on_success_callback,直接读取任务返回值。
任务级方案的优势:精准定位产出环节;支持多分支独立监控;回调逻辑与具体任务绑定,上下文更清晰。
代价是配置分散,每个关键任务都需单独设置。对于复杂DAG,维护成本显著上升。
我的判断:按数据关键性分层部署
两种方案不是互斥关系,而是互补的监控层级。
核心数据管道(如财务结算、用户行为导出)采用任务级监控:在数据产出任务上硬编码输出断言,确保"零行即告警"。这是最后一道防线,不能依赖DAG级别的聚合逻辑。
辅助性管道(如日志清理、临时报表)使用DAG级监控:快速部署,覆盖"是否按时运行"的基础需求。成本最低,维护最简单。
关键认知转变:Airflow的绿色状态只承诺"代码执行完毕",不承诺"业务目标达成"。外部监控的本质,是在调度系统之外建立独立的事实核查机制。
死亡开关解决"有没有跑",输出断言解决"有没有产出"。两者结合,才能堵住静默失败的漏洞。
下一步行动:审计你当前Airflow DAG的监控覆盖。找出那些"成功但可能零产出"的关键任务,为它们添加输出断言。免费的5个监控额度足够验证方案可行性,之后再评估是否扩容。
热门跟贴