sparkstreaming

流(Streaming),在大數據時代爲數據流處理,就像水流同樣,是數據流;既然是數據流處理,就會想到數據的流入、數據的加工、數據的流出。html

平常工做、生活中數據來源不少不一樣的地方。例如:工業時代的汽車製造、監控設備、工業設備會產生不少源數據;信息時代的電商網站、日誌服務器、社交網絡、金融交易系統、黑客攻擊、垃圾郵件、交通監控等;通訊時代的手機、平板、智能設備、物聯網等會產生不少實時數據,數據流無處不在。java

在大數據時代Spark Streaming能作什麼?web

平時用戶都有網上購物的經歷,用戶在網站上進行的各類操做經過Spark Streaming流處理技術能夠被監控,用戶的購買愛好、關注度、交易等能夠進行行爲分析。在金融領域,經過Spark Streaming流處理技術能夠對交易量很大的帳號進行監控,防止罪犯洗錢、財產轉移、防欺詐等。在網絡安全性方面,黑客攻擊時有發生,經過Spark Streaming流處理技術能夠將某類可疑IP進行監控並結合機器學習訓練模型匹配出當前請求是否屬於黑客攻擊。其餘方面,如:垃圾郵件監控過濾、交通監控、網絡監控、工業設備監控的背後都是Spark Streaming發揮強大流處理的地方。算法

大數據時代,數據價值通常怎麼定義?數據庫

全部沒通過流處理的數據都是無效數據或沒有價值的數據;數據產生以後當即處理產生的價值是最大的,數據放置越久或越滯後其使用價值越低。之前絕大多數電商網站盈利走的是網絡流量(即用戶的訪問量),現在,電商網站不只僅須要關注流量、交易量,更重要的是要經過數據流技術讓電商網站的各類數據流動起來,經過實時流動的數據及時分析、挖掘出各類有價值的數據;好比:對不一樣交易量的用戶指定用戶畫像,從而提供不一樣服務質量;準對用戶訪問電商網站板塊愛好及時推薦相關的信息。apache

SparkStreaming VS Hadoop MR:編程

Spark Streaming是一個準實時流處理框架,而Hadoop MR是一個離線、批處理框架;很顯然,在數據的價值性角度,Spark Streaming完勝於Hadoop MR。ubuntu

SparkStreaming VS Storm:api

Spark Streaming是一個準實時流處理框架,處理響應時間通常以分鐘爲單位,也就是說處理實時數據的延遲時間是秒級別的;Storm是一個實時流處理框架,處理響應是毫秒級的。因此在流框架選型方面要看具體業務場景。須要澄清的是如今不少人認爲Spark Streaming流處理運行不穩定、數據丟失、事務性支持很差等等,那是由於不少人不會駕馭Spark Streaming及Spark自己。在Spark Streaming流處理的延遲時間方面,Spark定製版本,會將Spark Streaming的延遲從秒級別推動到100毫秒以內甚至更少。數組

SparkStreaming優勢:

一、提供了豐富的API,企業中能快速實現各類複雜的業務邏輯。

二、流入Spark Streaming的數據流經過和機器學習算法結合,完成機器模擬和圖計算。

三、Spark Streaming基於Spark優秀的血統。

 

SparkStreaming能不能像Storm同樣,一條一條處理數據?

Storm處理數據的方式是以條爲單位來一條一條處理的,而Spark Streaming基於單位時間處理數據的,SparkStreaming能不能像Storm同樣呢?答案是:能夠的。

業界通常的作法是Spark Streaming和Kafka搭檔便可達到這種效果,入下圖:

 

Kafka業界認同最主流的分佈式消息框架,此框架即符合消息廣播模式又符合消息隊列模式。

Kafka內部使用的技術:

一、  Cache

二、  Interface

三、  Persistence(默認最大持久化一週)

四、  Zero-Copy技術讓Kafka每秒吞吐量幾百兆,並且數據只須要加載一次到內核提供其餘應用程序使用

外部各類源數據推動(Push)Kafka,而後再經過Spark Streaming抓取(Pull)數據,抓取的數據量能夠根據本身的實際狀況肯定每一秒中要處理多少數據。

 

經過Spark Streaming動手實戰wordCount實例

這裏是運行一個Spark Streaming的程序:統計這個時間段內流進來的單詞出現的次數. 它計算的是:他規定的時間段內每一個單詞出現了多少次。

一、先啓動下Spark集羣:

咱們從集羣裏面打開下官方網站

 

接受這個數據進行加工,就是流處理的過程,剛纔那個WordCount就是以1s作一個單位。

剛纔運行的時候,爲何沒有結果呢?由於須要數據源。

二、獲取數據源:

 

新開一個命令終端,而後輸入:

$ nc -lk 9999

如今咱們拷貝數據源進入運行:

 

 

而後按回車運行

 

 

DStream和RDD關係:

沒有輸入數據會打印的是空結果:

 

 可是實際上,Job的執行是Spark Streaming框架幫咱們產生的和開發者本身寫的Spark代碼業務邏輯沒有關係,並且Spark Streaming框架的執行時間間隔能夠手動配置,如:每隔一秒鐘就會產生一次Job的調用。因此在開發者編寫好的Spark代碼時(如:flatmap、map、collect),不會致使job的運行,job運行是Spark Streaming框架產生的,能夠配置成每隔一秒中都會產生一次job調用。
Spark Streaming流進來的數據是DStream,但Spark Core框架只認RDD,這就產生矛盾了?
Spark Streaming框架中,做業實例的產生都是基於rdd實例來產生,你寫的代碼是做業的模板,即rdd是做業的模板,模板一運行rdd就會被執行,此時action必須處理數據。RDD的模板就是DStream離散流,RDD之間存在依賴關係,DStream就有了依賴關係,也就構成了DStream 有向無環圖。這個DAG圖,是模板。Spark Streaming只不過是在附在RDD上面一層薄薄的封裝而已。你寫的代碼不能產生Job,只有框架才能產生Job.
若是一秒內計算不完數據,就只能調優了.

總結:

使用Spark Streaming能夠處理各類數據來源類型,如:數據庫、HDFS,服務器log日誌、網絡流,其強大超越了你想象不到的場景,只是不少時候你們不會用,其真正緣由是對Spark、spark streaming自己不瞭解。

 

Scala和Java二種方式實戰Spark Streaming開發

1、Java方式開發

一、開發前準備:假定您以搭建好了Spark集羣。

二、開發環境採用eclipse maven工程,須要添加Spark Streaming依賴。

三、Spark streaming 基於Spark Core進行計算,須要注意事項:

設置本地master,若是指定local的話,必須配置至少二條線程,也可經過sparkconf來設置,由於Spark Streaming應用程序在運行的時候,至少有一條線程用於不斷的循環接收數據,而且至少有一條線程用於處理接收的數據(不然的話沒法有線程用於處理數據),隨着時間的推移,內存和磁盤都會不堪重負)。

舒適提示:

對於集羣而言,每隔exccutor通常確定不僅一個Thread,那對於處理Spark Streaming應用程序而言,每一個executor通常分配多少core比較合適?根據咱們過去的經驗,5個左右的core是最佳的(段子:分配爲奇數個core的表現最佳,例如:分配3個、5個、7個core等)

接下來,讓咱們開始動手寫寫Java代碼吧!

第一步:建立SparkConf對象

 

第二步:建立SparkStreamingContext

咱們採用基於配置文件的方式建立SparkStreamingContext對象:

第三步,建立Spark Streaming輸入數據來源:

  咱們將數據來源配置爲本地端口9999(注意端口要求沒有被佔用):

第四步:咱們就像對RDD編程同樣,基於DStream進行編程,緣由是DStream是RDD產生的模板,在Spark Streaming發生計算前,其實質是把每一個Batch的DStream的操做翻譯成爲了RDD操做。

一、flatMap操做:

二、 mapToPair操做:

 

三、reduceByKey操做:

四、print等操做:

舒適提示:

除了print()方法將處理後的數據輸出以外,還有其餘的方法也很是重要,在開發中須要重點掌握,好比SaveAsTextFile,SaveAsHadoopFile等,最爲重要的是foreachRDD方法,這個方法能夠將數據寫入Redis,DB,DashBoard等,甚至能夠隨意的定義數據放在哪裏,功能很是強大。

1、Scala方式開發

第一步,接收數據源:

第二步,flatMap操做:

第三步,map操做:

第四步,reduce操做:

第五步,print()等操做:

第六步:awaitTermination操做

 

總結:

使用Spark Streaming能夠處理各類數據來源類型,如:數據庫、HDFS,服務器log日誌、網絡流,其強大超越了你想象不到的場景,只是不少時候你們不會用,其真正緣由是對Spark、spark streaming自己不瞭解。

 
 

StreamingContext、DStream、Receiver深度剖析

 

1、StreamingContext功能及源碼剖析:

一、  經過Spark Streaming對象jssc,建立應用程序主入口,並連上Driver上的接收數據服務端口9999寫入源數據:

 

二、  Spark Streaming的主要功能有:

  • 主程序的入口;
  • 提供了各類建立DStream的方法接收各類流入的數據源(例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等);
  • 經過構造函數實例化Spark Streaming對象時,能夠指定master URL、appName、或者傳入SparkConf配置對象、或者已經建立的SparkContext對象;
  • 將接收的數據流傳入DStreams對象中;
  • 經過Spark Streaming對象實例的start方法啓動當前應用程序的流計算框架或經過stop方法結束當前應用程序的流計算框架;

 

 2、DStream功能及源碼剖析:

一、  DStream是RDD的模板,DStream是抽象的,RDD也是抽象

二、  DStream的具體實現子類以下圖所示:

 

三、  以StreamingContext實例的socketTextSteam方法爲例,其執行完的結果返回DStream對象實例,其源碼調用過程以下圖:

socket.getInputStream獲取數據,while循環來存儲儲蓄數據(內存、磁盤)

3、Receiver功能及源碼剖析:

一、Receiver表明數據的輸入,接收外部輸入的數據,如從Kafka上抓取數據;

二、Receiver運行在Worker節點上;

三、Receiver在Worker節點上抓取Kafka分佈式消息框架上的數據時,具體實現類是KafkaReceiver;

四、Receiver是抽象類,其抓取數據的實現子類以下圖所示:

 

五、  若是上述實現類都知足不了您的要求,您本身能夠定義Receiver類,只須要繼承Receiver抽象類來實現本身子類的業務需求。

4、StreamingContext、DStream、Receiver結合流程分析:

 

(1)inputStream表明了數據輸入流(如:Socket、Kafka、Flume等)

(2)Transformation表明了對數據的一系列操做,如flatMap、map等

(3)outputStream表明了數據的輸出,例如wordCount中的println方法:

數據數據在流進來以後最終會生成Job,最終仍是基於Spark Core的RDD進行執行:在處理流進來的數據時是DStream進行Transformation因爲是StreamingContext因此根本不會去運行,StreamingContext會根據Transformation生成」DStream的鏈條」及DStreamGraph,而DStreamGraph就是DAG的模板,這個模板是被框架託管的。當咱們指定時間間隔的時候,Driver端就會根據這個時間間隔來觸發Job而觸發Job的方法就是根據OutputDStream中指定的具體的function,例如wordcount中print,這個函數必定會傳給ForEachDStream,它會把函數交給最後一個DStream產生的RDD,也就是RDD的print操做,而這個操做就是RDD觸發Action。

總結:

使用Spark Streaming能夠處理各類數據來源類型,如:數據庫、HDFS,服務器log日誌、網絡流,其強大超越了你想象不到的場景,只是不少時候你們不會用,其真正緣由是對Spark、spark streaming自己不瞭解。

 
 

基於HDFS的SparkStreaming案例實戰

一:Spark集羣開發環境準備

  1. 啓動HDFS,以下圖所示:

 

經過web端查看節點正常啓動,以下圖所示:

2.啓動Spark集羣,以下圖所示:

經過web端查看集羣啓動正常,以下圖所示:

3.啓動start-history-server.sh,以下圖所示:

二:HDFS的SparkStreaming案例實戰(代碼部分)

package com.dt.spark.SparkApps.sparkstreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.Java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;

/**
 * Created by Jonson on 2016/4/17.
 */
public class SparkStreamingOnHDFS {
    public static void main(String[] args){
        /**
         * 第一步:配置SparkConf
         * 1. 至少兩條線程:
         * 由於Spark Streaming應用程序在運行的時候,至少有一條線程用於不斷的循環接收數據,
         * 而且至少有一條線程用於處理接收的數據(不然的話沒法有線程用於處理數據,隨着時間的推移,內存和磁盤都不堪重負)
         * 2. 對於集羣而言,每一個Executor通常而言確定不止一個線程,對於處理Spark Streaming的應用程序而言,每一個Executor通常
         * 分配多少個Core合適呢?根據咱們過去的經驗,5個左右的core是最佳的(分配爲奇數個Core爲最佳)。
         */
        final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");
        /**
         * 第二步:建立SparkStreamingContext,這個是Spark Streaming應用程序全部功能的起始點和程序調度的核心
         * 1,SparkStreamingContext的構建能夠基於SparkConf參數,也能夠基於持久化SparkStreamingContext的內容
         * 來恢復過來(典型的場景是Driver崩潰後從新啓動,因爲Spark Streaming具備連續7*24小時不間斷運行的特徵,
         * 全部須要在Driver從新啓動後繼續上一次的狀態,此時狀態的恢復須要基於曾經的checkpoint)
         * 2,在一個Spark Streaming應用程序中能夠建立若干個SparkStreamingContext對象,使用下一個SparkStreamingContext
         * 以前須要把前面正在運行的SparkStreamingContext對象關閉掉,由此,咱們得到一個重大啓發:SparkStreamingContext
         * 是Spark core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話須要Spark工程師寫業務邏輯
         */
//        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)設置每隔5秒

        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        /**
         * 能夠從失敗中恢復Driver,不過還須要制定Driver這個進程運行在Cluster,而且提交應用程序的時候
         * 指定 --supervise;
         */
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        /**
         * 如今是監控一個文件系統的目錄
         * 此處沒有Receiver,Spark Streaming應用程序只是按照時間間隔監控目錄下每一個Batch新增的內容(把新增的)
         * 做爲RDD的數據來源生成原始的RDD
         */
        //指定從HDFS中監控的目錄
        JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");
        /**
         * 第四步:接下來就像對於RDD編程同樣基於DStreaming進行編程!!!
         * 緣由是:
         *  DStreaming是RDD產生的模板(或者說類)。
         *  在Spark Streaming具體發生計算前其實質是把每一個batch的DStream的操做翻譯成對RDD的操做!!
         *  對初始的DStream進行Transformation級別的處理,例如Map,filter等高階函數的編程,來進行具體的數據計算。
         *  第4.1步:將每一行的字符串拆分紅單個單詞
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        /**
         * 第4.2步:對初始的JavaRDD進行Transformation級別的處理,例如map,filter等高階函數等的編程,來進行具體的數據計算
         * 在4.1的基礎上,在單詞拆分的基礎上對每一個單詞實例計數爲1,也就是word => (word,1)
         */
        JavaPairDStream<String,Integer> pairs  = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });
        /**
         * 第4.3步:在每一個單詞實例計數的基礎上統計每一個單詞在文件中出現的總次數
         */
        JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        /**
         * 此處的print並不會直接觸發Job的執行,由於如今的一切都是在Spark Streaming框架控制下的,對於Spark而言具體是否
         * 觸發真正的Job運行是基於設置的Duration時間間隔的
         * 必定要注意的是:Spark Streaming應用程序要想執行具體的Job,對DStream就必須有output Stream操做,
         * output Stream有不少類型的函數觸發,例如:print,saveAsTextFile,saveAsHadoopFiles等,其實最爲重要的一個方法是
         * foraeachRDD,由於Spark Streaming處理的結果通常都會放在Redis,DB,DashBoard等上面,foreachRDD主要就是用來完成這些
         * 功能的,並且能夠隨意的自定義具體數據到底存放在哪裏!!!
         */
        wordscount.print();
        /**
         * Spark Streaming執行引擎也就是Driver開始運行,Driver啓動的時候是位於一條新的線程中的。
         * 固然其內部有消息循環體用於接收應用程序自己或者Executor的消息;
         */
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
    /**
     * 工廠化模式構建JavaStreamingContext
     */
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){
        System.out.println("Creating new context");
        SparkConf = conf;
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        return ssc;
    }
}

代碼打包在集羣中運行

  1. 建立目錄

 

  

2.腳本運行

  腳本內容以下:

此時Spark Streaming會每隔5秒執行一次,不斷的掃描監控目錄下是否有新的文件。

3.上傳文件到HDFS中的Data目錄下

4.輸出結果

三:Spark Streaming on HDFS源碼解密

  1. JavaStreamingContextFactory的create方法能夠建立JavaStreamingContext
  2. 而咱們在具體實現的時候覆寫了該方法,內部就是調用createContext方法來具體實現。上述實戰案例中咱們實現了createContext方法。
/*** Factory interface for creating a new JavaStreamingContext
 */
trait JavaStreamingContextFactory {
  def create(): JavaStreamingContext
}

3.checkpoint:

  一方面:保持容錯

  一方面保持狀態

  在開始和結束的時候每一個batch都會進行checkpoint

** Sets the context to periodically checkpoint the DStream operations for master

 * fault-tolerance. The graph will be checkpointed every batch interval.
 * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
 */
def checkpoint(directory: String) {
  ssc.checkpoint(directory)
}
4.remember:
流式處理中過一段時間數據就會被清理掉,可是能夠經過remember能夠延長數據在程序中的生命週期,另外延長RDD更長的時間。

應用場景:

假設數據流進來,進行ML或者Graphx的時候有時須要很長時間,可是bacth定時定條件的清除RDD,因此就能夠經過remember使得數據能夠延長更長時間。/**

 * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
 * DStreams remember RDDs only for a limited duration of duration and releases them for garbage
 * collection. This method allows the developer to specify how long to remember the RDDs (
 * if the developer wishes to query old data outside the DStream computation).
 * @param duration Minimum duration that each DStream should remember its RDDs
 */
def remember(duration: Duration) {
  ssc.remember(duration)
}
5.在JavaStreamingContext中,getOrCreate方法源碼以下:

  若是設置了checkpoint ,重啓程序的時候,getOrCreate()會從新從checkpoint目錄中初始化出StreamingContext。

/* * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.

 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided factory
 * will be used to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
 * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
 * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor.
 */
@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
    checkpointPath: String,
    factory: JavaStreamingContextFactory
  ): JavaStreamingContext = {
  val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
    factory.create.ssc
  })
  new JavaStreamingContext(ssc)
}
異常問題思考:

爲啥會報錯?
  1. Streaming會按期的進行checkpoint。
  2. 從新啓動程序的時候,他會從曾經checkpoint的目錄中,若是沒有作額外配置的時候,全部的信息都會放在checkpoint的目錄中(包括曾經應用程序信息),所以下次再次啓動的時候就會報錯,沒法初始化ShuffleDStream。

總結:

使用Spark Streaming能夠處理各類數據來源類型,如:數據庫、HDFS,服務器log日誌、網絡流,其強大超越了你想象不到的場景,只是不少時候你們不會用,其真正緣由是對Spark、spark streaming自己不瞭解。

 

SparkStreaming數據源Flume實際案例

1、什麼是Flume?
  flume 做爲 cloudera 開發的實時日誌收集系統,受到了業界的承認與普遍應用。Flume 初始的發行版本目前被統稱爲 Flume OG(original generation),屬於 cloudera。但隨着 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點暴露出來,尤爲是在 Flume OG 的最後一個發行版本 0.94.0 中,日誌傳輸不穩定的現象尤其嚴重,爲了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了里程碑式的改動:重構核心組件、核心配置以及代碼架構,重構後的版本統稱爲 Flume NG(next generation);改動的另外一緣由是將 Flume 歸入 apache 旗下,cloudera Flume 更名爲 Apache Flume。

   flume的特色:
  flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。支持在日誌系統中定製各種數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接受方(好比文本、HDFS、Hbase等)的能力 。
  flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)而且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,而後Source會把事件推入(單個或多個)Channel中。你能夠把Channel看做是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另外一個Source。
 flume的可靠性 
  當節點出現故障時,日誌可以被傳送到其餘節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別爲:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;若是數據發送失敗,能夠從新發送。),Store on failure(這也是scribe採用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。
flume的可恢復性:
  仍是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統裏(性能較差)。 
flume的一些核心概念:
Agent        使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。

  1. Client        生產數據,運行在一個獨立的線程。
  2. Source        從Client收集數據,傳遞給Channel。
  3. Sink        從Channel收集數據,運行在一個獨立線程。
  4. Channel        鏈接 sources 和 sinks ,這個有點像一個隊列。
  5. Events        能夠是日誌記錄、 avro 對象等。

 Flume以agent爲最小的獨立運行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大組件構成,以下圖:

  值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型。不一樣類型的Source,Channel和Sink能夠自由組合。組合方式基於用戶設置的配置文件,很是靈活。好比:Channel能夠把事件暫存在內存裏,也能夠持久化到本地硬盤上。Sink能夠把日誌寫入HDFS, HBase,甚至是另一個Source等等。Flume支持用戶創建多級流,也就是說,多個agent能夠協同工做,而且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是NB之處。以下圖所示:

2、Flume+Kafka+Spark Streaming應用場景:

一、Flume集羣採集外部系統的業務信息,將採集後的信息發生到Kafka集羣,最終提供Spark Streaming流框架計算處理,流處理完成後再將最終結果發送給Kafka存儲,架構以下圖:

二、Flume集羣採集外部系統的業務信息,將採集後的信息發生到Kafka集羣,最終提供Spark Streaming流框架計算處理,流處理完成後再將最終結果發送給Kafka存儲,同時將最終結果經過Ganglia監控工具進行圖形化展現,架構以下圖:

三、咱們要作:Spark streaming 交互式的360度的可視化,Spark streaming 交互式3D可視化UI;Flume集羣採集外部系統的業務信息,將採集後的信息發生到Kafka集羣,最終提供Spark Streaming流框架計算處理,流處理完成後再將最終結果發送給Kafka存儲,將最終結果同時存儲在數據庫MySQL)、內存中間件(Redis、MemSQL)中,同時將最終結果經過Ganglia監控工具進行圖形化展現,架構以下圖:

3、Kafka數據寫入Spark Streaming有二種方式:

一種是Receivers,這個方法使用了Receivers來接收數據,Receivers的實現使用到Kafka高層次的消費者API,對於全部的Receivers,接收到的數據將會保存在Spark 分佈式的executors中,而後由Spark Streaming啓動的Job來處理這些數據;然而,在默認的配置下,這種方法在失敗的狀況下會丟失數據,爲了保證零數據丟失,你能夠在Spark Streaming中使用WAL日誌功能,這使得咱們能夠將接收到的數據保存到WAL中(WAL日誌能夠存儲在HDFS上),因此在失敗的時候,咱們能夠從WAL中恢復,而不至於丟失數據。

另外一種是DirectAPI,產生數據和處理數據的時候是在兩臺機器上?實際上是在同一臺數據上,因爲在一臺機器上有Driver和Executor,因此這臺機器要足夠強悍。

Flume集羣將採集的數據放到Kafka集羣中,Spark Streaming會實時在線的從Kafka集羣中經過DirectAPI拿數據,能夠經過Kafka中的topic+partition查詢最新的偏移量(offset)來讀取每一個batch的數據,即便讀取失敗也可再根據偏移量來讀取失敗的數據,保證應用運行的穩定性和數據可靠性。

 

舒適提示:

一、Flume集羣數據寫入Kafka集羣時可能會致使數據存放不均衡,即有些Kafka節點數據量很大、有些不大,後續會對分發數據進行自定義算法來解決數據存放不均衡問題。

二、我的強烈推薦在生產環境下用DirectAPI,可是咱們的發行版,會對DirectAPI進行優化,下降其延遲。

總結:

  實際生產環境下,蒐集分佈式的日誌以Kafka爲核心。

使用Spark Streaming能夠處理各類數據來源類型,如:數據庫、HDFS,服務器log日誌、網絡流,其強大超越了你想象不到的場景,只是不少時候你們不會用,其真正緣由是對Spark、spark streaming自己不瞭解。

 

Spark Streaming on Kafka解析和安裝實戰

本課分2部分講解:

第一部分,講解Kafka的概念、架構和用例場景;

第二部分,講解Kafka的安裝和實戰。

因爲時間關係,今天的課程只講到如何用官網的例子驗證Kafka的安裝是否成功。後續課程會接着講解如何集成Spark Streaming和Kafka。

1、Kafka的概念、架構和用例場景

http://kafka.apache.org/documentation.html#introdution

一、Kafka的概念

Apache Kafka是分佈式發佈-訂閱消息系統。它最初由LinkedIn公司開發,以後成爲Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分佈式的,分區的和可複製的提交日誌服務。zookeeper和kafka和大數據是不僅能用於大數據的,集羣啓不啓動,它均可以使用,也能夠用於Javaserver普通的企業級平臺上。

什麼是消息組件:

以帥哥和美女聊天爲例,帥哥如何和美女交流呢?這中間一般想到的是微信、QQ、電話、郵件等通訊媒介,這些通訊媒介就是消息組件,帥哥把聊天信息發送給消息組件、消息組件將消息推送給美女,這就是常說的生產者、消費者模型,kafka不只僅說是生產者消費者模式中廣播的概念,也能夠實現隊列的方式,kafka的消費者中有一個group的概念,group中能夠有不少實體,也能夠只有一個實體,group中只有一個實體的話,就是隊列的方式,因此從消息驅動的角度講,它是廣播的方式和隊列的方式的完美結合體。並且在發送信息時能夠將內容進行分類,即所謂的Topic主題。Kafka就是這樣的通訊組件,將不一樣對象組件粘合起來的紐帶, 且是解耦合方式傳遞數據。

完善的流處理系統的特色:

1)能在線的以很是低的延遲,來處理數據,並且是穩定可靠的

2)能對流進來的數據進行很是複雜的分析,而不是簡單的僅僅統計的分析

3)不只能處理當前在線的數據,也能處理過去一天,一週,一個月甚至一年的數據

Apache Kafka與傳統消息系統相比,有如下不一樣的特色:

  • 分佈式系統,易於向外擴展;
  • 在線低延遲,同時爲發佈和訂閱提供高吞吐量;
  • 流進來的數據通常處理完後就消失了,也能夠將消息存儲到磁盤,所以能夠處理1天甚至1周前內容,因此kafka不只是一個消息中間件,仍是一個存儲系統

二、Kafka的架構

Kafka既然具有消息系統的基本功能,那麼就必然會有組成消息系統的組件:

Topic,Producer和Consumer。Kafka還有其特殊的Kafka Cluster組件。

Topic主題:

表明一種數據的類別或類型,工做、娛樂、生活有不一樣的Topic,生產者須要說明把說明數據分別放在那些Topic中,裏面就是一個個小對象,並將數據數據推到Kafka,消費者獲取數據是pull的過程。一組相同類型的消息數據流。這些消息在Kafka會被分區存放,而且有多個副本,以防數據丟失。每一個分區的消息是順序寫入的,而且不可改寫。

-       Producer(生產者):把數據推到Kafka系統的任何對象。

 

- Kafka Cluster(Kafka集羣):把推到Kafka系統的消息保存起來的一組服務器,也叫Broker。由於Kafka集羣用到了Zookeeper做爲底層支持框架,因此由一個選出的服務器做爲Leader來處理全部消息的讀和寫的請求,其餘服務器做爲Follower接受Leader的廣播同步備份數據,以備災難恢復時用。

- Consumer(消費者):從Kafka系統訂閱消息的任何對象。

消費者能夠有多個,而且某些消費者還能夠組成Consumer Group。多個Consumer Group之間組成消息廣播的關係,因此各個Group能夠拉相同的消息數據。在Consumer Group內部,各消費者之間對Consumer Group拉出來的消息數據是隊列先進先出的關係,某個消息數據只能給該Group的一個消費者使用,同一個Group中的實體是互斥的,對一個消息,這樣是避免重複消費。若是有多個group,每一個group中只有一個實體,這就是隊列的方式了,由於它是互斥的。若是不是一個實體,則是廣播模式,以下圖所示,廣播只能廣播給一個group中的一個消費實體

kafka的數據傳輸是基於kernel(內核)級別的(傳輸速度接近0拷貝-ZeroCopy)、沒有用戶空間的參與。Linux自己是軟件,軟件啓動時第一個啓動進程叫init,在init進程啓動後會進入用戶空間;kafka是用java寫的,是基於jvm虛擬機的。例如:在分佈式系統中,機器A上的應用程序須要讀取機器B上的Java服務數據,因爲Java程序對應的JVM是用戶空間級別並且數據在磁盤上,A上應用程序讀取數據時會首先進入機器B上的內核空間再進入機器B的用戶空間,讀取用戶空間的數據後,數據再通過B機器上的內核空間分發到網絡中(之因此要再通過B的內核,由於要經過網絡通訊,不經過內核,哪裏來的網絡通訊),機器A網卡接收到傳輸過來的數據後再將數據寫入A機器的內核空間,從而最終將數據傳輸給A的用戶空間進行處理。以下圖:網絡自己是一種硬件,磁盤只是硬件的一種。

正常狀況下,外部系統從Java程序中讀取數據,傳輸給內核空間並依賴網卡將數據寫入到網絡中,從而把數據傳輸出去。其實Java自己是內核的一層外衣,Java Socket編程,操做的各類數據都是在JVM的用戶空間中進行的。而Kafka操做數據是放在內核空間的,一般內核空間處理數據的速度比用戶空間快上萬倍,由於沒用用戶態和內核態的切換,因此經過kafka能夠實現高速讀、寫數據。只要磁盤空間足夠大,能夠無限量的存儲數據,kafka的數據就是存儲在磁盤中的,不是存在內核中的。而不少消息組件是把數據存內存中的。kafka用zookeeperg管理元數據,並且按順序寫數據,比隨機寫要快不少。又有副本!

三、Kafka的用例場景 

相似微信,手機和郵箱等等這樣你們熟悉的消息組件,Kafka也能夠:

-       支持文字/圖片

-       能夠存儲內容

-       分門別類

從內容消費的角度,Kafka把郵箱中的郵件類型當作是Topic。

2、Kafka的安裝和實戰

http://kafka.apache.org/documentation.html#quickstart

一、安裝和配置Zookeeper

Kafka集羣模式須要提早安裝好Zookeeper。

-       提示:Kafka單例模式不須要安裝額外的Zookeeper,可使用內置的Zookeeper。

-       Kafka集羣模式須要至少3臺服務器。本課實戰用到的服務器Hostname:master,slave1,slave2。

-       本課中用到的Zookeeper版本是Zookeeper-3.4.6。

1)    下載Zookeeper

進入http://www.apache.org/dyn/closer.cgi/zookeeper/,你能夠選擇其餘鏡像網址去下載,用官網推薦的鏡像:http://mirror.bit.edu.cn/apache/zookeeper/。提示:能夠直接下載羣裏的Zookeeper安裝文件。

下載zookeeper-3.4.6.tar.gz

1)    安裝Zookeeper

提示:下面的步驟發生在master服務器。

以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:

cd /root

tar -zxvf zookeeper-3.4.6.tar.gz

解壓後在/root目錄會多出一個zookeeper-3.4.6的新目錄,用下面的命令把它剪切到指定目錄即安裝好Zookeeper了:

cd /root

mv zookeeper-3.4.6 /usr/local/spark

以後在/usr/local/spark目錄會多出一個zookeeper-3.4.6的新目錄。下面咱們講如何配置安裝好的Zookeeper。

2)    配置Zookeeper

提示:下面的步驟發生在master服務器。

  1. 配置.bashrc

-       打開文件:vi /root/.bashrc

-       在PATH配置行前添加:

export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6

-       最後修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的環境變量當即生效:source /root/.bashrc

  1. 建立data目錄

-       cd $ZOOKEEPER_HOME

-       mkdir data

  1. 建立並打開zoo.cfg文件

-       cd $ZOOKEEPER_HOME/conf

-       cp zoo_sample.cfg zoo.cfg

-       vi zoo.cfg

  1. 配置zoo.cfg

# 配置Zookeeper的日誌和服務器身份證號等數據存放的目錄。

# 千萬不要用默認的/tmp/zookeeper目錄,由於/tmp目錄的數據容易被意外刪除。

dataDir=../data

# Zookeeper與客戶端鏈接的端口

clientPort=2181

# 在文件最後新增3行配置每一個服務器的2個重要端口:Leader端口和選舉端口

# server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;

# B 是這個服務器的hostname或ip地址;

# C 表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口;

# D 表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,

# 選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口。

# 若是是僞集羣的配置方式,因爲 B 都是同樣,因此不一樣的 Zookeeper 實例通訊

# 端口號不能同樣,因此要給它們分配不一樣的端口號。

server.1=master:2888:3888

server.2=slave1:2888:3888

server.3=slave2:2888:3888

改爲以下方式:

dataDir=/usr/local/spark/zookeeper-3.4.6/data
dataLogDir=/usr/local/spark/zookeeper-3.4.6/logs
clientPort=2181
server.0=master1:2888:3888
server.1=work1:2888:3888
server.2=work2:2888:3888

 

  1. 建立並打開myid文件

-       cd $ZOOKEEPER_HOME/data

-       touch myid

-       vi myid

  1. 配置myid

按照zoo.cfg的配置,myid的內容就是1。要寫成0,和上面zoo.cfg裏面的配置server.0,server.1,server.2一致,因此下面work1中myid內容爲1,work2中myid內容爲2

3)    同步master的安裝和配置到slave1和slave2

-       在master服務器上運行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark

-       在slave1服務器上運行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的內容就是1。

-       在slave2服務器上運行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的內容就是2。

4)    啓動Zookeeper服務

-       在master服務器上運行下面的命令

zkServer.sh start

-       在slave1服務器上運行下面的命令

source /root/.bashrc

zkServer.sh start

-       在slave1服務器上運行下面的命令

source /root/.bashrc

zkServer.sh start

5)    驗證Zookeeper是否安裝和啓動成功

-       在master服務器上運行命令:jps和zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps

3844 QuorumPeerMain

4790 Jps

zkServer.sh status

root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave1服務器上運行命令:jps和zkServer.sh status

source /root/.bashrc

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps

3462 QuorumPeerMain

4313 Jps

root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

-       在slave2服務器上運行命令:jps和zkServer.sh status

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps

4073 Jps

3277 QuorumPeerMain

root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status

JMX enabled by default

Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: leader

      至此,表明Zookeeper已經安裝和配置成功。

二、安裝和配置Kafka

本課中用到的Kafka版本是Kafka-2.10-0.9.0.1。

1)    下載Kafka 

進入http://kafka.apache.org/downloads.html,左鍵單擊kafka_2.10-0.9.0.1.tgz。提示:能夠直接下載羣裏的Kafka安裝文件。

下載kafka_2.10-0.9.0.1.tgz

1)    安裝Kafka

提示:下面的步驟發生在master服務器。

以ubuntu14.04舉例,把下載好的文件放到/root目錄,用下面的命令解壓:

cd /root

tar -zxvf kafka_2.10-0.9.0.1.tgz

解壓後在/root目錄會多出一個kafka_2.10-0.9.0.1的新目錄,用下面的命令把它剪切到指定目錄即安裝好Kafka了:

cd /root

mv kafka_2.10-0.9.0.1 /usr/local

以後在/usr/local目錄會多出一個kafka_2.10-0.9.0.1的新目錄。下面咱們講如何配置安裝好的Kafka。

2)    配置Kafka

提示:下面的步驟發生在master服務器。

  1. 配置.bashrc

-       打開文件:vi /root/.bashrc

-       在PATH配置行前添加:

export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1

-       最後修改PATH:

export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH

-       使配置的環境變量當即生效:source /root/.bashrc

  1. 打開server.properties

-       cd $ZOOKEEPER_HOME/config

-       vi server.properties

  1. 配置server.properties

broker.id=0

port=9092

zookeeper.connect=master:2181,slave1:2181,slave2:2181

3)    同步master的安裝和配置到slave1和slave2

-       在master服務器上運行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local

-       在slave1服務器上運行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=1。

-       在slave2服務器上運行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=2。

4)    啓動Kafka服務

-       在master服務器上運行下面的命令,nohup,在集羣上終端不輸出啓動日誌

cd $KAFKA_HOME/bin

nohup ./kafka-server-start.sh ../config/server.properties &

-       在slave1服務器上運行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

nohup ./kafka-server-start.sh ../config/server.properties &

-       在slave2服務器上運行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

5)    驗證Kafka是否安裝和啓動成功

-       在任意服務器上運行命令建立Topic「HelloKafka」:

kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka

-       在任意服務器上運行命令爲建立的Topic「HelloKafka」生產一些消息:

kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka

輸入下面的消息內容:

This is DT_Spark!

I’m Rocky!

Life is short, you need Spark!

-       在任意服務器上運行命令從指定的Topic「HelloKafka」上消費(拉取)消息:

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka

過一下子,你會看到打印的消息內容:

This is DT_Spark!

I’m Rocky!

Life is short, you need Spark!

-       在任意服務器上運行命令查看全部的Topic名字:

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

-       在任意服務器上運行命令查看指定Topic的概況:

kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka

至此,表明Kafka已經安裝和配置成功。

相關文章
相關標籤/搜索