[譯] Python 與大數據:Airflow、 Jupyter Notebook 與 Hadoop 三、Spark、Presto

最近幾年裏,Python 已成爲數據科學、機器學習和深度學習領域的一門流行的編程語言。只需再配上查詢語言 SQL 便可完成大多數工做。SQL 很棒,用英語便可發出指令,且只需指示想要什麼,而無需關心具體如何查詢。這使得底層的查詢引擎能夠不改變 SQL 查詢就能對其進行優化。Python 也很棒,它有大量高質量的庫,自己也易於使用。html

做業編排是執行平常任務並使其自動化的行爲。在過去,這一般是經過 CRON 做業完成的。而在最近幾年,愈來愈多的企業開始使用 Apache AirflowSpotify 的 Luigi 等建立更強大的系統。這些工具能夠監控做業、記錄結果並在發生故障時從新運行做業。若是您有興趣,我曾寫過一篇博客文章,其中包括 Airflow 的背景故事,題爲《使用 Airflow 構建數據管道》前端

做爲數據探索和可視化工具的 Notebooks 在過去幾年中也在數據領域變得很是流行。像 Jupyter NotebookApache Zeppelin 這樣的工具旨在知足這一需求。Notebooks 不只向您顯示分析結果,還顯示產生這些結果的代碼和查詢。這有利於發現疏忽並可幫助分析師重現彼此的工做。node

Airflow 和 Jupyter Notebook 能夠很好地協同工做,您可使用 Airflow 自動將新數據輸入數據庫,而後數據科學家可使用 Jupyter Notebook 進行分析。python

在這篇博文中,我將安裝一個單節點的 Hadoop,讓 Jupyter Notebook 運行並展現如何建立一個 Airflow 做業,它能夠獲取天氣數據源,將其存儲在 HDFS 上,再轉換爲 ORC 格式,最後導出到 Microsoft Excel 格式的電子表格中。android

我正在使用的機器有一個主頻爲 3.40 GHz 的 Intel Core i5-4670K CPU、12 GB 的 RAM 和 200 GB 的 SSD。我將使用全新安裝的 Ubuntu 16.04.2 LTS,並根據個人博客文章《Hadoop 3:單節點安裝指南》 中的說明構建安裝單節點 Hadoop。ios

安裝依賴項

接下來將安裝 Ubuntu 上的依賴項。 git 包將用於從 GitHub 獲取天氣數據集,其他三個包是 Python 自己、Python 包安裝程序和 Python 環境隔離工具包。git

$ sudo apt install \
    git \
    python \
    python-pip \
    virtualenv
複製代碼

Airflow 將依靠 RabbitMQ 的幫助來跟蹤其做業。下面安裝 Erlang,這是編寫 RabbitMQ 的語言。github

$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
    sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang
複製代碼

下面安裝 RabbitMQ。web

$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server
複製代碼

下面將安裝此博文中使用的 Python 上的依賴項和應用程序。redis

$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
    apache-airflow \
    celery \
    cryptography \
    jupyter \
    jupyterthemes \
    pyhive \
    requests \
    xlsxwriter
複製代碼

配置 Jupyter Notebook

我將爲 Jupyter 建立一個文件夾來存儲其配置,而後爲服務器設置密碼。若是不設置密碼,您就會得到一個冗長的 URL,其中包含用於訪問 Jupyter 網頁界面的密鑰。每次啓動 Jupyter Notebook 時,密鑰都會更新。

$ mkdir -p ~/.jupyter/
$ jupyter notebook password
複製代碼

Jupyter Notebook 支持用戶界面主題。如下命令將主題設置爲 Chesterish

$ jt -t chesterish
複製代碼

下面命令列出當前安裝的主題。內置的主題在 GitHub上都有屏幕截圖

$ jt -l
複製代碼

要返回默認主題,請運行如下命令。

$ jt -r
複製代碼

經過 Jupyter Notebook 查詢 Spark

首先確保您運行着 Hive 的 Metastore、Spark 的 Master & Slaves 服務,以及 Presto 的服務端。如下是啓動這些服務的命令。

$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh
複製代碼

下面將啓動 Jupyter Notebook,以便您能夠與 PySpark 進行交互,PySpark 是 Spark 的基於 Python 的編程接口。

$ PYSPARK_DRIVER_PYTHON=ipython \
    PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=0.0.0.0 --NotebookApp.iopub_data_rate_limit=100000000" \
    pyspark \
    --master spark://ubuntu:7077
複製代碼

請注意,上面的 master 的 URL 以 ubuntu 爲主機名。此主機名是 Spark Master 服務端綁定的主機名。若是沒法鏈接到 Spark,請檢查 Spark Master 服務端的日誌,查找它已選擇綁定的主機名,由於它不接受尋址其餘主機名的鏈接。這可能會使人困惑,由於您一般會指望像 localhost 這樣的主機名不管如何都能正常工做。

運行 Jupyter Notebook 服務後,用下面命令打開網頁界面。

$ open http://localhost:8888/
複製代碼

系統將提示您輸入爲 Jupyter Notebook 設置的密碼。在右上角輸入後,您能夠從下拉列表中建立新的筆記本。咱們感興趣的兩種筆記本類型是 Python 和終端。終端筆記本使用您啓動 Jupyter Notebook 的 UNIX 賬戶爲您提供 shell 訪問權限。而我將使用的是 Python 筆記本。

啓動 Python 筆記本後,將如下代碼粘貼到單元格中,它將經過 Spark 查詢數據。調整查詢以使用您在安裝中建立的數據集。

cab_types = sqlContext.sql(""" SELECT cab_type, COUNT(*) FROM trips_orc GROUP BY cab_type """)

cab_types.take(2)
複製代碼

這就是上面查詢的輸出結果。只返回了一條記錄,包括兩個字段。

[Row(cab_type=u'yellow', count(1)=20000000)]
複製代碼

經過 Jupyter Notebook 查詢 Presto

在前面用來查詢 Spark 的筆記本中,也能夠查詢 Presto。某些 Presto 查詢的性能可能超過 Spark,趁手的是這二者能夠在同一個筆記本中進行切換。在下面的示例中,我使用 Dropbox 的 PyHive 庫來查詢 Presto。

from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()
複製代碼

這是上述查詢的部分輸出。

[(451221840,
  u'CMT',
  u'2011-08-23 21:03:34.000',
  u'2011-08-23 21:21:49.000',
  u'N',
  1,
  -74.004655,
  40.742162,
  -73.973489,
  40.792922,
...
複製代碼

若是您想在 Jupyter Notebook 中生成數據圖表,能夠看看《在 Jupyter Notebook 中使用 SQLite 可視化數據》這篇博文,由於它有幾個使用 SQL 的繪圖示例,能夠與 Spark 和 Presto 一塊兒使用。

啓動 Airflow

下面將建立一個 ~/airflow 文件夾,設置一個用於存儲在網頁界面上設置的 Airflow 的狀態和配置集的 SQLite 3 數據庫,升級配置模式併爲 Airflow 將要運行的 Python 做業代碼建立一個文件夾。

$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags
複製代碼

默認狀況下,Presto、Spark 和 Airflow 的網頁界面都使用 TCP 8080 端口。若是您先啓動了 Spark,Presto 就將沒法啓動。但若是您是在 Presto 以後啓動 Spark,那麼 Presto 將在 8080 上啓動,而 Spark Master 服務端則會使用 8081,若是仍被佔用,會繼續嘗試更高端口,直到它找到一個空閒的端口。以後, Spark 將爲 Spark Worker 的網頁界面選擇更高的端口號。這種重疊一般不是問題,由於在生產設置中這些服務一般存在於不一樣的機器上。

由於此安裝中使用了 8080 - 8082 的 TCP 端口,我將在端口 8083 上啓動 Airflow 的網頁界面。

$ airflow webserver --port=8083 &
複製代碼

我常用如下命令之一來查看正在使用的網絡端口。

$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu
複製代碼

Airflow 的 Celery 代理和做業結果的存儲都默認使用 MySQL。這裏改成使用 RabbitMQ。

$ vi ~/airflow/airflow.cfg
複製代碼

找到並編輯如下設置。

broker_url = amqp://airflow:airflow@localhost:5672/airflow

celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow
複製代碼

上面使用了 airflow 做爲用戶名和密碼鏈接到 RabbitMQ。帳號密碼能夠隨意自定。

下面將爲 RabbitMQ 配置上述帳號密碼,以便它能訪問 Airflow 虛擬主機。

$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
複製代碼

將 Airflow 鏈接到 Presto

下面將打開 Airflow 網頁界面。

$ open http://localhost:8083/
複製代碼

打開 Airflow 網頁界面後,單擊頂部的 「Admin」 導航菜單,而後選擇 「Connections」。您將看到一長串默認數據庫鏈接。單擊以編輯 Presto 鏈接。 Airflow 鏈接到 Presto 須要進行如下更改。

  • 將 schema 從 hive 改成 default。
  • 將端口從 3400 改成 8080。

保存這些更改,而後單擊頂部的 「Data Profiling」 導航菜單,選擇 「Ad Hoc Query」。從查詢框上方的下拉列表中選擇 「presto_default」,您就應該能夠經過 Presto 執行 SQL 代碼了。下面是針對我在安裝中導入的數據集運行的示例查詢。

SELECT count(*)
FROM trips_orc;
複製代碼

下載天氣數據集

能夠將 Airflow DAG 視爲定時執行的做業。在下面的示例中,我將在 GitHub 上獲取 FiveThirtyEight 數據倉庫提供的天氣數據,將其導入 HDFS,將其從 CSV 轉換爲 ORC 並將其從 Presto 導出爲 Microsoft Excel 格式。

如下內容將 FiveThirtyEight 的數據存儲克隆到名爲 data 的本地文件夾中。

$ git clone \
    https://github.com/fivethirtyeight/data.git \
    ~/data
複製代碼

而後我將啓動 Hive 並建立兩個表。一個存數據集的 CSV 格式,另外一個存數據集的 Presto 和 Spark 友好的 ORC 格式。

$ hive
複製代碼
CREATE EXTERNAL TABLE weather_csv (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DECIMAL(18,14),
    average_precipitation DECIMAL(18,14),
    record_precipitation  DECIMAL(18,14)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/weather_csv/';

CREATE EXTERNAL TABLE weather_orc (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DOUBLE,
    average_precipitation DOUBLE,
    record_precipitation  DOUBLE
) STORED AS orc
  LOCATION '/weather_orc/';
複製代碼

建立 Airflow DAG

下面的 Python 代碼是 Airflow 做業(也稱爲DAG)。每隔 30 分鐘,它將執行如下操做。

  • 清除 HDFS上 /weather_csv/ 文件夾中的任何現有數據。
  • 將 ~/data 文件夾中的 CSV 文件複製到 HDFS 上的 /weather_csv/ 文件夾中。
  • 使用 Hive 將 HDFS 上的 CSV 數據轉換爲 ORC 格式。
  • 使用 Presto 將 ORC 格式的數據導出爲 Microsoft Excel 2013 格式。

在下面的 Python 代碼中有一個指向 CSV 的位置,完整路徑爲 /home/mark/data/us-weather-history/*.csv,請將其中的 「mark」 更換爲您本身的 UNIX 用戶名。

$ vi ~/airflow/dags/weather.py
複製代碼
from datetime import timedelta

import airflow
from   airflow.hooks.presto_hook         import PrestoHook
from   airflow.operators.bash_operator   import BashOperator
from   airflow.operators.python_operator import PythonOperator
import numpy  as np
import pandas as pd


default_args = {
    'owner':            'airflow',
    'depends_on_past':  False,
    'start_date':       airflow.utils.dates.days_ago(0),
    'email':            ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=15),
}

dag = airflow.DAG('weather',
                  default_args=default_args,
                  description='將天氣數據複製到 HDFS 並導出爲 Excel',
                  schedule_interval=timedelta(minutes=30))

cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
                                bash_command=cmd,
                                dag=dag)

cmd = """hdfs dfs -copyFromLocal \ /home/mark/data/us-weather-history/*.csv \ /weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
                                bash_command=cmd,
                                dag=dag)

cmd = """echo \"INSERT INTO weather_orc SELECT * FROM weather_csv;\" | \ hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
                               bash_command=cmd,
                               dag=dag)


def presto_to_excel(**context):
    column_names = [
        "date",
        "actual_mean_temp",
        "actual_min_temp",
        "actual_max_temp",
        "average_min_temp",
        "average_max_temp",
        "record_min_temp",
        "record_max_temp",
        "record_min_temp_year",
        "record_max_temp_year",
        "actual_precipitation",
        "average_precipitation",
        "record_precipitation"
    ]

    sql = """SELECT * FROM weather_orc LIMIT 20"""

    ph = PrestoHook(catalog='hive',
                    schema='default',
                    port=8080)
    data = ph.get_records(sql)

    df = pd.DataFrame(np.array(data).reshape(20, 13),
                      columns=column_names)

    writer = pd.ExcelWriter('weather.xlsx',
                            engine='xlsxwriter')
    df.to_excel(writer, sheet_name='Sheet1')
    writer.save()

    return True

presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
                                      provide_context=True,
                                      python_callable=presto_to_excel,
                                      dag=dag)

remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task

if __name__ == "__main__":
    dag.cli()
複製代碼

使用該代碼打開 Airflow 的網頁界面並將主頁底部的 「weather」 DAG 旁邊的開關切換爲 「on」。

調度程序將建立一個做業列表交給 workers 去執行。如下內容將啓動 Airflow 的調度程序服務和一個將完成全部預約做業的 worker。

$ airflow scheduler &
$ airflow worker &
複製代碼

感謝您抽出寶貴時間閱讀這篇文章。我爲北美和歐洲的客戶提供諮詢、架構和實際開發服務。若是您有意探討個人產品將如何幫助您的業務,請經過 LinkedIn 與我聯繫。

若是發現譯文存在錯誤或其餘須要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可得到相應獎勵積分。文章開頭的 本文永久連接 即爲本文在 GitHub 上的 MarkDown 連接。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索