type
status
date
slug
summary
tags
category
icon
password

问题描述

Airflow 架构
Airflow 架构
The Airflow scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess, which monitors and stays in sync with all DAGs in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered.
Airflow为分布式架构,其中Scheduler负责DAG文件的解析和任务的调度。只有当DAG文件被Scheduler的DagFileProcessor解析为DAG对象并序列化存入数据库后,我们才可以通过WebUI和CLI方式来运行该DAG。
然后DagFileProcessor并没有监听 DAGS_FOLDER 文件夹内文件的变化,而是以 DAG_DIR_LIST_INTERVAL (默认为300秒)设定的间隔,无限循环扫描文件夹内的文件并解析。
因此,当我们新建DAG文件放入指定的文件夹时,除非正好遇上了下一个循环周期的开始,否则DAG一般是无法立马被运行的。当使用CLI命令 airflow dags list 来查看所有DAG时,已经放入DAG文件夹但是还未被解析的DAG会如以下的 example_dag_3 所示, paused 属性显示为 None
如果我们有动态创建好DAG文件,需要立即去执行的需求时,Airflow自带的这套逻辑就不太好使了。尽管我们可以通过调小 DAG_DIR_LIST_INTERVAL 的方式来让我们的新DAG被更快的扫描到,但是这会导致CPU使用率升高,尤其是在DAG文件非常多的情况下。
既然Airflow不提供立马扫描新文件的功能,那我们自己实现一个好了!首先我们需要了解DagFileProcessor工作的逻辑。在启动Scheduler的时候,默认情况下会启动一个子进程DagFileProcessorManager,他的工作流程如下图所示:
DagFileProcessorManager 工作流程
DagFileProcessorManager 工作流程
并且由于Airflow是分布式架构,可以同时启动多个Scheduler,那么启动多个DagFileProcessorManager自然也不是问题。所以我们不需要对Airflow本身做任何改动,可以保留它默认300秒扫描一次新文件的设定。因为除了扫描新文件,它还会处理已经被删除的DAG。
只需要手动执行Process files的流程,即在需要更新或新增DAG的时候,手动对指定文件进行解析。

DAG文件解析流程

下面我们就从CLI命令开始,梳理一个DAG文件被解析的流程。

DagProcessorCommand

使用 airflow -h 指令可以发现,有指令为 airflow dag-processor 可以启动一个独立的Dag Processor实例,对应的操作在文件 /airflow/cli/commands/dag_processor_command.py 中的 dag_processor 函数。
执行的操作是构建一个 DagProcessorJobRunner ,构建时传入两个参数,一个是 Job ,一个是 DagFileProcessorManager
然后调用 airflow.jobs.job 中的 run_job 函数,进入函数内部可以发现执行了以下操作:
  1. 调用 Jobprepare_for_execution 方法
  1. 使用 execute_job 函数(根据执行结果设置 Jobstate )执行传入 run_jobexecute_callable 函数,对应就是 DagProcessorJobRunner_execute 方法,该方法内部调用 DagFileProcessorManagerstart 方法。
  1. 调用 Jobcomplete_execution 方法。
Job 可以直接无视,它是一个ORM对象,用来将执行结果存入数据库,对我们来说没有必要。因此,直接进入下一步 DagFileProcessorManager

DagFileProcessorManager

直接看 start 方法,内部执行的操作如下:
  1. 调用 register_exit_signals 方法,注册了一堆退出的信号
  1. 调用 airflow.utils.process_utilsset_new_process_group 函数,将当前进程抽出来,建立一个新的进程组
  1. 调用 _run_parsing_loop 方法
前两步也不用管,看 _run_parsing_loop 方法,内部执行的操作如下:
  1. 调用 _refresh_dag_dir 方法,获取DAG文件夹内文件的路径
  1. 调用 prepare_file_path_queue 方法,构建DAG文件处理队列
  1. 根据一堆信号和条件确定工作状态
  1. 调用 _scan_stale_dags 处理已经被删除的DAG,并再次获取DAG文件夹内文件的路径,构建DAG文件处理队列
  1. 调用 start_new_processes 方法,内部根据 PARSING_PROCESS 配置项判断能否创建更多的处理进程,同时启动处理进程
  1. 最后收集处理结果,并判断运行条件,返回步骤3进入下一个循环
其他也都不用管,看 start_new_processes 方法,内部执行操作如下:
  1. 从DAG文件处理队列里弹一个DAG文件路径出来
  1. 调用 _create_process 方法创建处理进程
  1. 调用处理进程的 start 方法启动进程
进入 _create_process 方法,里面就是构建了一个 DagFileProcessorProcess

DagFileProcessorProcess

直接看 start 方法,内部执行的操作如下:
  1. 加载前置库
  1. 创建子进程,执行 _run_file_processor 方法,并启动进程
_run_file_processor 方法,内部执行的操作如下:
  1. 调用 airflow.settingsconfigure_orm 方法
  1. 构建 DagFileProcessor
  1. 调用Processor的 process_file 方法
那就进入 DagFileProcessor 接着看

DagFileProcessor

直接看 process_file 方法,内部执行的操作如下:
  1. 调用 DagFileProcessor 的类方法 _get_dagbag 构建 DagBag 对象
  1. DagBag 中的DAG存入数据库,并处理导入错误
其实到这一步已经足够了,但是我们还可以简单看一下 DagBag 的实现。

DagBag

看初始化方法 __init__ ,参数 collect_dags 默认为 True ,所以在初始化时就会调用 collect_dags 方法,进入该方法。
collect_dags 内部遍历 dag_folder 参数对应的文件,调用 process_file 方法解析文件,并将解析结果包装为 FileLoadStat 存入 dagbag_stats
process_file 方法,内部调用 _load_modules_from_file 方法或 _load_modules_from_zip 方法来导入Python Modules,然后调用 _process_modules 方法来处理这些Modules。
_process_modules 方法,内部操作通过Modules的 __dict__ 参数获取顶层变量,并筛选类型为 DAG 的对象,遍历 DAG 对象,对其执行以下操作:
  1. 调用 validate 方法校验DAG
  1. 调用 bag_dag 方法打包DAG
bag_dag 方法,内部操作如下:
  1. 调用 check_cycle 函数检查DAG是否存在环
  1. 调用 resolve_template_files 方法处理模板
  1. 写入 last_loaded 时间
  1. 检查DAG和其Task是否符合策略
  1. 处理子DAG
  1. 将所有DAG存入 dags 参数
至此 DagBag 的逻辑结束。

解决方案

了解了 DagFileProcessorManager 解析DAG文件的流程,我们就可以自己实现DAG文件的解析操作。主要思想就是实现 DagFileProcessorProcess 类中 _run_file_processor 方法的 _handle_dag_file_processing 函数,注意模仿 DagFileProcessorProcess 使用多进程去调用它,防止DAG文件解析时间过长造成程序卡死。
 
未来降雨概率API工业大数据分析算法(三):振动分析类