浪院長 浪尖聊大數據 node
ES也是比較火熱,在日誌數據分析,規則分析等確實很方便,說實話用es stack 浪尖以爲能夠解決不少公司的數據分析需求。極客時間下週一要上線新的ES課程,有須要的暫時別購買,到時候還找浪尖返現吧。git
寫這篇文章的緣由是前兩天星球球友去面試,面試管問了一下,Spark 分析ES的數據,生成的RDD分區數跟什麼有關係呢?github
稍微猜想一下就能想到跟分片數有關,可是具體是什麼關係呢?面試
可想的具體關係多是如下兩種:apache
1).就像KafkaRDD的分區與kafka topic分區數的關係同樣,一對一。json
2).ES支持遊標查詢,那麼是否是也能夠對比較大的分片進行拆分紅多個RDD分區呢?app
那麼下面浪尖帶着你們翻一下源碼看看具體狀況。elasticsearch
ES官網直接提供的有elasticsearch-hadoop 插件,對於ES 7.x,hadoop和Spark版本支持以下:ide
hadoop2Version = 2.7.1 hadoop22Version = 2.2.0 spark13Version = 1.6.2 spark20Version = 2.3.0
浪尖這了採用的ES版本是7.1.1,測試用的Spark版本是2.3.1,沒有問題。整合es和spark,導入相關依賴有兩種方式:函數
a,導入整個elasticsearch-hadoop包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>7.1.1</version> </dependency>
b,只導入spark模塊的包
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>7.1.1</version> </dependency>
浪尖這裏爲了測試方便,只是在本機起了一個單節點的ES實例,簡單的測試代碼以下:
import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.hadoop.cfg.ConfigurationOptions object es2sparkrdd { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName) conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1") conf.set(ConfigurationOptions.ES_PORT, "9200") conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true") conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true") conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false") // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser) // conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd) conf.set("es.write.rest.error.handlers", "ignoreConflict") conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler") val sc = new SparkContext(conf) import org.elasticsearch.spark._ sc.esRDD("posts").foreach(each=>{ each._2.keys.foreach(println) }) sc.esJsonRDD("posts").foreach(each=>{ println(each._2) }) sc.stop() } }
能夠看到Spark Core讀取RDD主要有兩種形式的API:
a,esRDD。這種返回的是一個tuple2的類型的RDD,第一個元素是id,第二個是一個map,包含ES的document元素。
RDD[(String, Map[String, AnyRef])]
b,esJsonRDD。這種返回的也是一個tuple2類型的RDD,第一個元素依然是id,第二個是json字符串。
RDD[(String, String)]
雖然是兩種類型的RDD,可是RDD都是ScalaEsRDD類型。
要分析Spark Core讀取ES的並行度,只須要分析ScalaEsRDD的getPartitions函數便可。
首先導入源碼https://github.com/elastic/elasticsearch-hadoop這個是gradle工程,能夠直接導入idea,而後切換到7.x版本便可。
廢話少說直接找到ScalaEsRDD,發現gePartitions是在其父類實現的,方法內容以下:
override def getPartitions: Array[Partition] = { esPartitions.zipWithIndex.map { case(esPartition, idx) => new EsPartition(id, idx, esPartition) }.toArray }
esPartitions是一個lazy型的變量:
@transient private[spark] lazy val esPartitions = { RestService.findPartitions(esCfg, logger) }
這種聲明緣由是什麼呢?
lazy+transient的緣由你們能夠考慮一下。
RestService.findPartitions方法也是僅是建立客戶端獲取分片等信息,而後調用,分兩種狀況調用兩個方法。
final List<PartitionDefinition> partitions; // 5.x及之後版本 同時沒有配置es.input.max.docs.per.partition if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) { partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log); } else { partitions = findShardPartitions(settings, mapping, nodesMap, shards, log); }
a).findSlicePartitions
這個方法其實就是在5.x及之後的ES版本,同時配置了
es.input.max.docs.per.partition
之後,纔會執行,實際上就是將ES的分片按照指定大小進行拆分,必然要先進行分片大小統計,而後計算出拆分的分區數,最後生成分區信息。具體代碼以下:
long numDocs; if (readResource.isTyped()) { numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query); } else { numDocs = client.countIndexShard(index, Integer.toString(shardId), query); } int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition); for (int i = 0; i < numPartitions; i++) { PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions); partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations)); }
實際上分片就是用遊標的方式,對_doc進行排序,而後按照分片計算獲得的分區偏移進行數據的讀取,組裝過程是SearchRequestBuilder.assemble方法來實現的。
b).findShardPartitions方法
這個方法沒啥疑問了就是一個RDD分區對應於ES index的一個分片。
PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId, locationList.toArray(new String[0])); partitions.add(partition);
以上就是Spark Core讀取ES數據的時候分片和RDD分區的對應關係分析,默認狀況下是一個es 索引分片對應Spark RDD的一個分區。假如分片數過大,且ES版本在5.x及以上,能夠配置參數
es.input.max.docs.per.partition
進行拆分。