[Spark] 05 - Apache Kafka

前言


Ref: kafka中文教程html

做爲消息中間件,其餘組件先跟Kafka交流,而後再有Kafka統一跟Hadoop溝通。python

 

1、kafka名詞解釋

producer:生產者,就是它來生產「雞蛋」的。sql

consumer:消費者,生出的「雞蛋」它來消費。apache

topic:你把它理解爲標籤,生產者每生產出來一個雞蛋就貼上一個標籤(topic),消費者可不是誰生產的「雞蛋」都吃的,這樣不一樣的生產者生產出來的「雞蛋」,消費者就能夠選擇性的「吃」了。bootstrap

broker:就是籃子了。ubuntu


2、與 Storm 比較

除了Kafka Streams,還有Apache Storm和Apache Samza可選擇。

Ref: 大牛總結分享:大數據技術Storm 區別 Kafka 哪些場景更適合centos

可見更多的是「集成合做關係」。服務器

 

3、Zookeeper安裝

要與kafka文件夾中自帶的zk的版本要同樣:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/app

Ref: ZooKeeper的安裝與部署(集羣模式)dom

Ref: How to Setup Apache ZooKeeper Cluster on Ubuntu 18.04 LTS(單機模式下實踐沒問題)

Ref: 報錯:WARN [WorkerSender[myid=1]:QuorumCnxManager@584] - Cannot open channel to 2 at election address /x.x.x.x:3888

ipv6的坑,記得直接關掉就行了。

$ sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
$ sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1

 

4、Kafka安裝

Ref: Spark2.1.0+入門:Apache Kafka做爲DStream數據源(Python版) 

Ref: Kafka集羣部署 (守護進程啓動)

Ref: centos7下kafka集羣安裝部署

Ref: Zookeeper+Kafka集羣部署(已測、可用)

 

 

 

 

高級輸入源


Kafka 基本知識

1、Kafka 註冊信息

Broker 和 Topic

消息具備類別 (Topic) 屬性。一個 topic 的消息可能保存在一個活多個broker上。

分區 (Partition) 是物理上的概念,每一個 topic 包含一個或多個 partition。

 

生產者 和 消費者

Producer --> Kafka Broker --> Consumer (Spark Streaming)

每個消費者只屬於某一個組 (Consumer Group),沒指定就在默認的組。

 

 

2、ZooKeeper

Kafka的運行依賴於 ZooKeeper,其 "註冊信息" 都在其中。

因此,先啓動 ZooKeeper,再啓動 Kafka。

 

安裝Kafka

參考資料:Spark2.1.0+入門:Apache Kafka做爲DStream數據源(Python版) 

不一樣的版本兼容不一樣的spark,例如:Kafka_2.11 - 0.8.2.2.tgz,2.11是scala版本號。

根據spark配置Kafka,過程在此略,詳見 「課時64」。

記得下載對應的jar包以及/usr/local/kafka/libs下的內容,一併拷貝到/usr/local/spark/jars/kafka子目錄。

在spark-env.sh設置:

 

啓動Kafka

# 打開第一個終端,先啓動zookeeper
$ cd /usr/local/kafka
$ ./bin/zookeeper-server-start.sh  config/zookeeper.properties

# 打開第二個終端,再啓動kafka
$ cd /usr/local/kafka
$ bin/kafka-server-start.sh config/server.properties

# 打開第三個終端
# 建立一個topic:wordsendertest
$ cd /usr/local/kafka
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
>--replication-factor 1 --partitions 1 --topic wordsendertest

# 列出全部建立的Topic,驗證是否建立成功
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

# 建立生產者給topic扔數據,能夠在當前終端輸入一些測試文字
./bin/kafka-console-producer.sh --broker-list localhost:9092 \
> --topic wordsendertest

# 打開第四個終端
# 建立消費者接收topic的數據,接收到「以上輸入的文字」
$ cd /usr/local/kafka
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
> --topic wordsendertest --from-beginning

 

 

Spark Streaming

將以上 」第四個終端「 換爲以下自定義的 」消費者程序「。

localhost:9092 ----> Kafka 做爲數據源 ----> localhost:2181

from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
 
# 準備參數 sc
= SparkContext(appName="PythonStreamingKafkaWordCount") ssc = StreamingContext(sc, 1) zkQuorum, topic = sys.argv[1:]

kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})


# 至此,kafka做爲數據源,開始「轉換」

lines
= kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()

運行程序:

/usr/local/spark/bin/spark-submit ./KafkaWordCount.py  localhost:2181  wordsendertest

 

  

Structured Streaming

1、準備期間

例子邏輯

生產者每0.1秒生成2個單詞並寫入此topic。

消費者訂閱 wordcount-topic 收到單詞,並每隔8秒鐘進行一次統計。

統計結果發送給另外一個主題:wordcount-result-topic

 

啓動 Kafka

# (1) 啓動 Zookeeper 服務
bin/zookeeper-server-start.sh config/zookeeper.properties
  
# (2) 啓動 Kafka 服務
bin/kafka-server-start.sh config/server.properties
  
# (3) 監督"輸入"終端 
bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --topic wordcount-topic
 
# (4) 監督"輸出"終端
bin/kafka-console-consumer.sh -bootstrap-server localhost:9092 --topic wordcount-result-topic 

 

 

2、編寫生產者

# spark_ss_kafka_producer.py

import string
import random
import time
from kafka import KafkaProducer if __name__ = "__main__":
  # broker服務器的位置9092
  producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

  while True: 
    s2 = (random.choice(string.ascii_lowercase) for _ in range(2))  # 隨機生成兩個小寫字母
    word = ''.join(s2)                # 拼接起來
    value = bytearray(word, 'utf-8')  # 字節序列

    producer.send('wordcount-topic', value=value).get(timeout=10)  # 生產者向該主題發送出去,循環發送
    time.sleep(0.1)

運行代碼 

sudo apt-get install pip3 sudo pip3 install kafka-python python3 spark_ss_kafka_producer.py

 

  

3、編寫消費者

從 topic: wordcount-topic 得到消息,而後再往 topic: wordcount-result-topic 中投入消息。

# spark_ss_kafka_consumer.py

from pyspark.sql import SparkSession

if __name__ == "__main__":

  spark = SparkSessoin.builder.appName("StructuredKafkaWordCount").getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", 'wordcount-topic').load().selectExpr("CAST(value ASSTRING)")  # 轉化爲字符串類型

  wordCounts = lines.groupBy("value").count()

運行代碼

/usr/local/spark/bin/spark-submit  \
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 > spark_ss_kafka_consumer.py

 

 End. 

相關文章
相關標籤/搜索