zeppelin中運行spark streaming kakfa & 實時可視化

notebook方式運行spark程序是一種比較agile的方式,一方面能夠體驗像spark-shell那樣repl的便捷,同時能夠藉助notebook的做圖能力實現快速數據可視化,很是方便快速驗證和demo。notebook有兩種選擇,一種是ipython notebook,主要針對pyspark;另外一種是zeppelin,能夠執行scala spark,pyspark以及其它執行引擎,包括hive等。比較而言,ipython notebook的可視化能力更強,zeppelin的功能更強。這裏主要介紹基於zeppelin的方式。html

spark standalone 部署

本地搭建端到端環境能夠採用spark standalone部署方案。
從spark官方網站下載壓縮包spark-2.2.1-bin-hadoop2.7.tgz,解壓後執行python

#start cluster
./sbin/start-all.sh
# check with spark shell
spark-shell --master spark://localhost:7077
# check the web UI
http://localhost:8080

kafka 演示部署

kafka在spark streaming應用場景中使用很是普遍,它有不少優秀的特性,橫向擴展、持久化、有序性、API支持三種一致性語義等。
官方網站下載kafka_2.11-0.8.2.0.tar,並解壓。
這裏簡單啓動單節點:web

#start zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka borker
./bin/kafka-server-start.sh config/server.properties

zeppelin部署及示例

官方網站下載zeppelin-0.7.3-bin-all.tgz,解壓。
爲了不端口衝突,先指定zeppelin的web端口:export ZEPPELIN_PORT=8088.
啓動:sql

# start daemon
./bin/zeppelin-daemon.sh start
# check status
./bin/zeppelin-daemon.sh status

訪問localhost:8088:
zeppelin webshell

建立一個notebook並嘗試運行幾個快速示例:
Python, pyspark, spark示例apache

python或者pyspark數據可視化可使用matplotlib也能夠直接將數據打印出來加上table頭的註解%table {column name1}\t{column name2}\t...restful

spark-streaming + direct kafka

kafka0.10.0的API跟以前版本變化較大,參照http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html,總結以下:maven

LocationStrategy

kafka partition跟spark executor之間對應關係
-LocationStrategies.PreferConsistent partition被均勻地對應到executor;
-PreferBrokers partition被分配給本地的executor,適合kafka跟spark集羣部署在相同節點上的狀況;
-PreferFixed 指定partition跟executor的映射關係oop

ConsumerStrategies

能夠subscribe到過個topic網站

Offset保存

0.10以前的版本中咱們須要本身在代碼中保存offset,以防止spark程序異常退出,在重啓自後可以從failure point開始從新處理數據。新版本的kafka consumer API自身支持了offset commit,週期地commit。示例代碼中沒有使用自動commit,由於從kafka中成功獲取數據後就commit offset存在一些問題。數據成功被讀取並不能保證數據被spark成功處理完。在以前的項目中咱們的方案也是本身保存offset,例如保存在zookeeper中。

官網表示spark和kafka 0.10.0的集成目前依然是experimental狀態。因此咱們將基於0.8版本kafka開發。http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

spark-streaming + kafka + zeppelin

在zeppelin中執行streaming程序並將結果建立成temporary table,進而用於實時數據可視化

準備依賴

zeppelin有相似maven的依賴解決方法,paragraph以下:

%dep

z.reset()
z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1")
//z.load("org.apache.kafka:kafka_2.11:0.8.2.0")
z.load("org.apache.kafka:kafka-clients:0.8.2.0")

單詞統計代碼

讀取kafka數據,分詞,統計單詞數量,並將統計結果建立成temporary table counts

%spark
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
 
// prevent INFO logging from pollution output
sc.setLogLevel("INFO")
 
// creating the StreamingContext with 5 seconds interval

val ssc = new StreamingContext(sc, Seconds(5))
 
val kafkaConf = Map(
    "metadata.broker.list" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "kafka-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000"
)
 
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc,
    kafkaConf,
    Map("test" -> 1),   // subscripe to topic and partition 1
    StorageLevel.MEMORY_ONLY
)
 
val words = lines.flatMap{ case(x, y) => y.split(" ")}
 

import spark.implicits._

val w=words.map(x=> (x,1L)).reduceByKey(_+_)
w.foreachRDD(rdd => rdd.toDF.registerTempTable("counts"))

 
ssc.start()

數據展現

從上面的temporary table counts 中查詢每小批量的數據中top 10 的單詞值。

%sql
select * from counts order by _2 desc limit 10

端到端演示

爲了快速搭建端到端的數據流分析,咱們能夠在上述各個步驟的基礎上再建立一個restful service,有不少方式,例如jetty + jersery,或者直接使用nifi鏈接到kafka。

相關文章
相關標籤/搜索