如何使用Spark大規模並行構建索引
使用Spark構建索引很是簡單,由於spark提供了更高級的抽象rdd分佈式彈性數據集,相比之前的使用Hadoop的MapReduce來構建大規模索引,Spark具備更靈活的api操做,性能更高,語法更簡潔等一系列優勢。
先看下,總體的拓撲圖:
而後,再來看下,使用scala寫的spark程序:
Java代碼
- package com.easy.build.index
-
- import java.util
-
- import org.apache.solr.client.solrj.beans.Field
- import org.apache.solr.client.solrj.impl.HttpSolrClient
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- import scala.annotation.meta.field
- /**
- * Created by qindongliang on 2016/1/21.
- */
-
- //註冊model,時間類型能夠爲字符串,只要後臺索引配置爲Long便可,註解映射形式以下
- case class Record(
- @(Field@field )("rowkey") rowkey:String,
- @(Field@field )("title") title:String,
- @(Field@field)("content") content:String,
- @(Field@field)("isdel") isdel:String,
- @(Field@field)("t1") t1:String,
- @(Field@field)("t2")t2:String,
- @(Field@field)("t3")t3:String,
- @(Field@field)("dtime") dtime:String
-
-
- )
-
- /***
- * Spark構建索引==>Solr
- */
- object SparkIndex {
-
- //solr客戶端
- val client=new HttpSolrClient("http://192.168.1.188:8984/solr/monitor");
- //批提交的條數
- val batchCount=10000;
-
- def main2(args: Array[String]) {
-
- val d1=new Record("row1","title","content","1","01","57","58","3");
- val d2=new Record("row2","title","content","1","01","57","58","45");
- val d3=new Record("row3","title","content","1","01","57","58",null);
- client.addBean(d1);
- client.addBean(d2)
- client.addBean(d3)
- client.commit();
- println("提交成功!")
-
-
- }
-
-
- /***
- * 迭代分區數據(一個迭代器集合),而後進行處理
- * @param lines 處理每一個分區的數據
- */
- def indexPartition(lines:scala.Iterator[String] ): Unit ={
- //初始化集合,分區迭代開始前,能夠初始化一些內容,如數據庫鏈接等
- val datas = new util.ArrayList[Record]()
- //迭代處理每條數據,符合條件會提交數據
- lines.foreach(line=>indexLineToModel(line,datas))
- //操做分區結束後,能夠關閉一些資源,或者作一些操做,最後一次提交數據
- commitSolr(datas,true);
- }
-
- /***
- * 提交索引數據到solr中
- *
- * @param datas 索引數據
- * @param isEnd 是否爲最後一次提交
- */
- def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={
- //僅僅最後一次提交和集合長度等於批處理的數量時才提交
- if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {
- client.addBeans(datas);
- client.commit(); //提交數據
- datas.clear();//清空集合,便於重用
- }
- }
-
-
- /***
- * 獲得分區的數據具體每一行,並映射
- * 到Model,進行後續索引處理
- *
- * @param line 每行具體數據
- * @param datas 添加數據的集合,用於批量提交索引
- */
- def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={
- //數組數據清洗轉換
- val fields=line.split("\1",-1).map(field =>etl_field(field))
- //將清洗完後的數組映射成Tuple類型
- val tuple=buildTuble(fields)
- //將Tuple轉換成Bean類型
- val recoder=Record.tupled(tuple)
- //將實體類添加至集合,方便批處理提交
- datas.add(recoder);
- //提交索引到solr
- commitSolr(datas,false);
- }
-
-
- /***
- * 將數組映射成Tuple集合,方便與Bean綁定
- * @param array field集合數組
- * @return tuple集合
- */
- def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={
- array match {
- case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)
- }
- }
-
-
- /***
- * 對field進行加工處理
- * 空值替換爲null,這樣索引裏面就不會索引這個字段
- * ,正常值就仍是原樣返回
- *
- * @param field 用來走特定規則的數據
- * @return 映射完的數據
- */
- def etl_field(field:String):String={
- field match {
- case "" => null
- case _ => field
- }
- }
-
- /***
- * 根據條件清空某一類索引數據
- * @param query 刪除的查詢條件
- */
- def deleteSolrByQuery(query:String): Unit ={
- client.deleteByQuery(query);
- client.commit()
- println("刪除成功!")
- }
-
-
- def main(args: Array[String]) {
- //根據條件刪除一些數據
- deleteSolrByQuery("t1:03")
- //遠程提交時,須要提交打包後的jar
- val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";
- //遠程提交時,假裝成相關的hadoop用戶,不然,可能沒有權限訪問hdfs系統
- System.setProperty("user.name", "webmaster");
- //初始化SparkConf
- val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");
- //上傳運行時依賴的jar包
- val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"
- conf.setJars(seq)
- //初始化SparkContext上下文
- val sc = new SparkContext(conf);
- //此目錄下全部的數據,將會被構建索引,格式必定是約定好的
- val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");
- //經過rdd構建索引
- indexRDD(rdd);
- //關閉索引資源
- client.close();
- //關閉SparkContext上下文
- sc.stop();
-
-
- }
-
-
- /***
- * 處理rdd數據,構建索引
- * @param rdd
- */
- def indexRDD(rdd:RDD[String]): Unit ={
- //遍歷分區,構建索引
- rdd.foreachPartition(line=>indexPartition(line));
- }
-
-
-
- }
ok,至此,咱們的建索引程序就寫完了,本例子中用的是遠程提交模式,實際上它也能夠支持spark on yarn (cluster 或者 client ) 模式,不過此時須要注意的是,不須要顯式指定setMaster的值,而由提交任務時,經過--master來指定運行模式,另外,依賴的相關jar包,也須要經過--jars參數來提交到集羣裏面,不然的話,運行時會報異常,最後看下本例子裏面的solr是單機模式的,因此使用spark建索引提速並無達到最大值,真正能發揮最大威力的是,多臺search集羣正如我畫的架構圖裏面,每臺機器是一個shard,這就是solrcloud的模式,或者在elasticsearch裏面的集羣shard,這樣以來,才能真正達到高效批量的索引構建
歡迎關注本站公眾號,獲取更多信息