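The news data is collected by crawlers built on the Scrapy framework. The crawl jobs are managed and monitored with the Airflow workflow platform, which can collect data either distributed across machines with CeleryExecutor or as multiple local processes with LocalExecutor. The rest of this post covers installing and configuring Airflow.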
The system is CentOS Linux release 7.4.1708 (Core), with kernel Linux version 3.10.0-693.2.2.el7.x86_64.
IP addresses:
47.104.191.52
172.31.178.92
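Download link 1 (official website)
Download link 2 (Tsinghua open-source mirror)
Download the Anaconda installer for the matching version.
Upload the downloaded file to /opt on the Linux system.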
1. Run the installer
cd /opt
sh Anaconda3-5.2.0-Linux-x86_64.sh
(Press Enter until the >>> prompt appears, then type yes.)
/opt/anaconda3
(installation directory)
2. Configure environment variables
echo 'export PATH=/opt/anaconda3/bin:$PATH' >> /etc/profile
source /etc/profile
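MySQL serves as Airflow's database and mainly stores Airflow's own records.
Redis serves as the Celery broker and result backend (RabbitMQ also works). If you do not use CeleryExecutor, Redis does not need to be configured.
Create the virtual environment news_push with Anaconda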
/opt/anaconda3/bin/conda create -y --name news_push python=3.6.5
Installing and configuring Airflow
Activate the news_push virtual environment
source activate news_push
Install Airflow with pip
pip install apache-airflow
Set the Airflow home directory (create the /opt/NewsPush project directory first)
echo "export AIRFLOW_HOME=/opt/NewsPush/airflow" >> /etc/profile
source /etc/profile
Initialize the database
airflow initdb
Start Airflow
airflow webserver -p 5556
Then open http://ip:5556/admin/ in a browser, replacing ip with the server's address.
Configuring Airflow
- Switch the database to MySQL
Set the following parameter in the MySQL config file (/etc/my.cnf), then restart MySQL
explicit_defaults_for_timestamp=true
Log in to MySQL
mysql -uroot -p
Press Enter, then type the password
Create the user airflow
create user 'airflow'@'localhost' identified by 'airflow';
Create the database airflow
create database airflow;
Grant privileges (to the same 'airflow'@'localhost' account created above, which is the one the connection string uses)
grant all privileges on airflow.* to 'airflow'@'localhost';
flush privileges;
Edit the Airflow config file
vim /opt/NewsPush/airflow/airflow.cfg
Change the following settings:
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
load_examples = False
endpoint_url = http://localhost:5556
base_url = http://localhost:5556
web_server_port = 5556
broker_url = redis://172.31.178.92:6379/3
celery_result_backend = redis://172.31.178.92:6379/4
flower_port = 5557
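A misspelled user or database name in sql_alchemy_conn only shows up later as a connection error from airflow initdb, so it can help to check the URL first. The following is a minimal sketch using only the Python standard library; the function name check_sql_alchemy_conn and its default expectations (user airflow, database airflow) are assumptions made for this example, not part of Airflow.

```python
from urllib.parse import urlsplit


def check_sql_alchemy_conn(url, expected_user="airflow", expected_db="airflow"):
    """Return a list of human-readable problems found in a MySQL connection URL."""
    parts = urlsplit(url)
    problems = []
    # Accept both "mysql" and driver-qualified schemes such as "mysql+pymysql".
    if parts.scheme.split("+")[0] != "mysql":
        problems.append("scheme is %r, expected mysql" % parts.scheme)
    if parts.username != expected_user:
        problems.append("user is %r, expected %r" % (parts.username, expected_user))
    # The database name is the URL path without its leading slash.
    database = parts.path.lstrip("/")
    if database != expected_db:
        problems.append("database is %r, expected %r" % (database, expected_db))
    return problems
```

Calling it with the value copied from airflow.cfg returns an empty list when the user and database match the ones created in MySQL, and one message per mismatch otherwise.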
Install Celery support and Celery's Redis component
pip install 'apache-airflow[celery]'
pip install celery[redis]
Install MySQL-python
yum install MySQL-python
pip install PyMySQL==0.7.1
With PyMySQL 0.8.0 or later you get this warning:
/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 chara result = self._query(query)
Initialize the database again
airflow initdb
Troubleshooting
Error message
Traceback (most recent call last):
  File "/opt/anaconda3/envs/news_push/bin/airflow", line 17, in <module>
    from airflow import configuration
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/__init__.py", line 30, in <module>
    from airflow import settings
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 159, in <module>
    configure_orm()
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 147, in configure_orm
    engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/__init__.py", line 424, in create_engine
    return strategy.create(*args, **kwargs)
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 81, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 102, in dbapi
    return __import__('MySQLdb')
ModuleNotFoundError: No module named 'MySQLdb'
Fix (making MySQLdb available on Python 3.*)
vim /opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py
(the .py file path from the last frame of the traceback)
Add the following at the top of the file
import pymysql
pymysql.install_as_MySQLdb()
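An alternative that avoids editing a file inside site-packages: SQLAlchemy picks the database driver from the URL scheme, and the scheme mysql+pymysql:// selects PyMySQL directly instead of MySQLdb. The helper below is only a sketch of that rewrite for the sql_alchemy_conn value; the function name use_pymysql_driver is invented for this example, and the approach has not been verified against the exact Airflow version used in this post.

```python
def use_pymysql_driver(url):
    """Rewrite a plain mysql:// URL so SQLAlchemy loads the PyMySQL driver."""
    scheme, separator, rest = url.partition("://")
    # Only touch URLs that use the bare "mysql" scheme; anything else is
    # returned unchanged (including URLs that already name a driver).
    if separator and scheme == "mysql":
        return "mysql+pymysql://" + rest
    return url
```

For the connection string used in this post, the result is mysql+pymysql://airflow:airflow@localhost:3306/airflow, which can be written to sql_alchemy_conn in airflow.cfg.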
Initialize the database again
airflow initdb
Starting and testing Airflow
Create a DAG (/opt/NewsPush/airflow/dags/hello_world.py)
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    # start_date is required. Prefer a fixed date: a dynamic value based on
    # the current time can cause unexpected behaviour. The task runs once
    # first, then runs according to start_date and schedule_interval.
    'start_date': days_ago(1),
}

dag = DAG(
    'example_hello_world_dag',
    default_args=default_args,
    description='my first DAG',
    # schedule_interval=timedelta(days=1)
    schedule_interval='0 */1 * * *',  # run once every hour
)


def print_hello():
    return 'Hello World!'


hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag,
)
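The interaction between start_date and schedule_interval is easier to follow with a small model of how an hourly schedule turns into runs. The sketch below is not Airflow code. It is a simplified standard-library illustration that assumes a run covering an interval is triggered only once that interval has ended, and that intervals missed since start_date are all created. The function name hourly_runs is made up for this example.

```python
from datetime import datetime, timedelta


def hourly_runs(start_date, now):
    """Return (execution_date, trigger_time) pairs for an hourly schedule.

    Simplified model: each run covers one hour beginning at execution_date
    and is triggered when that hour is over.
    """
    # Align the first interval to the top of the hour at or after start_date.
    first = start_date.replace(minute=0, second=0, microsecond=0)
    if first < start_date:
        first += timedelta(hours=1)

    runs = []
    execution_date = first
    while execution_date + timedelta(hours=1) <= now:
        runs.append((execution_date, execution_date + timedelta(hours=1)))
        execution_date += timedelta(hours=1)
    return runs
```

With a start_date of 2018-08-01 00:00 and a current time of 03:30 the same day, this model yields three runs, for the hours starting at 00:00, 01:00 and 02:00. A start_date that moves with the current time shifts this whole list every time the DAG file is parsed, which is one reason a fixed date is preferred.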
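Starting Airflow
Run each of the commands below in its own terminal window so the logs are easy to watch (they can also be started in the background).
Note: avoid starting the Celery worker as root. If you must run it as root, add the C_FORCE_ROOT environment variable as shown after this note.
If you start the worker as another user, start the other airflow commands as that same user and make the project directory owned by that user. Otherwise the worker has no permission to write its logs, which interferes with running tasks.
echo 'export C_FORCE_ROOT=true' >> /etc/profile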
source /etc/profile
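airflow webserver   # start the Airflow web UI
airflow scheduler   # start the scheduler, which triggers the tasks; DAGs are off by default and must be switched on manually in the web UI
airflow worker      # start the Celery worker
airflow flower      # start the Flower monitoring page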
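After starting the four processes, a quick way to confirm that the web UI and Flower are actually listening is to try a TCP connection to their ports (5556 and 5557 in the configuration used here). This is a minimal standard-library sketch; the function name port_is_open is an assumption for this example, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

On the server itself, port_is_open("localhost", 5556) checks the webserver and port_is_open("localhost", 5557) checks Flower.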
Adding a Linux user, group, and password
groupadd airflow              # add the group airflow
useradd -g airflow airflow    # add the user airflow to the group airflow
passwd airflow                # set the password
Change ownership of the project directory to the user that starts Airflow (airflow)
chown -R airflow:airflow /opt/NewsPush/
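Because a worker that cannot write its logs fails in confusing ways, it can be worth confirming that nothing under the project directory is still owned by another user. The sketch below walks a directory tree with the standard library and reports paths with a different owner; the function name paths_not_owned_by is made up for this example.

```python
import os


def paths_not_owned_by(root, uid):
    """Return every directory and file under root (root included) not owned by uid."""
    mismatched = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for path in [dirpath] + [os.path.join(dirpath, name) for name in filenames]:
            # lstat inspects a symlink itself rather than its target,
            # so a broken link does not raise an error.
            if os.lstat(path).st_uid != uid:
                mismatched.append(path)
    return mismatched
```

The numeric uid of the airflow user can be looked up with pwd.getpwnam("airflow").pw_uid. An empty result for /opt/NewsPush/ means the ownership change covered the whole tree.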
Airflow web UI address: http://47.104.191.52:5556/admin
Flower web UI address: http://47.104.191.52:5557/