# Airflow 1.10+安裝本次安裝Airflow版本爲1.10+,其須要依賴Python和DB,本次選擇的DB爲Mysql。本次安裝組件及版本以下:Airflow == 1.10.0Python == 3.6.5Mysql == 5.7# 總體流程1. 建表2. 安裝3. 配置4. 運行5. 配置任務```啓動scheduleairflow scheduler -D啓動webserverairflow webserver -Dps -ef|grep -Ei '(airflow-webserver)'| grep master | awk '{print $2}'|xargs -i kill {}ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs -i kill {}## 建庫、建用戶```庫名爲airflow'create database airflow;'建用戶用戶名爲airflow,而且設置全部ip都可以訪問。create user 'airflow'@'%' identified by 'airflow';create user 'airflow'@'localhost' identified by 'airflow';用戶受權這裏爲新建的airflow用戶授予airflow庫的全部權限grant all on airflow.* to 'airflow'@'%';flush privileges```## Airflow安裝```這裏經過 virtualenv 進行安裝。----- 經過virtualenv安裝$ mkdir /usr/local/virtual_env && cd /usr/local/virtual_env # 建立目錄$ virtualenv --no-site-packages airflow --python=python # 建立虛擬環境$ source /usr/local/virtual_env/airflow/bin/activate # 激活虛擬環境----- 安裝指定版本或者默認$ pip install apache-airflow -i https://pypi.douban.com/simple在安裝完一堆的依賴後,就須要配置 AIRFLOW_HOME 環境變量,後續的 DAG 和 Plugin 都將以該目錄做爲根目錄查找,如上,能夠直接設置爲 /tmp/project 。報錯ERROR: flask 1.1.1 has requirement Jinja2>=2.10.1, but you'll have jinja2 2.10 which is incompatible.ERROR: flask 1.1.1 has requirement Werkzeug>=0.15, but you'll have werkzeug 0.14.1 which is incompatible.執行:pip3 install -U Flask==1.0.4執行:pip3 install -U pika==0.13.1從新執行 :pip install apache-airflow -i https://pypi.douban.com/simple----- 設置環境變量(airflow) $ export AIRFLOW_HOME=/tmp/airflow----- 查看其版本信息(airflow) $ airflow version ____________ _____________ ____ |__( )_________ __/__ /________ ______ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.8.0執行了上述的命令後,會生成 airflow.cfg 和 unittests.cfg 兩個文件,其中前者是一個配置文件 。## airflow 配置----- 修改Airflow DB配置### 1. 安裝Mysql模塊pip install "apache-airflow[mysql]"這裏能夠簡單說下,airflow依賴的其餘組件都可以此方式安裝。在以後安裝password組件一樣是經過此方式。修改Airflow DB配置修改${AIRFLOW_HOME}/airflow.cfgsql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow參數的格式爲mysql://賬號:密碼@ip:port/db初始化db新建airflow依賴的表。airflow initdb如報錯 Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2)需改sql_alchemy_conn = mysql+mysqldb://airflow:airflow@127.0.0.1:3306/airflow```### 2. 用戶認證```本文采用的用戶認證方式爲password方式,其餘方式如LDAP一樣支持可是本文不會介紹。筆者在安裝時實驗過LDAP方式可是未成功過。安裝passsword組件pip install "apache-airflow[password]"2. 修改 airflow.cfg[webserver]authenticate = Trueauth_backend = airflow.contrib.auth.backends.password_auth3. 在python環境中執行以下代碼以添加帳戶:import airflow from airflow import models, settings from airflow.contrib.auth.backends.password_auth import PasswordUser user = PasswordUser(models.User()) user.username = 'admin' # 用戶名user.email = 'emailExample@163.com' # 用戶郵箱 user.password = 'password' # 用戶密碼session = settings.Session() session.add(user) session.commit() session.close() exit() ```### 3. 配置郵件服務此配置設置的是dag的task失敗或者重試時發送郵件的發送者。配置以下:```[smtp]# If you want airflow to send emails on retries, failure, and you want to use# the airflow.utils.email.send_email_smtp function, you have to configure ansmtp_host = smtp.163.comsmtp_starttls = Truesmtp_ssl = False# Uncomment and set the user/pass settings if you want to use SMTP AUTHsmtp_user = mailExample@163.comsmtp_password = passwordsmtp_port = 25smtp_mail_from = mailExample@163.com接下來簡單把dag的Python代碼列出來,以供參考:default_args = { 'owner': 'ownerExample', 'start_date': datetime(2018, 9, 18), 'email': ['mailReceiver@163.com'], # 出問題時,發送報警Email的地址,能夠填多個,用逗號隔開。 'email_on_failure': ['mailReceiver@163.com'], # 任務失敗且重試次數用完時發送Email。 'email_on_retry': True, # 任務重試時是否發送Email 'depends_on_past': False, # 是否依賴於過去。若是爲True,那麼必需要昨天的DAG執行成功了,今天的DAG才能執行。 'retries': 3, 'retry_delay': timedelta(minutes=3),}```### 四、配置Executor```設置Executor修改:airflow.cfgexecutor = LocalExecutor本文中因爲只有單節點因此使用的是LocalExecutor模式。```### 5. 修改log地址```[core]base_log_folder = /servers/logs/airflow[scheduler]child_process_log_directory = servers/logs/airflow/scheduler```### 6. 修改webserver地址```修改webserver地址[webserver]base_url = http://host:port能夠經過上面配置的地址訪問webserver。```### 7. 可選配置```(可選)修改Scheduler線程數若是調度任務很少的話能夠把線程數調小,默認爲32。參數爲:parallelism(可選)不加載example dag若是不想加載示例dag能夠把load_examples配置改成False,默認爲True。這個配置只有在第一次啓動airflow以前設置纔有效。若是此方法不生效,能夠刪除${PYTHON_HOME}/site-packages/airflow/example_dags目錄,也是一樣的效果。(可選)修改檢測新dag間隔修改min_file_process_interval參數爲10,每10s識別一次新的dag。默認爲0,沒有時間間隔。```## 運行airflow```啓動scheduleairflow scheduler 啓動webserverairflow webserver ```## 安裝問題彙總```1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql修改Mysql配置文件my.cnf,具體步驟以下:查找my.cnf文件位置mysql --help | grep my.cnf下圖紅框處爲my.cnf文件所在位置:修改文件explicit_defaults_for_timestamp=true注意:必須寫在【mysqld】下重啓Mysqlsudo systemctl restart mysqld.service查看修改是否生效。執行以下SQL,若是值爲1則爲生效。2. pip install "apache-airflow[mysql]"報錯:mysql_config not found安裝mysql-devel:首先查看是否有mysql_config文件。find / -name mysql_config若是沒有安裝mysql-develyum install mysql-devel安裝以後再次查找,結果如圖:3. 其餘問題找我```## 配置任務在 AirFlow 中,每一個節點都是一個任務,能夠是一條命令行 (BashOperator),能夠是一段 Python 腳本 (PythonOperator) 等等,而後這些節點根據依賴關係構成了一條流程,一個圖,稱爲一個 DAG 。默認會到 ${AIRFLOW_HOME}/dags 目錄下查找,能夠直接在該目錄下建立相應的文件。以下是一個簡單的示例。```import airflowfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatorfrom datetime import timedelta, datetimeimport pytz# -------------------------------------------------------------------------------# these args will get passed on to each operator# you can override them on a per-task basis during operator initializationdefault_args = { 'owner': 'qxy', 'depends_on_past': False, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5),}tz = pytz.timezone('Asia/Shanghai')# naive = datetime.strptime("2018-06-13 17:40:00", "%Y-%m-%d %H:%M:%S")# local_dt = tz.localize(naive, is_dst=None)# utc_dt = local_dt.astimezone(pytz.utc).replace(tzinfo=None)dt = datetime(2019, 7, 16, 16, 30, tzinfo=tz)utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)dag = DAG( 'airflow_interval_test', default_args=default_args, description='airflow_interval_test', schedule_interval='35 17 * * *', start_date=utc_dt)t1 = BashOperator( task_id='sleep', bash_command='sleep 5', dag=dag)t2 = BashOperator( task_id='print_date', bash_command='date', dag=dag)t1 >> t2```該文件建立一個簡單的 DAG,只有三個運算符,兩個 BaseOperator ,也就是執行 Bash 命令分別打印日期以及休眠 5 秒;另外一個爲 PythonOperator 在執行任務時調用 print_hello() 函數。文件建立好後,放置到 ${AIRFLOW_HOME}/dags,airflow 自動讀取該DAG。----- 測試是否正常,若是無報錯那麼就說明正常$ python /tmp/project/dags/hello_world.py