日誌服務(SLS)集成 Spark 流計算實戰

前言

日誌服務做爲一站式的日誌的採集與分析平臺,提供了各類用戶場景的日誌採集能力,經過日誌服務提供的各類與·與SDK,採集客戶端(Logtail),Producer,用戶能夠很是容易的把各類數據源中的數據採集到日誌服務的Logstore中。同時爲了便於用戶對日誌進行處理,提供了各類支持流式消費的SDK,如各類語言的消費組,與 Spark,Flink,Storm 等各類流計算技術無縫對接的Connector,以便於用戶根據本身的業務場景很是便捷的處理海量日誌。html

從最先的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流計算框架之一。使用日誌服務的Spark SDK,能夠很是方便的在Spark 中消費日誌服務中的數據,同時也支持將 Spark 的計算結果寫入日誌服務。git

日誌服務基礎概念

日誌服務的存儲層是一個相似Kafka的Append only的FIFO消息隊列,包含以下基本概念:github

  • 日誌(Log):由時間、及一組不定個數的Key-Value對組成。
  • 日誌組(LogGroup):一組日誌的集合,包含相同Meta信息如Topic,Source,Tags等。是讀寫的基本單位。

  • Shard:分區,LogGroup讀寫基本單元,對應於Kafka的partition。
  • Logstore:日誌庫,用以存放同一類日誌數據。Logstore會包含1個或多個Shard。
  • Project:Logstore存放容器,包含一個或者多個Logstore。

準備工做

1)添加Maven依賴:sql

<dependency>
   <groupId>com.aliyun.emr</groupId>
   <artifactId>emr-logservice_2.11</artifactId>
   <version>1.9.0</version>
</dependency>

Github源碼下載
2)計劃消費的日誌服務project,logstore以及對應的endpoint。
3)用於訪問日誌服務Open API的Access Key。apache

對 Spark Streaming 的支持

Spark Streaming是Spark最先推出的流計算技術,如今已經進入維護狀態,再也不會增長新的功能。可是考慮到Spark Streaming 的使用仍然很是普遍,咱們先從Spark Streaming開始介紹。Spark Streaming 提供了一個DStream 的數據模型抽象,本質是把無界數據集拆分紅一個一個的RDD,轉化爲有界數據集的流式計算。每一個批次處理的數據就是這段時間內從日誌服務消費到的數據。併發

圖-2 DStream

Spark Streaming 從日誌服務消費支持 Receiver 和 Direct 兩種消費方式。app

Receiver模式

Receivers的實現內部實現基於日誌服務的消費組(Consumer Library)。數據拉取與處理徹底分離。消費組自動均勻分配Logstore內的全部shard到全部的Receiver,而且自動提交checkpoint到SLS。這就意味着Logstore內的shard個數與Spark 實際的併發沒有對應關係。
對於全部的Receiver,接收到的數據默認會保存在Spark Executors中,因此Failover的時候有可能形成數據丟失,這個時候就須要開啓WAL日誌,Failover的時候能夠從WAL中恢復,防止丟失數據。框架

SDK將SLS中的每行日誌解析爲JSON字符串形式,Receiver使用示例以下所示:ide

object SLSReceiverSample {
  def main(args: Array[String]): Unit = {
    val project = "your project"
    val logstore = "your logstore"
    val consumerGroup = "consumer group"
    val endpoint = "your endpoint"
    val accessKeyId = "access key id"
    val accessKeySecret = "access key secret"
    val batchInterval = Milliseconds(5 * 1000)

    val conf = new SparkConf().setAppName("Test SLS Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val stream = LoghubUtils.createStream(
      ssc,
      project,
      logstore,
      consumerGroup,
      endpoint,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK,
      LogHubCursorPosition.END_CURSOR)

    stream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
      rdd.map(bytes => new String(bytes)).top(10).foreach(println)
    )
    ssc.checkpoint("hdfs:///tmp/spark/streaming")
    ssc.start()
    ssc.awaitTermination()
  }
}

除Project,Logstore,Access Key 這些基礎配置外,還能夠指定StorageLevel,消費開始位置等。函數

Direct模式

Direct模式再也不須要Receiver,也不依賴於消費組,而是使用日誌服務的低級API,在每一個批次內直接從服務端拉取數據處理。對於Logstore中的每一個Shard來講,每一個批次都會讀取指定位置範圍內的數據。爲了保證一致性,只有在每一個批次確認正常結束以後才能把每一個Shard的消費結束位置(checkpoint)保存到服務端。

爲了實現Direct模式,SDK依賴一個本地的ZooKeeper,每一個shard的checkpoint會臨時保存到本地的ZooKeeper,等用戶手動提交checkpoint時,再從ZooKeeper中同步到服務端。Failover時也是先從本地ZooKeeper中嘗試讀上一次的checkpoint,若是沒有讀到再從服務端獲取。

object SLSDirectSample {
  def main(args: Array[String]): Unit = {
    val project = "your project"
    val logstore = "your logstore"
    val consumerGroup = "consumerGroup"
    val endpoint = "endpoint"
    val accessKeyId = "access key id"
    val accessKeySecret = "access key secret"
    val batchInterval = Milliseconds(5 * 1000)
    val zkAddress = "localhost:2181"
    val conf = new SparkConf().setAppName("Test Direct SLS Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val zkParas = Map("zookeeper.connect" -> zkAddress)
    val loghubStream = LoghubUtils.createDirectStream(
      ssc,
      project,
      logstore,
      consumerGroup,
      accessKeyId,
      accessKeySecret,
      endpoint,
      zkParas,
      LogHubCursorPosition.END_CURSOR)

    loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
      println(s"count by key: ${rdd.map(s => {
        s.sorted
        (s.length, s)
      }).countByKey().size}")
      // 手動更新checkpoint
      loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
    })
    ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
    ssc.start()
    ssc.awaitTermination()
  }
}

Direct模式示例

如何限速

在Receiver中,若是須要限制消費速度,咱們只須要調整 Consumer Library 自己的參數便可。而Direct方式是在每一個批次開始時從SLS拉取數據,這就涉及到一個問題:一個批次內拉取多少數據才合適。若是太多,一個批次內處理不完,形成處理延時。若是太少會導worker空閒,工做不飽和,消費延時。這個時候咱們就須要合理配置拉取的速度和行數,實現一個批次儘量多處理又能及時完成的目標。理想狀態下Spark 消費的總體速率應該與SLS採集速率一致,才能實現真正的實時處理。

因爲SLS的數據模型是以LogGroup做爲讀寫的基本單位,而一個LogGroup中可能包含上萬行日誌,這就意味着Spark中直接限制每一個批次的行數難以實現。所以,Direct限流涉及到兩個配置參數:

參數 說明 默認值
spark.streaming.loghub.maxRatePerShard 每一個批次每一個Shard讀取行數,決定了限流的下限 10000
spark.loghub.batchGet.step 每次請求讀取LogGroup個數,決定了限流的粒度 100

能夠經過適當縮小spark.loghub.batchGet.step來控制限流的精度,可是即使如此,在某些狀況下仍是會存在較大偏差,如一個LogGroup中存在10000行日誌,spark.streaming.loghub.maxRatePerShard設置爲100,spark.loghub.batchGet.step設置爲1,那一個批次內該shard仍是會拉取10000行日誌。

兩種模式的對比

和Receiver相比,Direct有以下的優點:

  1. 下降資源消耗,不須要佔用Executor資源來做爲Receiver的角色。
  2. 魯棒性更好,在計算的時候纔會從服務端真正消費數據,下降內存使用,再也不須要WAL,Failover 直接在讀一次就好了,更容易實現exactly once語義。
  3. 簡化並行。Spark partition 與 Logstore 的 shard 個數對應,增長shard個數就能提升Spark任務處理併發上限。

可是也存在一些缺點:

  1. 在SLS場景下,須要依賴本地的 ZooKeeper 來保存臨時 checkpoint,當調用 commitAsync 時從 ZooKeeper同步到日誌服務服務端。因此當須要重置 checkpoint 時,也須要先刪除本地 ZooKeeper 中的 checkpoint 才能生效。
  2. 上一個批次保存 checkpoint 以前,下一個批次沒法真正開始,不然 ZooKeeper 中的 checkpoint 可能會被更新成一箇中間狀態。目前SDK在每一個批次會檢查是否上一個批次的 checkpoint 尚未提交,若是沒有提交則生成一個空批次,而不是繼續從服務端消費。
  3. 在 SLS 場景下,限流方式不夠精確。

Spark Streaming結果寫入SLS

與消費SLS相反,Spark Streaming的處理結果也能夠直接寫入SLS。使用示例:

...
    val lines = loghubStream.map(x => x)

 // 轉換函數把結果中每條記錄轉爲一行日誌
    def transformFunc(x: String): LogItem = {
      val r = new LogItem()
      r.PushBack("key", x)
      r
    }

    val callback = new Callback with Serializable {
      override def onCompletion(result: Result): Unit = {
        println(s"Send result ${result.isSuccessful}")
      }
    }
    // SLS producer config
    val producerConfig = Map(
      "sls.project" -> loghubProject,
      "sls.logstore" -> targetLogstore,
      "access.key.id" -> accessKeyId,
      "access.key.secret" -> accessKeySecret,
      "sls.endpoint" -> endpoint,
      "sls.ioThreadCount" -> "2"
    )
    lines.writeToLoghub(
      producerConfig,
      "topic",
      "streaming",
      transformFunc, Option.apply(callback))

    ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
    ssc.start()
    ssc.awaitTermination()

對Structured Streaming的支持

Structured  Streaming 並非最近纔出現的技術,而是早在16年就已經出現,可是直到 Spark 2.2.0 才正式推出。其數據模型是基於無界表的概念,流數據至關於往一個表上不斷追加行。

圖-3 無界表模型

與Spark Streaming相比,Structured Streaming主要有以下特色:

  1. 底層實現基於Spark SQL引擎,可使用大多數Spark SQL的函數。和Spark SQL共用大部分API,若是對Spark SQL熟悉的用戶,很是容易上手。複用Spark SQL的執行引用,性能更佳。
  2. 支持 Process time 和 Event time,而Spark Streaming只支持 Process Time。
  3. 批流同一的API。Structured Streaming 複用Spark SQL的 DataSet/DataFrame模型,和 RDD/DStream相比更High level,易用性更好。
  4. 實時性更好,默認基於micro-batch模式。在 Spark 2.3 中,還增長了連續處理模型,號稱能夠作到毫秒級延遲。
  5. API 對用戶更友好,只保留了SparkSession一個入口,不須要建立各類Context對象,使用起來更簡單。

SDK使用示例

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StructuredStreamingDemo {
  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubWordCount")
      .master("local")
      .getOrCreate()

    import spark.implicits._
    val schema = new StructType(
      Array(StructField("content", StringType)))
    val lines = spark
      .readStream
      .format("loghub")
      .schema(schema)
      .option("sls.project", "your project")
      .option("sls.store", "your logstore")
      .option("access.key.id", "your access key id")
      .option("access.key.secret", "your access key secret")
      .option("endpoint", "your endpoint")
      .option("startingoffsets", "latest")
      .load()
      .select("content")
      .as[String]

    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("loghub")
      .option("sls.project", "sink project")
      .option("sls.store", "sink logstore")
      .option("access.key.id", "your access key id")
      .option("access.key.secret", "your access key secret")
      .option("endpoint", "your endpoint")
      .option("checkpointLocation", "your checkpoint dir")
      .start()

    query.awaitTermination()
  }
}

代碼解釋:
1)schema 聲明瞭咱們須要的字段,除了日誌中的字段外,還有以下的內部字段:

__logProject__
__logStore__
__shard__
__time__
__topic__
__source__
__sequence_number__ // 每行日誌惟一id

若是沒有指定schema,SDK默認提供一個__value__字段,其內容爲由全部字段組成的一個JSON字符串。

2)lines 定義了一個流。
startingoffsets:開始位置,支持:

  • latest :日誌服務最新寫入位置。強烈建議從latest開始,從其餘位置開始意味着須要先處理歷史數據,可能須要等待較長時間才能結束。
  • earliest:日誌服務中最先的日誌對應的位置。
  • 或者爲每一個shard指定一個開始時間,以JSON形式指定。

maxOffsetsPerTrigger:批次讀取行數,SDK中默認是64*1024 。

3)結果寫入到日誌服務
format 指定爲Loghub便可。

不足之處

  1. 不支持手動提交checkpoint,SDK內部自動保存checkpoint到checkpointLocation中。
  2. 再也不須要提供consumerGroup名稱,也就是說checkpoint沒有保存到SLS服務端,沒法在日誌服務裏面監控消費延遲,只能經過Spark 任務日誌觀察消費進度。

參考資料

官方文檔:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
SLS SDK例子:https://github.com/aliyun/aliyun-emapreduce-sdk/tree/master-2.x/examples/src/main/scala/com/aliyun/emr/examples/sql/streaming
日誌服務實時消費:https://help.aliyun.com/document_detail/28998.html


本文做者:liketic

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索