日誌服務做爲一站式的日誌的採集與分析平臺,提供了各類用戶場景的日誌採集能力,經過日誌服務提供的各類與·與SDK,採集客戶端(Logtail),Producer,用戶能夠很是容易的把各類數據源中的數據採集到日誌服務的Logstore中。同時爲了便於用戶對日誌進行處理,提供了各類支持流式消費的SDK,如各類語言的消費組,與 Spark,Flink,Storm 等各類流計算技術無縫對接的Connector,以便於用戶根據本身的業務場景很是便捷的處理海量日誌。html
從最先的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流計算框架之一。使用日誌服務的Spark SDK,能夠很是方便的在Spark 中消費日誌服務中的數據,同時也支持將 Spark 的計算結果寫入日誌服務。git
日誌服務的存儲層是一個相似Kafka的Append only的FIFO消息隊列,包含以下基本概念:github
1)添加Maven依賴:sql
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-logservice_2.11</artifactId> <version>1.9.0</version> </dependency>
Github源碼下載。
2)計劃消費的日誌服務project,logstore以及對應的endpoint。
3)用於訪問日誌服務Open API的Access Key。apache
Spark Streaming是Spark最先推出的流計算技術,如今已經進入維護狀態,再也不會增長新的功能。可是考慮到Spark Streaming 的使用仍然很是普遍,咱們先從Spark Streaming開始介紹。Spark Streaming 提供了一個DStream 的數據模型抽象,本質是把無界數據集拆分紅一個一個的RDD,轉化爲有界數據集的流式計算。每一個批次處理的數據就是這段時間內從日誌服務消費到的數據。併發
Spark Streaming 從日誌服務消費支持 Receiver 和 Direct 兩種消費方式。app
Receivers的實現內部實現基於日誌服務的消費組(Consumer Library)。數據拉取與處理徹底分離。消費組自動均勻分配Logstore內的全部shard到全部的Receiver,而且自動提交checkpoint到SLS。這就意味着Logstore內的shard個數與Spark 實際的併發沒有對應關係。
對於全部的Receiver,接收到的數據默認會保存在Spark Executors中,因此Failover的時候有可能形成數據丟失,這個時候就須要開啓WAL日誌,Failover的時候能夠從WAL中恢復,防止丟失數據。框架
SDK將SLS中的每行日誌解析爲JSON字符串形式,Receiver使用示例以下所示:ide
object SLSReceiverSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val consumerGroup = "consumer group" val endpoint = "your endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val conf = new SparkConf().setAppName("Test SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val stream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK, LogHubCursorPosition.END_CURSOR) stream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") ssc.start() ssc.awaitTermination() } }
除Project,Logstore,Access Key 這些基礎配置外,還能夠指定StorageLevel,消費開始位置等。函數
Direct模式再也不須要Receiver,也不依賴於消費組,而是使用日誌服務的低級API,在每一個批次內直接從服務端拉取數據處理。對於Logstore中的每一個Shard來講,每一個批次都會讀取指定位置範圍內的數據。爲了保證一致性,只有在每一個批次確認正常結束以後才能把每一個Shard的消費結束位置(checkpoint)保存到服務端。
爲了實現Direct模式,SDK依賴一個本地的ZooKeeper,每一個shard的checkpoint會臨時保存到本地的ZooKeeper,等用戶手動提交checkpoint時,再從ZooKeeper中同步到服務端。Failover時也是先從本地ZooKeeper中嘗試讀上一次的checkpoint,若是沒有讀到再從服務端獲取。
object SLSDirectSample { def main(args: Array[String]): Unit = { val project = "your project" val logstore = "your logstore" val consumerGroup = "consumerGroup" val endpoint = "endpoint" val accessKeyId = "access key id" val accessKeySecret = "access key secret" val batchInterval = Milliseconds(5 * 1000) val zkAddress = "localhost:2181" val conf = new SparkConf().setAppName("Test Direct SLS Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress) val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") // 手動更新checkpoint loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination() } }
Direct模式示例
在Receiver中,若是須要限制消費速度,咱們只須要調整 Consumer Library 自己的參數便可。而Direct方式是在每一個批次開始時從SLS拉取數據,這就涉及到一個問題:一個批次內拉取多少數據才合適。若是太多,一個批次內處理不完,形成處理延時。若是太少會導worker空閒,工做不飽和,消費延時。這個時候咱們就須要合理配置拉取的速度和行數,實現一個批次儘量多處理又能及時完成的目標。理想狀態下Spark 消費的總體速率應該與SLS採集速率一致,才能實現真正的實時處理。
因爲SLS的數據模型是以LogGroup做爲讀寫的基本單位,而一個LogGroup中可能包含上萬行日誌,這就意味着Spark中直接限制每一個批次的行數難以實現。所以,Direct限流涉及到兩個配置參數:
參數 | 說明 | 默認值 |
---|---|---|
spark.streaming.loghub.maxRatePerShard | 每一個批次每一個Shard讀取行數,決定了限流的下限 | 10000 |
spark.loghub.batchGet.step | 每次請求讀取LogGroup個數,決定了限流的粒度 | 100 |
能夠經過適當縮小spark.loghub.batchGet.step來控制限流的精度,可是即使如此,在某些狀況下仍是會存在較大偏差,如一個LogGroup中存在10000行日誌,spark.streaming.loghub.maxRatePerShard設置爲100,spark.loghub.batchGet.step設置爲1,那一個批次內該shard仍是會拉取10000行日誌。
和Receiver相比,Direct有以下的優點:
可是也存在一些缺點:
與消費SLS相反,Spark Streaming的處理結果也能夠直接寫入SLS。使用示例:
... val lines = loghubStream.map(x => x) // 轉換函數把結果中每條記錄轉爲一行日誌 def transformFunc(x: String): LogItem = { val r = new LogItem() r.PushBack("key", x) r } val callback = new Callback with Serializable { override def onCompletion(result: Result): Unit = { println(s"Send result ${result.isSuccessful}") } } // SLS producer config val producerConfig = Map( "sls.project" -> loghubProject, "sls.logstore" -> targetLogstore, "access.key.id" -> accessKeyId, "access.key.secret" -> accessKeySecret, "sls.endpoint" -> endpoint, "sls.ioThreadCount" -> "2" ) lines.writeToLoghub( producerConfig, "topic", "streaming", transformFunc, Option.apply(callback)) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc.start() ssc.awaitTermination()
Structured Streaming 並非最近纔出現的技術,而是早在16年就已經出現,可是直到 Spark 2.2.0 才正式推出。其數據模型是基於無界表的概念,流數據至關於往一個表上不斷追加行。
與Spark Streaming相比,Structured Streaming主要有以下特色:
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.{StringType, StructField, StructType} object StructuredStreamingDemo { def main(args: Array[String]) { val spark = SparkSession .builder .appName("StructuredLoghubWordCount") .master("local") .getOrCreate() import spark.implicits._ val schema = new StructType( Array(StructField("content", StringType))) val lines = spark .readStream .format("loghub") .schema(schema) .option("sls.project", "your project") .option("sls.store", "your logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("startingoffsets", "latest") .load() .select("content") .as[String] val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("loghub") .option("sls.project", "sink project") .option("sls.store", "sink logstore") .option("access.key.id", "your access key id") .option("access.key.secret", "your access key secret") .option("endpoint", "your endpoint") .option("checkpointLocation", "your checkpoint dir") .start() query.awaitTermination() } }
代碼解釋:
1)schema 聲明瞭咱們須要的字段,除了日誌中的字段外,還有以下的內部字段:
__logProject__ __logStore__ __shard__ __time__ __topic__ __source__ __sequence_number__ // 每行日誌惟一id
若是沒有指定schema,SDK默認提供一個__value__字段,其內容爲由全部字段組成的一個JSON字符串。
2)lines 定義了一個流。
startingoffsets:開始位置,支持:
maxOffsetsPerTrigger:批次讀取行數,SDK中默認是64*1024 。
3)結果寫入到日誌服務
format 指定爲Loghub便可。
官方文檔:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
SLS SDK例子:https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/examples/src/main/scala/com/aliyun/emr/examples/sql/streaming
日誌服務實時消費:https://help.aliyun.com/document_detail/28998.html
本文做者:liketic
本文爲阿里雲內容,未經容許不得轉載。