玖叶教程网

前端编程开发入门

Airflow调度工具笔记(一)(airflow调度时间)

1 airflow 核心概念

  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
  • Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
  • Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
  • Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括"running", "success", "failed", "skipped", "up for retry"等。
  • Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。
  • 通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。

2 Airflow 的服务构成

一个正常运行的 Airflow 通常 由以下几个服务构成

  • WebServer

Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。

  • Worker

一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。

  • Scheduler

整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。

  • Flower

Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。

3 基本操作命令

0、将py文件上传到Airflow集群

1、运行自己写的dags任务:

python ~/airflow/dags/tutorial.py

2、查看任务是否运行

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks tutorial --tree

3、测试dags是不是可以正常传参

注意:日期需要传递当前时间之前的,当前时间之后的还没有运行。

# testing print_date

airflow test tutorial print_date 2015-06-01

# testing sleep tag_id task_id 2015-06-01

airflow test airflow test tutorial sleep 2015-06-01

4 示例

# -*- coding: utf-8 -*-
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import os
import sys
sys.path.append( ' ' )
args = {
'owner': 'airflow',
'start_date': datetime(2018, 9, 2)
}
dag = DAG(
#schedule_interval='0 0 * * *' 配置 DAG 的执行周期,语法和 crontab 的一致。 minute hour day month week
dag_id='example_python_operator_zz', default_args=args, schedule_interval=None)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
def my_sleeping_function():
os.popen(
" spark2-submit"
" --master yarn-client "
"'--deploy-mode' 'client'"
" '--class' 'dd.test_zan'"
" --num-executors 20 "
"--executor-cores 1 "
"--executor-memory 4G "
"--driver-memory 2G "
"hdfs://nameservice1/tmp/zz/first-1.0-SNAPSHOT.jar")
run_this = PythonOperator(
task_id='print_the_context',
provide_context=False,
python_callable=my_sleeping_function,
dag=dag)
run_this.set_upstream(t2)

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言