Airbnb的數據工程師 Maxime Beauchemin 激動地表示道:Airflow 是一個咱們正在用的工做流調度器,如今的版本已經更新到1.6.1了,而且引入了一些列調度引擎的改革。咱們喜歡它是由於它寫代碼太容易了,也便於調試和維護。咱們也喜歡全都用他來寫代碼,而不是像xml那樣的配置文件用來描述DAG。更不用說,咱們顯然不用再學習太多東西。html
在一個分佈式環境中,宕機是時有發生的。Airflow經過自動重啓任務來適應這一變化。到目前爲止一切安好。當咱們有一系列你想去重置狀態的任務時,你就會發現這個功能簡直是救世主。爲了解決這個問題,咱們的策略是創建子DAG。這個子DAG任務將自動重試本身的那一部分,所以,若是你以子DAG設置任務爲永不重試,那麼憑藉子DAG操做你就能夠獲得整個DAG成敗的結果。若是這個重置是DAG的第一個任務設置子DAG的策略就會很是有效,對於有一個相對複雜的依賴關係結構設置子DAG是很是棒的作法。注意到子DAG操做任務不會正確地標記失敗任務,除非你從GitHub用了最新版本的Airflow。解決這個問題的另一個策略是使用重試柄:python
def make_spooq_exporter(table, schema, task_id, dag): return SpooqExportOperator( jdbc_url=('jdbc:mysql://%s/%s?user=user&password=pasta' % (TARGET_DB_HOST,TARGET_DB_NAME)), target_table=table, hive_table='%s.%s' % (schema, table), dag=dag, on_retry_callback=truncate_db, task_id=task_id) def truncate_db(context): hook = MySqlHook('clean_db_export') hook.run( 'truncate `%s`'%context['task_instance'].task.target_table, autocommit=False, parameters=None)
這樣你的重試柄就能夠將任務隔離,每次執行某個特定的任務。mysql
這在執行一個特定的可重複的任務時很是管用。用代碼來定義工做流是這個系統最強大之處是你能夠以編碼的方式產生DAG。這在在沒有人工干預的狀況下自動接入新的數據源的時候很是有用。git
咱們藉助現有的日誌目錄將檢查HDFS日誌融入DAG,而且在每次融入這些數據的時候在每一個目錄下產生一個任務。示例代碼以下:github
lognames = list( hdfs.list_filenames(conf.get('incoming_log_path'), full_path=False)) for logname in lognames: # TODO 使用適當的正則表達式來過濾掉不良日誌名,使得Airflow 能用符合特定的字符找出相應任務的名字 if logname not in excluded_logs and '%' not in logname and '@' not in logname: ingest = LogIngesterOperator( # 由於log_name以做爲unicode返回值,因此須要用str()包裝task_id task_id=str('ingest_%s' % logname), db=conf.get('hive_db'), logname=logname, on_success_callback=datadog_api.check_data_lag, dag=dp_dag ) ingest.set_upstream(transfer_from_incoming) ingest.set_downstream(transform_hive)
在天天結束的時候執行每日任務,而不是在當天工做開始的時候去執行這些任務。你不能將子DAG放在DAG文件夾下,換句話說除非你保管一類DAG,不然你不能夠將子DAG放在本身的模塊中。正則表達式
或者更具體地說就是,雖然你也能夠將子DAG放在DAG文件夾下,可是接着子DAG將先主DAG同樣運行本身的調度。這裏是一個兩個DAG的例子(假設他們同時在DAG文件夾下,也就是所謂的差DAG)這裏的子DAG將在主DAG中經過調度器被單獨調度。sql
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from bad_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id='main_dag', schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這單獨執行不起做用 transform_hive = SubDagOperator( subdag=hive_dag, task_id='hive_transform', dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator from datetime import timedelta, datetime # 這將經過子DAG操做符被做爲像是本身的調度任務中那樣運行。 hive_dag = DAG('main_dag.hive_transform', # 注意到這裏的重複迭代 schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21)) hive_transform = HiveOperator(task_id='flatten_tables', hql=send_charge_hql, dag=dag)
除非你真的想這個子DAG被主DAG調度。docker
咱們經過使用工廠函數解決這個問題。這是一個優點那就是 主DAG能夠傳遞一些必要的參數到子DAG,所以他們在調度的時候其餘參數也自動賦值了。當你的主DAG發生變化時,咱們不須要去跟蹤參數。數據庫
在下面的例子中,假設DAG是所謂的好DAG:apache
from airflow.models import DAG from airflow.operators import PythonOperator, SubDagOperator from good_dags.subdag import hive_dag from datetime import timedelta, datetime main_dag = DAG( dag_id='main_dag', schedule_interval=timedelta(hours=1), start_date=datetime(2015, 9, 18, 21) ) # 顯然,這單獨執行不起做用 transform_hive = SubDagOperator( subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval), task_id='hive_transform', dag=main_dag, trigger_rule=TriggerRule.ALL_DONE )
from airflow.models import DAG from airflow.operators import HiveOperator # 對調度程序來講,沒有Dag的頂層模塊就不起做用了 def hive_dag(start_date, schedule_interval): # you might like to make the name a parameter too dag = DAG('main_dag.hive_transform', # 注意這裏的設置 schedule_interval=schedule_interval, start_date=start_date) hive_transform = HiveOperator(task_id='flatten_tables', hql=send_charge_hql, dag=dag) return dag
使用工廠類使得子DAG在保障調度器從開始運行時就可維護就更強。
另外一種模式是將主DAG和子DAG之間的共享設爲默認參數,而後傳遞到工廠函數中去,(感謝 Maxime 的建議)。
即便子DAG做爲其父DAG的一部分被觸發子DAG也必須有一個調度,若是他們的調度是設成None,這個子DAG操做符將不會觸發任何任務。
更糟糕的是,若是你對子DAG被禁用,接着你又去運行子DAG操做,並且還沒運行完,那麼之後你的子DAG就再也運行不起來了。
這將快速致使你的主DAG同時運行的任務數量一下就達到上限(默認一次寫入是16個)而且這將致使調度器形同虛設。
這兩個例子都是緣起子DAG操做符被當作了回填工做。這裏能夠看到這個
Airflow1.6的最大更新是引入了DagRun。如今,任務調度實例是由DagRun對象來建立的。
相應地,若是你想跑一個DAG而不是回填工做,你可能就須要用到DagRun。
你能夠在代碼裏寫一些airflow trigger_dag
命令,或者也能夠經過DagRun頁面來操做。
這個巨大的優點就是調度器的行爲能夠被很好的理解,就像它能夠遍歷DagRun同樣,基於正在運行的DagRun來調度任務實例。
這個服務器如今能夠向咱們顯示每個DagRun的狀態,而且將任務實例的狀態與之關聯。
新的模型也提供了一個控制調度器的方法。下一個DagRun會基於數據庫裏上一個DagRun的實例來調度。
除了服務峯值的例外以外,大多數實例是處於運行仍是結束狀態都不會影響總體任務的運行。
這意味着若是你想返回一個在現有和歷史上不連續集合的部分DagRun ,你能夠簡單刪掉這個DagRun任務實例,而且設置DagRun的狀態爲正在運行。
按照咱們的經驗,一個須要佔用很長時間運行的調度器至少是個最終沒有安排任務的CeleryExcecutor
。很不幸,咱們仍然不知道具體的緣由。不過慶幸的是,Airflow 內建了一個以num_runs形式做標記的權宜之計。它爲調度器確認了許多迭代器來在它退出以前確保執行這個循環。咱們運行了10個迭代,Airbnb通常運行5個。注意到這裏若是用LocalExecutor
將會引起一些問題。咱們如今使用chef
來重啓executor;咱們正計劃轉移到supervisor
上來自動重啓。
這個airflow.operators
包有一些魔法,它讓咱們只能使用正確導入的操做符。這意味着若是你沒有安裝必要的依賴,你的操做符就會失效。
Airflow 是正在快速迭代中,並且不僅是Airbnb本身在作貢獻。Airflow將會繼續演化,而我也將寫更多有關Airflow的技巧供你們學習使用。
若是你也對解決這些問題感興趣,那就加入咱們吧!
Youtube: Airflow An open source platform to author and monitor data pipelines
Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!
Managing Containerized Data Pipeline Dependencies With Luigi
Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances
原做者:Marcin Tustin 翻譯:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls做爲分享主義者(sharism),本人全部互聯網發佈的圖文均聽從CC版權,轉載請保留做者信息並註明做者 Harry Zhu 的 FinanceR專欄:https://segmentfault.com/blog/harryprince,若是涉及源代碼請註明GitHub地址:https://github.com/harryprince。微信號: harryzhustudio商業使用請聯繫做者。