1 airflow 核心概念
- DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
- Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
- Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
- Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
- Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。
- 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。
2 Airflow 的服务构成
一个正常运行的 Airflow 通常 由以下几个服务构成
- WebServer
Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
- Worker
一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
- Scheduler
整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
- Flower
Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。
3 基本操作命令
0、将py文件上传到Airflow集群
1、运行自己写的dags任务:
python ~/airflow/dags/tutorial.py
2、查看任务是否运行
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
3、测试dags是不是可以正常传参
注意:日期需要传递当前时间之前的,当前时间之后的还没有运行。
# testing print_date
airflow test tutorial print_date 2015-06-01
# testing sleep tag_id task_id 2015-06-01
airflow test airflow test tutorial sleep 2015-06-01
4 示例
# -*- coding: utf-8 -*- from __future__ import print_function from builtins import range import airflow from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime, timedelta import os import sys sys.path.append( ' ' ) args = { 'owner': 'airflow', 'start_date': datetime(2018, 9, 2) } dag = DAG( #schedule_interval='0 0 * * *' 配置 DAG 的执行周期,语法和 crontab 的一致。 minute hour day month week dag_id='example_python_operator_zz', default_args=args, schedule_interval=None) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) def my_sleeping_function(): os.popen( " spark2-submit" " --master yarn-client " "'--deploy-mode' 'client'" " '--class' 'dd.test_zan'" " --num-executors 20 " "--executor-cores 1 " "--executor-memory 4G " "--driver-memory 2G " "hdfs://nameservice1/tmp/zz/first-1.0-SNAPSHOT.jar") run_this = PythonOperator( task_id='print_the_context', provide_context=False, python_callable=my_sleeping_function, dag=dag) run_this.set_upstream(t2)