大屏幕上實時跳動的營業額,背後的技術結構。前端
現現在,咱們來到了數據時代,數據信息化與咱們的生活與工做息息相關。此篇文章簡述利用大數據框架,實時處理數據的流程與相關框架的介紹,主要包括:java
- 數據實時處理的概念和意義
- 數據實時處理能作什麼
- 數據實時處理架構簡介
- 數據實時處理代碼演示
什麼是數據實時處理呢?我我的對數據實時處理的理解爲:數據從生成->實時採集->實時緩存存儲->(準)實時計算->實時落地->實時展現->實時分析。這一個流程線下來,處理數據的速度在秒級甚至毫秒級。node
數據實時處理有什麼意義呢?咱們獲得數據能夠進行數據分析,利用數據統計方法,從錯綜複雜的數據關係中梳理出事物的聯繫,好比發展趨勢、影響因素、因果關係等。甚至創建一些BI,對一些數據的有用信息進行可視化呈現,並造成數據故事。算法
何爲數據的實時計算?咱們從數據源端拿到數據,可能不盡如人意,咱們想對獲得的數據進行 ETL 操做、或者進行關聯等等,那麼咱們就會用到數據的實時計算。目前主流的實時計算框架有 spark,storm,flink 等。sql
數據的實時落地,意思是將咱們的源數據或者計算好的數據進行實時的存儲。在大數據領域,推薦使用 HDFS,ES 等進行存儲。數據庫
咱們拿到了數據,要會用數據的價值。數據的價值體如今數據中相互關聯關係,或與歷史關聯,或能預測將來。咱們實時獲得數據,不只可以利用前端框架進行實時展現,還能夠對其中的一些數據進行算法訓練,預測將來走勢等。apache
example:bootstrap
淘寶雙 11 大屏,每一年的雙 11 是淘寶粉絲瘋狂的日子。馬雲會在雙 11 的當天在阿里總部豎起一面大的電子屏幕,展現淘寶這一天的成績。例如成交額,訪問人數,訂單量,下單量,成交量等等。這個電子大屏的背後,就是用到的咱們所說的數據的實時處理。首先,阿里的服務器遍及全國各地,這些服務器收集PC端、手機端等日誌,上報到服務器,在服務上部署數據採集工具。接下來,因爲數據量龐大,須要作數據的緩存緩衝處理。下一步,對原始日誌進行實時的計算,好比篩選出上面所述的各個指標。最後,經過接口或者其餘形式,進行前端屏幕的實時展現。緩存
接下來是咱們介紹的重點,先放一張數據流程圖:前端框架
下面將分別簡單的介紹下各個組件:
flume 是一個分佈式的數據收集系統,具備高可靠、高可用、事務管理、失敗重啓、聚合和傳輸等功能。數據處理速度快,徹底能夠用於生產環境。
flume 的核心概念有:event,agent,source,channel,sink
flume 的數據流由事件 (event) 貫穿始終。event 是 flume 的基本數據單位,它攜帶日誌數據而且攜帶數據的頭信息,這些 event 由 agent 外部的 source 生成,當 source 捕獲事件後會進行特定的格式化,而後 source 會把事件推入 channel 中。能夠把 channel 看做是一個緩衝區,它將保存事件直到 sink 處理完該事件。sink 負責持久化日誌或者把事件推向另外一個 source。
flume 的核心是 agent。agent 是一個 java 進程,運行在日誌收集端,經過 agent 接收日誌,而後暫存起來,再發送到目的地。 每臺機器運行一個 agent。 agent 裏面能夠包含多個 source,channel,sink。
source 是數據的收集端,負責將數據捕獲後進行特殊的格式化,將數據封裝到 event 裏,而後將事件推入 channel 中。flume 提供了不少內置的 source,支持 avro,log4j,syslog 等等。若是內置的 source 沒法知足環境的需求,flume 還支持自定義 source。
channel 是鏈接 source 和 sink 的組件,你們能夠將它看作一個數據的緩衝區(數據隊列),它能夠將事件暫存到內存中也能夠持久化到本地磁盤上, 直到 sink 處理完該事件。兩個較爲經常使用的 channel,MemoryChannel 和 FileChannel。
sink 從 channel 中取出事件,而後將數據發到別處,能夠向文件系統、數據庫、hadoop、kafka,也能夠是其餘 agent 的 source。
口述抽象,上兩張官網貼圖:
單個 agent 收集數據流程圖
多個 agent 協做處理數據流程圖Kafka 是一個高吞吐量的分佈式發佈-訂閱消息系統。企業中通常使用 kafka 作消息中間件,作緩衝緩存處理。須要 zookeeper 分佈式協調組件管理。
kafka 的設計目標:
kafka 核心概念
貼兩張官網圖
prodecer-broker-consumer
分區圖spark 是一個分佈式的計算框架,是我目前認爲最火的計算框架。
spark,是一種"one stack to rulethem all"的大數據計算框架,指望使用一個技術棧就完美地解決大數據領域的各類計算任務。apache 官方,對 spark 的定義是:通用的大數據快速處理引擎(一「棧」式)。
貼個spark架構圖
須要搭建 flume 集羣,kafka 集羣,es 集羣,zookeeper 集羣,因爲本例 spark 是在本地模式運行,因此無需搭建 spark 集羣。
搭建好集羣后,根據集羣組件直接的整合關係,配置好配置文件。其中主要的配置爲 flume 的配置,以下圖:
能夠看到,咱們的 agent 的 source 爲 r1,channel 爲 c1,sink 爲 k1,source 爲我本地 nc 服務,收集日誌時,只須要打開 9999 端口就能夠把日誌收集。channel 選擇爲 memory 內存模式。sink 爲 kafka 的 topic8 主題。建立好 es 對應的表,表有三個字段,對應代碼裏面的 case class(代碼隨後貼上)。
代碼以下:
package run
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark
/** * @author wangjx * 測試kafka數據進行統計 kafka自身維護offset(建議使用自定義維護方式維護偏移量) */
object SparkStreamingAutoOffsetKafka {
//定義樣例類 與es表對應
case class people(name:String,country:String,age:Int)
def main(args: Array[String]): Unit = {
val logger = Logger.getLogger(this.getClass);
//spark 配置
val conf = new SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")
conf.set("es.index.auto.create","true")
conf.set("es.nodes","127.0.0.1")
conf.set("es.port","9200")
//spark streaming實時計算初始化 定義每10秒一個批次 準實時處理 企業通常都是準實時 好比每隔10秒統計近1分鐘的數據等等
val ssc = new StreamingContext(conf, Seconds(10))
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN");
//設置kafka參數
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "x:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "exactly-once",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//kafka主題
val topic = Set("kafka8")
//從kafka獲取數據
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topic, kafkaParams)
)
//具體的業務邏輯
val kafkaValue: DStream[String] = stream.flatMap(line=>Some(line.value()))
val peopleStream = kafkaValue
.map(_.split(":"))
//造成people樣例對象
.map(m=>people(m(0),m(1),m(2).toInt))
//存入ES
peopleStream.foreachRDD(rdd =>{
EsSpark.saveToEs(rdd, "people/man")
})
//啓動程序入口
ssc.start()
ssc.awaitTermination()
}
}
複製代碼
文 / 皮蛋二哥
一直覺得只要保持低調,就沒人知道其實我是一名做家。
——迷同樣的皮蛋二哥
編 / 熒聲
本文已由做者受權發佈,版權屬於創宇前端。歡迎註明出處轉載本文。本文連接:knownsec-fed.com/2018-08-31-…
想要看到更多來自知道創宇開發一線的分享,請搜索關注咱們的微信公衆號:創宇前端(KnownsecFED)。歡迎留言討論,咱們會盡量回復。
感謝您的閱讀。