如何使用Spark大規模並行構建索引

使用Spark構建索引很是簡單,由於spark提供了更高級的抽象rdd分佈式彈性數據集,相比之前的使用Hadoop的MapReduce來構建大規模索引,Spark具備更靈活的api操做,性能更高,語法更簡潔等一系列優勢。 

先看下,總體的拓撲圖: 





而後,再來看下,使用scala寫的spark程序: 

Java代碼   收藏代碼
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //註冊model,時間類型能夠爲字符串,只要後臺索引配置爲Long便可,註解映射形式以下  
  16. case class Record(  
  17.                    @(Field@field )("rowkey")     rowkey:String,  
  18.                    @(Field@field )("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark構建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客戶端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的條數  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分區數據(一個迭代器集合),而後進行處理 
  56.     * @param lines 處理每一個分區的數據 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分區迭代開始前,能夠初始化一些內容,如數據庫鏈接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代處理每條數據,符合條件會提交數據  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操做分區結束後,能夠關閉一些資源,或者作一些操做,最後一次提交數據  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引數據到solr中 
  69.     * 
  70.     * @param datas 索引數據 
  71.     * @param isEnd 是否爲最後一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //僅僅最後一次提交和集合長度等於批處理的數量時才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交數據  
  78.             datas.clear();//清空集合,便於重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 獲得分區的數據具體每一行,並映射 
  85.     * 到Model,進行後續索引處理 
  86.     * 
  87.     * @param line 每行具體數據 
  88.     * @param datas 添加數據的集合,用於批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //數組數據清洗轉換  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //將清洗完後的數組映射成Tuple類型  
  94.     val tuple=buildTuble(fields)  
  95.     //將Tuple轉換成Bean類型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //將實體類添加至集合,方便批處理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 將數組映射成Tuple集合,方便與Bean綁定 
  106.     * @param array field集合數組 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  對field進行加工處理 
  118.     * 空值替換爲null,這樣索引裏面就不會索引這個字段 
  119.     * ,正常值就仍是原樣返回 
  120.     * 
  121.     * @param field 用來走特定規則的數據 
  122.     * @return 映射完的數據 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根據條件清空某一類索引數據 
  133.     * @param query 刪除的查詢條件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("刪除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根據條件刪除一些數據  
  144.     deleteSolrByQuery("t1:03")  
  145.     //遠程提交時,須要提交打包後的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //遠程提交時,假裝成相關的hadoop用戶,不然,可能沒有權限訪問hdfs系統  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上傳運行時依賴的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目錄下全部的數據,將會被構建索引,格式必定是約定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //經過rdd構建索引  
  159.     indexRDD(rdd);  
  160.     //關閉索引資源  
  161.     client.close();  
  162.     //關閉SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 處理rdd數據,構建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍歷分區,構建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  

ok,至此,咱們的建索引程序就寫完了,本例子中用的是遠程提交模式,實際上它也能夠支持spark on yarn (cluster 或者 client )  模式,不過此時須要注意的是,不須要顯式指定setMaster的值,而由提交任務時,經過--master來指定運行模式,另外,依賴的相關jar包,也須要經過--jars參數來提交到集羣裏面,不然的話,運行時會報異常,最後看下本例子裏面的solr是單機模式的,因此使用spark建索引提速並無達到最大值,真正能發揮最大威力的是,多臺search集羣正如我畫的架構圖裏面,每臺機器是一個shard,這就是solrcloud的模式,或者在elasticsearch裏面的集羣shard,這樣以來,才能真正達到高效批量的索引構建 
相關文章
相關標籤/搜索