加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

airflow FAQ

发布时间:2020-12-14 02:09:29 所属栏目:百科 来源:网络整理
导读:1、关于airflow设置环境变量 在BashOperator的env(dict类型)参数中添加环境变量,当然也可以在声明DAG时的default_args中添加env的声明, 但需要注意,如果设置了 env , airflow 就不再访问系统的环境变量,所以这里设置的 env 一定要包含程序运行所需的 所

1、关于airflow设置环境变量

在BashOperator的env(dict类型)参数中添加环境变量,当然也可以在声明DAG时的default_args中添加env的声明,但需要注意,如果设置了envairflow就不再访问系统的环境变量,所以这里设置的env一定要包含程序运行所需的所有环境变量,否则会出错

import os
local_env = os.environ
local_env['PATH'] = os.environ['PATH'] + ":" + Variable.get('PATH')
local_env['JAVA_HOME'] = Variable.get('JAVA_HOME')
 
在dag的default_args中添加'env':dict(local_env)

2、mark_success

当task执行完成,但只是返回值为失败的时候,可以不rerun该task,而是marksuccess,然后继续执行下面的task

UI中的操作暂时未成功,点击总是提示“No task instances to markas successful ”,目前可以通过强制rerun下一个task,使下一个task成功,虽然失败的task的状态未改变,但已经不影响下面的task执行

强制rerun的命令为airflowrun -f -i dag_id task_id execute_time

3、rerun

在UI界面中clear某个task,则airflow会自动rerun该task

当使用run按钮时,要求使用CeleryExecutor才可执行,因需要使用redis数据库,故暂时未尝试

4、hold某个dag或task

暂时只支持hold(pause)某个dag,可以使用命令airflow pause/unpause dagid来pause或unpause某个dag,也可在页面中点击dag前面的On/Off按钮来unpause/pause某个dag,若当前dag有某个task已经启动,则无法停止,只能保证下一个task不执行

暂时无法hold或pause某个task,只支持以dag为单位pause

5、当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格

t1 = BashOperator(
       task_id='process_rankinglist',bash_command='/home/rankinglist_processor/run.sh ',dag=dag)

6、airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量

templated_command = """
echo "dag_run:{{ dag_run }}"
echo "run_id:{{ run_id }}"
echo "execution_date:{{ execution_date }}"
echo "ts:{{ ts }}"
echo "ti:{{ ti }}"
sleep 3
"""
 
t1 = BashOperator(
       task_id='xcom',bash_command=templated_command,dag=dag)


但是需要注意,其中的execution_date并不是task的真正执行时间,而是上一周期task的执行时间。

即可以理解为“actual_execution_date= execution_date +schedual_interval”,或者我们换一种说法,我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am,所以获取某个task的真正执行时间,需要获取execution_date的下个执行周期时间,即使用dag.following_schedule(execution_date)

7、使用Xcom在task之间传参

可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收

dag = DAG(
   dag_id='xcomtest',default_args=default_args,schedule_interval='*/2 * ** *')
 
t1 = BashOperator(
   task_id='xcom',bash_command='''{{ ti.xcom_push(key='aaa',value='bbb') }}''',dag=dag)
 
t2 = BashOperator(
   task_id='xcom2',bash_command='''echo"{{ ti.xcom_pull(key='aaa',task_ids='xcom') }}" ''',dag=dag)
t2.set_upstream(t1)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读