最近幾年裏,Python 已成爲數據科學、機器學習和深度學習領域的一門流行的編程語言。只需再配上查詢語言 SQL 便可完成大多數工做。SQL 很棒,用英語便可發出指令,且只需指示想要什麼,而無需關心具體如何查詢。這使得底層的查詢引擎能夠不改變 SQL 查詢就能對其進行優化。Python 也很棒,它有大量高質量的庫,自己也易於使用。html
做業編排是執行平常任務並使其自動化的行爲。在過去,這一般是經過 CRON 做業完成的。而在最近幾年,愈來愈多的企業開始使用 Apache Airflow 和 Spotify 的 Luigi 等建立更強大的系統。這些工具能夠監控做業、記錄結果並在發生故障時從新運行做業。若是您有興趣,我曾寫過一篇博客文章,其中包括 Airflow 的背景故事,題爲《使用 Airflow 構建數據管道》。前端
做爲數據探索和可視化工具的 Notebooks 在過去幾年中也在數據領域變得很是流行。像 Jupyter Notebook 和 Apache Zeppelin 這樣的工具旨在知足這一需求。Notebooks 不只向您顯示分析結果,還顯示產生這些結果的代碼和查詢。這有利於發現疏忽並可幫助分析師重現彼此的工做。node
Airflow 和 Jupyter Notebook 能夠很好地協同工做,您可使用 Airflow 自動將新數據輸入數據庫,而後數據科學家可使用 Jupyter Notebook 進行分析。python
在這篇博文中,我將安裝一個單節點的 Hadoop,讓 Jupyter Notebook 運行並展現如何建立一個 Airflow 做業,它能夠獲取天氣數據源,將其存儲在 HDFS 上,再轉換爲 ORC 格式,最後導出到 Microsoft Excel 格式的電子表格中。android
我正在使用的機器有一個主頻爲 3.40 GHz 的 Intel Core i5-4670K CPU、12 GB 的 RAM 和 200 GB 的 SSD。我將使用全新安裝的 Ubuntu 16.04.2 LTS,並根據個人博客文章《Hadoop 3:單節點安裝指南》 中的說明構建安裝單節點 Hadoop。ios
接下來將安裝 Ubuntu 上的依賴項。 git 包將用於從 GitHub 獲取天氣數據集,其他三個包是 Python 自己、Python 包安裝程序和 Python 環境隔離工具包。git
$ sudo apt install \
git \
python \
python-pip \
virtualenv
複製代碼
Airflow 將依靠 RabbitMQ 的幫助來跟蹤其做業。下面安裝 Erlang,這是編寫 RabbitMQ 的語言。github
$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang
複製代碼
下面安裝 RabbitMQ。web
$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server
複製代碼
下面將安裝此博文中使用的 Python 上的依賴項和應用程序。redis
$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
apache-airflow \
celery \
cryptography \
jupyter \
jupyterthemes \
pyhive \
requests \
xlsxwriter
複製代碼
我將爲 Jupyter 建立一個文件夾來存儲其配置,而後爲服務器設置密碼。若是不設置密碼,您就會得到一個冗長的 URL,其中包含用於訪問 Jupyter 網頁界面的密鑰。每次啓動 Jupyter Notebook 時,密鑰都會更新。
$ mkdir -p ~/.jupyter/
$ jupyter notebook password
複製代碼
Jupyter Notebook 支持用戶界面主題。如下命令將主題設置爲 Chesterish。
$ jt -t chesterish
複製代碼
下面命令列出當前安裝的主題。內置的主題在 GitHub上都有屏幕截圖。
$ jt -l
複製代碼
要返回默認主題,請運行如下命令。
$ jt -r
複製代碼
首先確保您運行着 Hive 的 Metastore、Spark 的 Master & Slaves 服務,以及 Presto 的服務端。如下是啓動這些服務的命令。
$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh
複製代碼
下面將啓動 Jupyter Notebook,以便您能夠與 PySpark 進行交互,PySpark 是 Spark 的基於 Python 的編程接口。
$ PYSPARK_DRIVER_PYTHON=ipython \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=0.0.0.0 --NotebookApp.iopub_data_rate_limit=100000000" \
pyspark \
--master spark://ubuntu:7077
複製代碼
請注意,上面的 master 的 URL 以 ubuntu 爲主機名。此主機名是 Spark Master 服務端綁定的主機名。若是沒法鏈接到 Spark,請檢查 Spark Master 服務端的日誌,查找它已選擇綁定的主機名,由於它不接受尋址其餘主機名的鏈接。這可能會使人困惑,由於您一般會指望像 localhost 這樣的主機名不管如何都能正常工做。
運行 Jupyter Notebook 服務後,用下面命令打開網頁界面。
$ open http://localhost:8888/
複製代碼
系統將提示您輸入爲 Jupyter Notebook 設置的密碼。在右上角輸入後,您能夠從下拉列表中建立新的筆記本。咱們感興趣的兩種筆記本類型是 Python 和終端。終端筆記本使用您啓動 Jupyter Notebook 的 UNIX 賬戶爲您提供 shell 訪問權限。而我將使用的是 Python 筆記本。
啓動 Python 筆記本後,將如下代碼粘貼到單元格中,它將經過 Spark 查詢數據。調整查詢以使用您在安裝中建立的數據集。
cab_types = sqlContext.sql(""" SELECT cab_type, COUNT(*) FROM trips_orc GROUP BY cab_type """)
cab_types.take(2)
複製代碼
這就是上面查詢的輸出結果。只返回了一條記錄,包括兩個字段。
[Row(cab_type=u'yellow', count(1)=20000000)]
複製代碼
在前面用來查詢 Spark 的筆記本中,也能夠查詢 Presto。某些 Presto 查詢的性能可能超過 Spark,趁手的是這二者能夠在同一個筆記本中進行切換。在下面的示例中,我使用 Dropbox 的 PyHive 庫來查詢 Presto。
from pyhive import presto
cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()
複製代碼
這是上述查詢的部分輸出。
[(451221840,
u'CMT',
u'2011-08-23 21:03:34.000',
u'2011-08-23 21:21:49.000',
u'N',
1,
-74.004655,
40.742162,
-73.973489,
40.792922,
...
複製代碼
若是您想在 Jupyter Notebook 中生成數據圖表,能夠看看《在 Jupyter Notebook 中使用 SQLite 可視化數據》這篇博文,由於它有幾個使用 SQL 的繪圖示例,能夠與 Spark 和 Presto 一塊兒使用。
下面將建立一個 ~/airflow 文件夾,設置一個用於存儲在網頁界面上設置的 Airflow 的狀態和配置集的 SQLite 3 數據庫,升級配置模式併爲 Airflow 將要運行的 Python 做業代碼建立一個文件夾。
$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags
複製代碼
默認狀況下,Presto、Spark 和 Airflow 的網頁界面都使用 TCP 8080 端口。若是您先啓動了 Spark,Presto 就將沒法啓動。但若是您是在 Presto 以後啓動 Spark,那麼 Presto 將在 8080 上啓動,而 Spark Master 服務端則會使用 8081,若是仍被佔用,會繼續嘗試更高端口,直到它找到一個空閒的端口。以後, Spark 將爲 Spark Worker 的網頁界面選擇更高的端口號。這種重疊一般不是問題,由於在生產設置中這些服務一般存在於不一樣的機器上。
由於此安裝中使用了 8080 - 8082 的 TCP 端口,我將在端口 8083 上啓動 Airflow 的網頁界面。
$ airflow webserver --port=8083 &
複製代碼
我常用如下命令之一來查看正在使用的網絡端口。
$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu
複製代碼
Airflow 的 Celery 代理和做業結果的存儲都默認使用 MySQL。這裏改成使用 RabbitMQ。
$ vi ~/airflow/airflow.cfg
複製代碼
找到並編輯如下設置。
broker_url = amqp://airflow:airflow@localhost:5672/airflow
celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow
複製代碼
上面使用了 airflow 做爲用戶名和密碼鏈接到 RabbitMQ。帳號密碼能夠隨意自定。
下面將爲 RabbitMQ 配置上述帳號密碼,以便它能訪問 Airflow 虛擬主機。
$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
複製代碼
下面將打開 Airflow 網頁界面。
$ open http://localhost:8083/
複製代碼
打開 Airflow 網頁界面後,單擊頂部的 「Admin」 導航菜單,而後選擇 「Connections」。您將看到一長串默認數據庫鏈接。單擊以編輯 Presto 鏈接。 Airflow 鏈接到 Presto 須要進行如下更改。
保存這些更改,而後單擊頂部的 「Data Profiling」 導航菜單,選擇 「Ad Hoc Query」。從查詢框上方的下拉列表中選擇 「presto_default」,您就應該能夠經過 Presto 執行 SQL 代碼了。下面是針對我在安裝中導入的數據集運行的示例查詢。
SELECT count(*)
FROM trips_orc;
複製代碼
能夠將 Airflow DAG 視爲定時執行的做業。在下面的示例中,我將在 GitHub 上獲取 FiveThirtyEight 數據倉庫提供的天氣數據,將其導入 HDFS,將其從 CSV 轉換爲 ORC 並將其從 Presto 導出爲 Microsoft Excel 格式。
如下內容將 FiveThirtyEight 的數據存儲克隆到名爲 data 的本地文件夾中。
$ git clone \
https://github.com/fivethirtyeight/data.git \
~/data
複製代碼
而後我將啓動 Hive 並建立兩個表。一個存數據集的 CSV 格式,另外一個存數據集的 Presto 和 Spark 友好的 ORC 格式。
$ hive
複製代碼
CREATE EXTERNAL TABLE weather_csv (
date_ DATE,
actual_mean_temp SMALLINT,
actual_min_temp SMALLINT,
actual_max_temp SMALLINT,
average_min_temp SMALLINT,
average_max_temp SMALLINT,
record_min_temp SMALLINT,
record_max_temp SMALLINT,
record_min_temp_year INT,
record_max_temp_year INT,
actual_precipitation DECIMAL(18,14),
average_precipitation DECIMAL(18,14),
record_precipitation DECIMAL(18,14)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/weather_csv/';
CREATE EXTERNAL TABLE weather_orc (
date_ DATE,
actual_mean_temp SMALLINT,
actual_min_temp SMALLINT,
actual_max_temp SMALLINT,
average_min_temp SMALLINT,
average_max_temp SMALLINT,
record_min_temp SMALLINT,
record_max_temp SMALLINT,
record_min_temp_year INT,
record_max_temp_year INT,
actual_precipitation DOUBLE,
average_precipitation DOUBLE,
record_precipitation DOUBLE
) STORED AS orc
LOCATION '/weather_orc/';
複製代碼
下面的 Python 代碼是 Airflow 做業(也稱爲DAG)。每隔 30 分鐘,它將執行如下操做。
在下面的 Python 代碼中有一個指向 CSV 的位置,完整路徑爲 /home/mark/data/us-weather-history/*.csv,請將其中的 「mark」 更換爲您本身的 UNIX 用戶名。
$ vi ~/airflow/dags/weather.py
複製代碼
from datetime import timedelta
import airflow
from airflow.hooks.presto_hook import PrestoHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import numpy as np
import pandas as pd
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0),
'email': ['airflow@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=15),
}
dag = airflow.DAG('weather',
default_args=default_args,
description='將天氣數據複製到 HDFS 並導出爲 Excel',
schedule_interval=timedelta(minutes=30))
cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
bash_command=cmd,
dag=dag)
cmd = """hdfs dfs -copyFromLocal \ /home/mark/data/us-weather-history/*.csv \ /weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
bash_command=cmd,
dag=dag)
cmd = """echo \"INSERT INTO weather_orc SELECT * FROM weather_csv;\" | \ hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
bash_command=cmd,
dag=dag)
def presto_to_excel(**context):
column_names = [
"date",
"actual_mean_temp",
"actual_min_temp",
"actual_max_temp",
"average_min_temp",
"average_max_temp",
"record_min_temp",
"record_max_temp",
"record_min_temp_year",
"record_max_temp_year",
"actual_precipitation",
"average_precipitation",
"record_precipitation"
]
sql = """SELECT * FROM weather_orc LIMIT 20"""
ph = PrestoHook(catalog='hive',
schema='default',
port=8080)
data = ph.get_records(sql)
df = pd.DataFrame(np.array(data).reshape(20, 13),
columns=column_names)
writer = pd.ExcelWriter('weather.xlsx',
engine='xlsxwriter')
df.to_excel(writer, sheet_name='Sheet1')
writer.save()
return True
presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
provide_context=True,
python_callable=presto_to_excel,
dag=dag)
remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task
if __name__ == "__main__":
dag.cli()
複製代碼
使用該代碼打開 Airflow 的網頁界面並將主頁底部的 「weather」 DAG 旁邊的開關切換爲 「on」。
調度程序將建立一個做業列表交給 workers 去執行。如下內容將啓動 Airflow 的調度程序服務和一個將完成全部預約做業的 worker。
$ airflow scheduler &
$ airflow worker &
複製代碼
感謝您抽出寶貴時間閱讀這篇文章。我爲北美和歐洲的客戶提供諮詢、架構和實際開發服務。若是您有意探討個人產品將如何幫助您的業務,請經過 LinkedIn 與我聯繫。
若是發現譯文存在錯誤或其餘須要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可得到相應獎勵積分。文章開頭的 本文永久連接 即爲本文在 GitHub 上的 MarkDown 連接。
掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 Android、iOS、前端、後端、區塊鏈、產品、設計、人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃、官方微博、知乎專欄。