Apache Airflow: Python-based. It supports Python, Bash, HTTP, MySQL and other task types, and allows custom Operator extensions. Tasks are configured flexibly through DAG definition files written in Python. A good choice if you are a Python user.
Maintaining Airflow logs: Airflow's logs take up a lot of disk space and need to be cleaned up periodically. The repository below contains a variety of maintenance DAGs: https://github.com/teamclairvoyant/airflow-maintenance-dags
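The core cleanup idea (delete log directories older than a retention window) can be sketched in plain Python. This is a minimal sketch, not code from the repository above; the folder path and 30-day retention are assumptions you should match to the base_log_folder setting in your airflow.cfg:

```python
import os
import shutil
import time

def clean_airflow_logs(base_log_folder, max_age_days=30):
    """Delete log subdirectories older than max_age_days.

    base_log_folder and max_age_days are illustrative assumptions;
    align them with the [logging] section of your airflow.cfg.
    """
    cutoff = time.time() - max_age_days * 86400
    removed = []
    for entry in os.scandir(base_log_folder):
        # Each DAG gets its own log subdirectory; drop the stale ones.
        if entry.is_dir() and entry.stat().st_mtime < cutoff:
            shutil.rmtree(entry.path)
            removed.append(entry.name)
    return removed
```

For production use, the maintenance DAGs in the linked repository are more complete (they also clean up database metadata, not just files on disk).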
A do-nothing example:
"""Used for unit tests""" from airflow import DAG from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id='test_utils', schedule_interval=None, tags=['example']) as dag: task = BashOperator( task_id='sleeps_forever', bash_command="sleep 10000000000", start_date=days_ago(2), owner='airflow', )
Airflow timezone: Airflow always schedules in UTC by default, but a specific timezone can be configured for scheduling;
https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
By setting the timezone, we can make scheduling follow local time:
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# The DAG's timezone is taken from the tz-aware start_date
dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam"))
op = EmptyOperator(task_id="empty", dag=dag)
print(dag.timezone)  # <Timezone [Europe/Amsterdam]>
However, the template time parameters (such as execution_date) are still rendered in UTC, so they need to be adjusted with a time macro.
from airflow.providers.http.operators.http import SimpleHttpOperator

task_enable = SimpleHttpOperator(
    task_id='in_hospital_realtime',
    http_conn_id="kettle_api_host",
    method='GET',
    endpoint='/kettle/executeJob/',
    data={
        "rep": "r1",
        "job": "in-hospital",
        "level": "Debug",
        # execution_date is in UTC; shift it by 8 hours to get the local (UTC+8) date
        "statistic-date": '{{ (execution_date + macros.timedelta(hours=8)).strftime("%Y-%m-%d") }}'
    },
    headers={
        "Content-Type": "application/json",
        "Authorization": "Basic Y2x1c3RlcjpjbHVzdGVy"
    },
    dag=dag
)
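The date arithmetic done by the statistic-date macro above can be checked in plain Python: adding 8 hours to a UTC timestamp yields the UTC+8 (Beijing time) date, which can differ from the UTC date near midnight. The sample timestamp below is only for illustration:

```python
from datetime import datetime, timedelta, timezone

# Airflow renders execution_date in UTC; adding timedelta(hours=8)
# converts it to UTC+8 before formatting, mirroring the Jinja macro
# {{ (execution_date + macros.timedelta(hours=8)).strftime("%Y-%m-%d") }}
execution_date = datetime(2023, 3, 31, 18, 30, tzinfo=timezone.utc)
local = execution_date + timedelta(hours=8)
print(local.strftime("%Y-%m-%d"))  # → 2023-04-01 (already the next day in UTC+8)
```

Note the UTC date is still 2023-03-31; without the offset the report would be attributed to the wrong day.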