在 mac 上創建 Python 的 Kafka 與 Spark 環境

Introduction

「Producer-Consumer」問題是資工系很經常使用來解釋訊息交換的一種範例,用生產者跟消費者間的關係來描述訊息的傳遞。生產者負責產生資料並放在有限或是無限的緩衝區讓等待消費者來處理。串流資料(Streaming Data)本質上就是一端不斷的丟出資料,另外一端須要持續地進行處理,就像 Producer-Consumer 同樣。java

Kafka 是近期一個用來處理串流資料的熱們框架,其概念的示意圖以下:python

Imgur

接到串流資料以後咱們能夠搭配 Spark 進行處理:macos

Imgur

串起來以後,一個很典型的應用就像這樣:bootstrap

Imgur

Kafka 是什麼?

Kafka 是由 Apache 軟體基金會開發的一個開源流處理平臺,目標是爲處理即時資料提供一個統1、高吞吐、低延遲的平臺。其持久化層本質上是一個「按照分散式事務紀錄檔架構的大規模發佈訂閱訊息佇列」。bash

相關的術語有:架構

  • Topic:用來對訊息進行分類,每一個進入到 Kafka 的資訊都會被放到一個 Topic 下
  • Broker:用來實現資料儲存的主機伺服器
  • Partition:每一個 Topic 中的訊息會被分爲若干個 Partition ,以提升訊息的處理效率

Imgur

Spark/PySpark 是什麼?

Apache Spark 是一個延伸於 Hadoop MapReduce 的開源叢集運算框架,Spark 使用了記憶體內運算技術,能在資料還沒有寫入硬碟時即在記憶體內分析運算。簡單來講就是一個利用分散式架構的資料運算工具,其主要的運算也是利用 Map - Reduce 的 運算邏輯。 Spark 本來是用 Java/Scala 作開發,PySpark 是一個封裝後的介面,適合 Python 開發者使用。app

環境準備

  1. kafka + zookeeper

咱們直接用 brew kafka 比較方便,安裝以後再到資料夾中確認使用設定檔都存在。這邊能夠補充一下 ZooKeeper 是一個 Hadoop 的分散式資源管理工具,kafka 也能夠用它來作資源的管理與分配。若是直接用 brew 安裝的話,zookeeper 也會一併安裝。框架

$ brew install kafka
$ ls /usr/local/etc/kafka # kafka 跟 zookeeper 設定檔會放在此資料夾下
connect-console-sink.properties consumer.properties
connect-console-source.properties log4j.properties
connect-distributed.properties producer.properties
connect-file-sink.properties server.properties
connect-file-source.properties tools-log4j.properties
connect-log4j.properties trogdor.conf
connect-standalone.properties zookeeper.properties
複製代碼
  1. PySpark
$ pip install pyspark
$ pyspark
Welcome to
      ____ __
     / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/    /__ / .__/\_,_/_/ /_/\_\ version 2.4.0       /_/ Using Python version 3.5.2 (default, Nov 30 2016 12:41:46) SparkSession available as 'spark'. 複製代碼

若是有出現找不到 SPARK_HOME 的錯誤訊息,在另外要配置一下的環境變數wordpress

# Could not find valid SPARK_HOME while searching
# ['/home/user', '/home/user/.local/bin']

export PYSPARK_PYTHON=python3
export SPARK_HOME=/Users/wei/Envs/py3dev/lib/python3.5/site-packages/pyspark
複製代碼
  1. JAVA8
$ brew cask install java8
$ java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
複製代碼

另外要配置一下 JAVA_HOME 的環境變數工具

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
複製代碼

Workflow

⓪ 先切換到 kafka 安裝的目錄下

cd /usr/local/Cellar/kafka/_________(版本號)
複製代碼

① 啓動 zookeeper 服務(須要長駐執行在背景

./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
複製代碼

② 啓動 kafka 服務(須要長駐執行在背景

./bin/kafka-server-start /usr/local/etc/kafka/server.properties
複製代碼

③ 建立/查看 Topic

# 建一個名爲 test-kafka 的 Topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-kafka

# 查看目前已經創建過的 Topic
./bin/kafka-topics --list --zookeeper localhost:2181\n\n
複製代碼

Streaming Data

產生與接收串流資料的方法有三種,可使用 kafka-console 指令、kafka-python 套件,也可使用 PySpark 來處,如下示範簡單的程式碼:

kafka-console

① Producer

使用指令(kafka-console-producer)產生串流資料到特定的 Topic

$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-kafka
複製代碼

② Costomer

使用指令(kafka-console-consumer)從特定的 Topic 接收串流資料

$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-kafka --from-beginning
複製代碼

kafka-python

① Producer

from kafka import KafkaProducer
import time

brokers, topic = 'localhost:9092', 'test-kafka'

def start():
    while True:
        print(" --- produce ---")
        time.sleep(10)
        producer.send('topic', key=b'foo', value=b'bar')
        producer.flush()
 
 
if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers=brokers)
    start()
    producer.close()

複製代碼

執行:

$ python kafka-producer.py
複製代碼

② Costomer

from kafka import KafkaConsumer

brokers, topic = 'localhost:9092', 'test-kafka'

if __name__ == '__main__':
  consumer = KafkaConsumer(topic, group_id='test-consumer-group', bootstrap_servers=[brokers])
  for msg in consumer:
    print("key=%s, value=%s" % (msg.key, msg.value))

複製代碼

執行:

$ python kafka-costomer.py
複製代碼

PySpark

① Costomer

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

brokers, topic = 'localhost:9092', 'test-kafka'

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" "))
    ssc.start()
    ssc.awaitTermination()

複製代碼

執行 pyspark 的執行要用 spark-submit 指令,並且要加上 kafka 的配置檔 (因此要在同目錄下下載 spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar 配置檔)

$ spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-consumer.py
複製代碼

Connect Producer & Costomer

能夠利用以上不一樣的 Producer 方法對同一個 topic 發送訊息,而後用 Costomer 的方法來接收!

小結

本系列預計會有三篇文章,把一個串流資料歷來源到資料庫的過程走過一次:

Reference

  1. mac下kafka環境搭建測試
  2. 用pip 在macOS 上安裝單機使用的pyspark
  3. What should I set JAVA_HOME to on OSX
  4. Running pyspark after pip install pyspark
  5. Getting Streaming data from Kafka with Spark Streaming using Python.

License

本著做由Chang Wei-Yaun (v123582)製做, 以創用CC 姓名標示-相同方式分享 3.0 Unported受權條款釋出。

相關文章
相關標籤/搜索