type
status
date
slug
summary
tags
category
icon
password
问题描述

The Airflow scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess, which monitors and stays in sync with all DAGs in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered.
Airflow为分布式架构,其中Scheduler负责DAG文件的解析和任务的调度。只有当DAG文件被Scheduler的DagFileProcessor解析为DAG对象并序列化存入数据库后,我们才可以通过WebUI和CLI方式来运行该DAG。
然后DagFileProcessor并没有监听
DAGS_FOLDER
文件夹内文件的变化,而是以 DAG_DIR_LIST_INTERVAL
(默认为300秒)设定的间隔,无限循环扫描文件夹内的文件并解析。因此,当我们新建DAG文件放入指定的文件夹时,除非正好遇上了下一个循环周期的开始,否则DAG一般是无法立马被运行的。当使用CLI命令
airflow dags list
来查看所有DAG时,已经放入DAG文件夹但是还未被解析的DAG会如以下的 example_dag_3
所示, paused
属性显示为 None
。如果我们有动态创建好DAG文件,需要立即去执行的需求时,Airflow自带的这套逻辑就不太好使了。尽管我们可以通过调小
DAG_DIR_LIST_INTERVAL
的方式来让我们的新DAG被更快的扫描到,但是这会导致CPU使用率升高,尤其是在DAG文件非常多的情况下。既然Airflow不提供立马扫描新文件的功能,那我们自己实现一个好了!首先我们需要了解DagFileProcessor工作的逻辑。在启动Scheduler的时候,默认情况下会启动一个子进程DagFileProcessorManager,他的工作流程如下图所示:

并且由于Airflow是分布式架构,可以同时启动多个Scheduler,那么启动多个DagFileProcessorManager自然也不是问题。所以我们不需要对Airflow本身做任何改动,可以保留它默认300秒扫描一次新文件的设定。因为除了扫描新文件,它还会处理已经被删除的DAG。
只需要手动执行Process files的流程,即在需要更新或新增DAG的时候,手动对指定文件进行解析。
DAG文件解析流程
下面我们就从CLI命令开始,梳理一个DAG文件被解析的流程。
DagProcessorCommand
使用
airflow -h
指令可以发现,有指令为 airflow dag-processor
可以启动一个独立的Dag Processor实例,对应的操作在文件 /airflow/cli/commands/dag_processor_command.py
中的 dag_processor
函数。执行的操作是构建一个
DagProcessorJobRunner
,构建时传入两个参数,一个是 Job
,一个是 DagFileProcessorManager
。然后调用
airflow.jobs.job
中的 run_job
函数,进入函数内部可以发现执行了以下操作:- 调用
Job
的prepare_for_execution
方法
- 使用
execute_job
函数(根据执行结果设置Job
的state
)执行传入run_job
的execute_callable
函数,对应就是DagProcessorJobRunner
的_execute
方法,该方法内部调用DagFileProcessorManager
的start
方法。
- 调用
Job
的complete_execution
方法。
Job
可以直接无视,它是一个ORM对象,用来将执行结果存入数据库,对我们来说没有必要。因此,直接进入下一步 DagFileProcessorManager
。DagFileProcessorManager
直接看
start
方法,内部执行的操作如下:- 调用
register_exit_signals
方法,注册了一堆退出的信号
- 调用
airflow.utils.process_utils
的set_new_process_group
函数,将当前进程抽出来,建立一个新的进程组
- 调用
_run_parsing_loop
方法
前两步也不用管,看
_run_parsing_loop
方法,内部执行的操作如下:- 调用
_refresh_dag_dir
方法,获取DAG文件夹内文件的路径
- 调用
prepare_file_path_queue
方法,构建DAG文件处理队列
- 根据一堆信号和条件确定工作状态
- 调用
_scan_stale_dags
处理已经被删除的DAG,并再次获取DAG文件夹内文件的路径,构建DAG文件处理队列
- 调用
start_new_processes
方法,内部根据PARSING_PROCESS
配置项判断能否创建更多的处理进程,同时启动处理进程
- 最后收集处理结果,并判断运行条件,返回步骤3进入下一个循环
其他也都不用管,看
start_new_processes
方法,内部执行操作如下:- 从DAG文件处理队列里弹一个DAG文件路径出来
- 调用
_create_process
方法创建处理进程
- 调用处理进程的
start
方法启动进程
进入
_create_process
方法,里面就是构建了一个 DagFileProcessorProcess
。DagFileProcessorProcess
直接看
start
方法,内部执行的操作如下:- 加载前置库
- 创建子进程,执行
_run_file_processor
方法,并启动进程
看
_run_file_processor
方法,内部执行的操作如下:- 调用
airflow.settings
的configure_orm
方法
- 构建
DagFileProcessor
- 调用Processor的
process_file
方法
那就进入
DagFileProcessor
接着看DagFileProcessor
直接看
process_file
方法,内部执行的操作如下:- 调用
DagFileProcessor
的类方法_get_dagbag
构建DagBag
对象
- 将
DagBag
中的DAG存入数据库,并处理导入错误
其实到这一步已经足够了,但是我们还可以简单看一下
DagBag
的实现。DagBag
看初始化方法
__init__
,参数 collect_dags
默认为 True
,所以在初始化时就会调用 collect_dags
方法,进入该方法。collect_dags
内部遍历 dag_folder
参数对应的文件,调用 process_file
方法解析文件,并将解析结果包装为 FileLoadStat
存入 dagbag_stats
。看
process_file
方法,内部调用 _load_modules_from_file
方法或 _load_modules_from_zip
方法来导入Python Modules,然后调用 _process_modules
方法来处理这些Modules。看
_process_modules
方法,内部操作通过Modules的 __dict__
参数获取顶层变量,并筛选类型为 DAG
的对象,遍历 DAG
对象,对其执行以下操作:- 调用
validate
方法校验DAG
- 调用
bag_dag
方法打包DAG
看
bag_dag
方法,内部操作如下:- 调用
check_cycle
函数检查DAG是否存在环
- 调用
resolve_template_files
方法处理模板
- 写入
last_loaded
时间
- 检查DAG和其Task是否符合策略
- 处理子DAG
- 将所有DAG存入
dags
参数
至此
DagBag
的逻辑结束。解决方案
了解了
DagFileProcessorManager
解析DAG文件的流程,我们就可以自己实现DAG文件的解析操作。主要思想就是实现 DagFileProcessorProcess
类中 _run_file_processor
方法的 _handle_dag_file_processing
函数,注意模仿 DagFileProcessorProcess
使用多进程去调用它,防止DAG文件解析时间过长造成程序卡死。- 作者:TerraceCN
- 链接:https://blog.terrace.ink/article/77d23126-f6e1-4e48-9c75-41630dc139a6
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。