Apache Airflow:基于Python,支持的任务Python、Bash、HTTP、Mysql等,支持Operator的自定义扩展。通过Python格式的DAG任务定义文件可灵活配置任务。如果你是Python用户可选择它。

维护Airflow日志:Airflow的日志会占用很大的空间,需要定期去清理,下面地址中有各种维护脚本: https://github.com/teamclairvoyant/airflow-maintenance-dags

使用Docker来启动Airflow

啥也不干的例子:

"""Used for unit tests"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) as dag:

    task = BashOperator(
        task_id='sleeps_forever',
        bash_command="sleep 10000000000",
        start_date=days_ago(2),
        owner='airflow',
    )

Airflow timezone,Airflow中默认总是使用UTC时间进行调度,可以使用指定时区进行调度;

https://airflow.apache.org/docs/apache-airflow/stable/timezone.html

通过修改Timezone,我们可以解决调度安装本地时间

import pendulum

dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam"))
op = EmptyOperator(task_id="empty", dag=dag)
print(dag.timezone)  # <Timezone [Europe/Amsterdam]>

但是调度时的时间参数还是UTC时间,需要通过时间宏解决。

task_enable = SimpleHttpOperator(
    task_id='in_hospital_realtime',
    http_conn_id="kettle_api_host",
    method='GET',
    endpoint='/kettle/executeJob/',
    data={
        "rep": "r1",
        "job": "in-hospital",
        "level": "Debug",
        "statistic-date": '{{ (execution_date + macros.timedelta(hours=8)).strftime("%Y-%m-%d") }}'
    },
    headers={
        "Content-Type": "application/json",
        "Authorization": "Basic Y2x1c3RlcjpjbHVzdGVy"
    },
    dag=dag
)

[ 编辑 | 历史 ]
最近由“jilili”在“2022-05-31 00:31:46”修改