Spark Core讀取ES的分區問題分析

Spark Core讀取ES的分區問題分析

浪院長 浪尖聊大數據 node

ES也是比較火熱,在日誌數據分析,規則分析等確實很方便,說實話用es stack 浪尖以爲能夠解決不少公司的數據分析需求。極客時間下週一要上線新的ES課程,有須要的暫時別購買,到時候還找浪尖返現吧。git

寫這篇文章的緣由是前兩天星球球友去面試,面試管問了一下,Spark 分析ES的數據,生成的RDD分區數跟什麼有關係呢?github

稍微猜想一下就能想到跟分片數有關,可是具體是什麼關係呢?面試

可想的具體關係多是如下兩種:apache

1).就像KafkaRDD的分區與kafka topic分區數的關係同樣,一對一。json

2).ES支持遊標查詢,那麼是否是也能夠對比較大的分片進行拆分紅多個RDD分區呢?app

那麼下面浪尖帶着你們翻一下源碼看看具體狀況。elasticsearch

1.Spark Core讀取ES

ES官網直接提供的有elasticsearch-hadoop 插件,對於ES 7.x,hadoop和Spark版本支持以下:ide

hadoop2Version  = 2.7.1
hadoop22Version = 2.2.0
spark13Version = 1.6.2
spark20Version = 2.3.0

浪尖這了採用的ES版本是7.1.1,測試用的Spark版本是2.3.1,沒有問題。整合es和spark,導入相關依賴有兩種方式:函數

a,導入整個elasticsearch-hadoop包

<dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>7.1.1</version>
    </dependency>

b,只導入spark模塊的包

<dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>7.1.1</version>
    </dependency>

浪尖這裏爲了測試方便,只是在本機起了一個單節點的ES實例,簡單的測試代碼以下:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkrdd {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
    conf.set(ConfigurationOptions.ES_PORT, "9200")
    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
    conf.set("es.write.rest.error.handlers", "ignoreConflict")
    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

    val sc = new SparkContext(conf)
    import org.elasticsearch.spark._

    sc.esRDD("posts").foreach(each=>{
      each._2.keys.foreach(println)
    })
    sc.esJsonRDD("posts").foreach(each=>{
      println(each._2)
    })

    sc.stop()
  }
}

能夠看到Spark Core讀取RDD主要有兩種形式的API:

a,esRDD。這種返回的是一個tuple2的類型的RDD,第一個元素是id,第二個是一個map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。這種返回的也是一個tuple2類型的RDD,第一個元素依然是id,第二個是json字符串。

RDD[(String, String)]

雖然是兩種類型的RDD,可是RDD都是ScalaEsRDD類型。

要分析Spark Core讀取ES的並行度,只須要分析ScalaEsRDD的getPartitions函數便可。

2.源碼分析

首先導入源碼https://github.com/elastic/elasticsearch-hadoop這個是gradle工程,能夠直接導入idea,而後切換到7.x版本便可。

廢話少說直接找到ScalaEsRDD,發現gePartitions是在其父類實現的,方法內容以下:

override def getPartitions: Array[Partition] = {
    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
      new EsPartition(id, idx, esPartition)
    }.toArray
  }

esPartitions是一個lazy型的變量:

@transient private[spark] lazy val esPartitions = {
    RestService.findPartitions(esCfg, logger)
  }

這種聲明緣由是什麼呢?

lazy+transient的緣由你們能夠考慮一下。

RestService.findPartitions方法也是僅是建立客戶端獲取分片等信息,而後調用,分兩種狀況調用兩個方法。

final List<PartitionDefinition> partitions;
//            5.x及之後版本 同時沒有配置es.input.max.docs.per.partition
if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {
     partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
} else {
     partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
}

a).findSlicePartitions

這個方法其實就是在5.x及之後的ES版本,同時配置了

es.input.max.docs.per.partition

之後,纔會執行,實際上就是將ES的分片按照指定大小進行拆分,必然要先進行分片大小統計,而後計算出拆分的分區數,最後生成分區信息。具體代碼以下:

long numDocs;
if (readResource.isTyped()) {
    numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);
} else {
    numDocs = client.countIndexShard(index, Integer.toString(shardId), query);
}
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {
    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
}

實際上分片就是用遊標的方式,對_doc進行排序,而後按照分片計算獲得的分區偏移進行數據的讀取,組裝過程是SearchRequestBuilder.assemble方法來實現的。

這個其實我的以爲會浪費必定的性能,假如真的要ES結合Spark的話,建議合理設置分片數。

b).findShardPartitions方法

這個方法沒啥疑問了就是一個RDD分區對應於ES index的一個分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
partitions.add(partition);

3.總結

以上就是Spark Core讀取ES數據的時候分片和RDD分區的對應關係分析,默認狀況下是一個es 索引分片對應Spark RDD的一個分區。假如分片數過大,且ES版本在5.x及以上,能夠配置參數

es.input.max.docs.per.partition

進行拆分。

相關文章
相關標籤/搜索