Airflow Installation and Deployment

News content is collected by crawlers built with the Scrapy framework; the Apache Airflow workflow platform is used to manage and monitor those crawl jobs (data collection can run distributed with the CeleryExecutor, or as multiple local processes with the LocalExecutor). The following covers installing and configuring Airflow.

1. System Environment

The OS is CentOS Linux release 7.4.1708 (Core), kernel Linux version 3.10.0-693.2.2.el7.x86_64.

IP addresses:

  • Public: 47.104.191.52
  • Private: 172.31.178.92

2. Prepare the Python Environment: Install Anaconda

2.1 Download the installer

Download link 1 (official site)

Download link 2 (Tsinghua open-source mirror)

Download the installer for your platform.

2.2 Upload the installer and install

Upload the downloaded file to /opt on the Linux system.

1. Run the installer

cd /opt

sh Anaconda3-5.2.0-Linux-x86_64.sh (press Enter until >>> appears, then type yes)

/opt/anaconda3 (installation directory)

2. Configure environment variables

echo 'export PATH=/opt/anaconda3/bin:$PATH' >> /etc/profile

source /etc/profile

3. Install MySQL (for Airflow) and Redis

MySQL serves as Airflow's metadata database, recording Airflow's state;

Redis serves as Celery's broker and result backend (RabbitMQ also works). If you do not use the CeleryExecutor, Redis is not required.
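The broker and result-backend settings used later in airflow.cfg are plain `redis://host:port/db` URLs. A small sketch of how those URLs break down (the host and db numbers 3 and 4 are the values this guide uses):

```python
from urllib.parse import urlsplit

# Broker/result-backend URLs as configured later in airflow.cfg.
broker_url = "redis://172.31.178.92:6379/3"
result_backend = "redis://172.31.178.92:6379/4"

def redis_db(url):
    """Return (host, port, db index) parsed from a redis:// URL."""
    parts = urlsplit(url)
    return parts.hostname, parts.port, int(parts.path.lstrip("/"))

print(redis_db(broker_url))  # → ('172.31.178.92', 6379, 3)
```

Using distinct db indexes (3 and 4 here) keeps broker messages and task results separated inside one Redis instance.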

4. Install and Configure Airflow

  1. Create the virtual environment news_push with Anaconda

    /opt/anaconda3/bin/conda create -y --name news_push python=3.6.5

  2. Install and configure Airflow

    • Activate the virtual environment news_push

      source activate news_push

    • Install Airflow with pip

      pip install apache-airflow

    • Configure the Airflow home directory (create the project directory /opt/NewsPush first)

      echo "export AIRFLOW_HOME=/opt/NewsPush/airflow" >> /etc/profile

      source /etc/profile

    • Initialize the database

      airflow initdb

    • Start Airflow

      airflow webserver -p 5556

      Then open http://ip:5556/admin/ in a browser.

    • Configure Airflow: switch the database to MySQL

      • Edit the MySQL configuration file (/etc/my.cnf), then restart MySQL

        explicit_defaults_for_timestamp=true

      • Log in to MySQL

        mysql -uroot -p (enter the password when prompted)

      • Create the user airflow

        create user 'airflow'@'localhost' identified by 'airflow';

      • Create the database airflow

        create database airflow;

      • Grant privileges

        grant all privileges on airflow.* to 'airflow'@'%' identified by 'airflow';

        flush privileges;

    • Edit the Airflow configuration file

      vim /opt/NewsPush/airflow/airflow.cfg

      Change the following settings:

      executor = CeleryExecutor
      sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
      load_examples = False
      endpoint_url = http://localhost:5556
      base_url = http://localhost:5556
      web_server_port = 5556
      broker_url = redis://172.31.178.92:6379/3
      celery_result_backend = redis://172.31.178.92:6379/4
      flower_port = 5557
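The sql_alchemy_conn value is a standard SQLAlchemy database URI. A sketch of how it is assembled; the user, password, and database name "airflow" match the MySQL account created in the previous step:

```python
from urllib.parse import urlsplit

# Components of the metadata-database URI; all "airflow" values come from
# the MySQL user/database created earlier in this guide.
user, password, host, port, db = "airflow", "airflow", "localhost", 3306, "airflow"
sql_alchemy_conn = f"mysql://{user}:{password}@{host}:{port}/{db}"

parts = urlsplit(sql_alchemy_conn)
print(sql_alchemy_conn)            # → mysql://airflow:airflow@localhost:3306/airflow
print(parts.hostname, parts.port)  # → localhost 3306
```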
    • Install Celery support and Celery's Redis components

      pip install airflow[celery]

      pip install celery[redis]

    • Install MySQL-python

      yum install MySQL-python

      pip install PyMySQL==0.7.1

      With PyMySQL 0.8.0 or later you will see a warning such as:

      /opt/anaconda3/envs/news_push/lib/python3.6/site-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 chara result = self._query(query)
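The warning above appears from PyMySQL 0.8.0 onward, which is why 0.7.1 is pinned. A small version-comparison sketch (a hypothetical helper, not part of Airflow or PyMySQL):

```python
# Compare dotted version strings numerically, not lexically
# ("0.10.0" > "0.8.0" as numbers, though not as strings).
def version_tuple(v):
    return tuple(int(x) for x in v.split("."))

pinned = "0.7.1"  # the version installed above
print(version_tuple(pinned) < version_tuple("0.8.0"))  # → True: below the warning threshold
```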
    • Re-run the initialization

      airflow initdb

    • Troubleshooting

      • Error message

        Traceback (most recent call last):
          File "/opt/anaconda3/envs/news_push/bin/airflow", line 17, in <module>
            from airflow import configuration
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/__init__.py", line 30, in <module>
            from airflow import settings
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 159, in <module>
            configure_orm()
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 147, in configure_orm
            engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/__init__.py", line 424, in create_engine
            return strategy.create(*args, **kwargs)
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 81, in create
            dbapi = dialect_cls.dbapi(**dbapi_args)
          File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 102, in dbapi
            return __import__('MySQLdb')
        ModuleNotFoundError: No module named 'MySQLdb'
        
      • Fix (MySQLdb does not support Python 3.*)

        vim /opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py (the .py file path from the last line of the traceback)

        Add at the top of the file:

        import pymysql
        pymysql.install_as_MySQLdb()
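The mechanism behind this fix can be seen with a pure-stdlib sketch: `pymysql.install_as_MySQLdb()` registers the pymysql module in `sys.modules` under the name `MySQLdb`, so SQLAlchemy's `__import__('MySQLdb')` finds it. Here a stand-in module replaces pymysql for illustration only; the real fix imports pymysql itself:

```python
import sys
import types

# Stand-in for the pymysql module (illustrative; in the real fix,
# pymysql.install_as_MySQLdb() does this registration for you).
fake_pymysql = types.ModuleType("pymysql")

def install_as_mysqldb(module):
    # Register the module under the name the MySQLdb importers look up.
    sys.modules["MySQLdb"] = module

install_as_mysqldb(fake_pymysql)
import MySQLdb  # now resolves to the registered stand-in
print(MySQLdb is fake_pymysql)  # → True
```

An alternative that avoids editing files inside site-packages is to name the driver directly in the connection URI, e.g. `mysql+pymysql://airflow:airflow@localhost:3306/airflow` in sql_alchemy_conn.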
    • Re-run the initialization

      airflow initdb

  3. Start and Test Airflow

    • Create a DAG (/opt/NewsPush/airflow/dags/hello_world.py)

      from airflow import DAG
      from airflow.utils.dates import days_ago
      from airflow.operators.python_operator import PythonOperator

      default_args = {
          'owner': 'airflow',
          'start_date': days_ago(1)  # Required. Prefer a fixed time: a dynamic "now" causes surprising behavior. The task runs once first, then follows start_date and schedule_interval.
      }

      dag = DAG(
          'example_hello_world_dag',
          default_args=default_args,
          description='my first DAG',
          # schedule_interval=timedelta(days=1)
          schedule_interval='0 */1 * * *'  # run once every hour
      )

      def print_hello():
          return 'Hello World!'

      hello_operator = PythonOperator(
          task_id='hello_task',
          python_callable=print_hello,
          dag=dag
      )
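The schedule_interval above is a standard 5-field cron expression ('0 */1 * * *' fires at minute 0 of every hour). A quick field-splitting sketch (a hypothetical helper, not an Airflow API; Airflow itself interprets the expression via croniter):

```python
# Split a 5-field cron expression into named fields for inspection.
def cron_fields(expr):
    minute, hour, day, month, weekday = expr.split()
    return {"minute": minute, "hour": hour, "day": day,
            "month": month, "weekday": weekday}

fields = cron_fields('0 */1 * * *')
print(fields["minute"], fields["hour"])  # → 0 */1
```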
    • Start Airflow

      Each command below is run in its own terminal window to make the logs easy to watch (they can also be run in the background).

      Note: avoid starting the Celery worker as root; if you must run it as root, add the environment variable below.

      If you start the worker as another user, start the other Airflow commands as that user too, and change the project directory's ownership to that user; otherwise the worker will fail when it lacks permission to write logs.

      echo "export C_FORCE_ROOT=true" >> /etc/profile
      source /etc/profile

      airflow webserver  # start the Airflow web UI
      airflow scheduler  # start the scheduler; tasks are paused by default and must be switched on in the web UI
      airflow worker     # start a Celery worker
      airflow flower     # start the Flower monitoring UI

      Add a Linux user, group, and password:

      groupadd airflow            # create the group airflow
      useradd -g airflow airflow  # create the user airflow in group airflow
      passwd airflow              # set the password

      Change the project directory's ownership to the startup user (airflow):

      chown -R airflow:airflow /opt/NewsPush/

      Airflow web UI: http://47.104.191.52:5556/admin

      Flower web UI: http://47.104.191.52:5557/

