hive2solr問題小結

  搞了一段時間,hive2solr的job終於能夠穩定的跑了,實現使用hive向solr插數據,主要是實現RecordWriter接口,重寫write方法和close方法。下面對遇到的問題一一列出:

1.數據覆蓋問題,使用原子更新
參考:http://caiguangguang.blog.51cto.com/1652935/1599137
2.重複構建solrserver和solrtable對象問題,使用static在初始化的時候構建,後面直接調用
構建:html

        public static Map<Integer,SolrServer> solrServers = new HashMap<Integer,SolrServer>();
        public static Map<Integer,SolrTable> solrTables = new HashMap<Integer,SolrTable>();
        public static String[] iparray;
        public static String ipstring;
        public static String collec;
        static {
               LOG .warn("in SolrServerCustom start initialize ip maps" );
               ipstring = "xxxx,xxxxxx";
               collec = "userinfo" ;
               LOG .warn("in SolrServerCustom  ipstring and collec: " + ipstring + "," + collec );
               iparray = ipstring .split("," );
              Arrays. sort( iparray);
               for (int i=0;i< iparray. length;i++){
                     String urlx = "http://" +iparray [i]+"/solr/" + collec;
                      solrServers.put(i, new HttpSolrServer(urlx));
                      solrTables.put(i, new SolrTable(String.valueOf(i)));
              }
               LOG .warn("in SolrServerCustom end initialize ip maps,maps size " + solrServers .size());
               LOG .warn("in SolrServerCustom end initialize ip mapsx,mapsx size " + solrTables .size()); 
       }

引用:
java

 public void write(Writable w) throws IOException {
          MapWritable map = (MapWritable) w;
          SolrInputDocument doc = new SolrInputDocument();
          String key;
          String value;
          String newkey;
          int idx;
          for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {
               key = entry.getKey().toString();
               newkey = this.tableName + "." + entry.getKey().toString();
               value = entry.getValue().toString();
               if(key.equals("id")){
                    idx = SolrUtil.getIntServer(value,SolrServerCustom.solrServers); //引用靜態屬性SolrServerCustom.solrServers
                    table = SolrServerCustom.solrTables.get(idx); //引用靜態屬性SolrServerCustom.solrTables
                    table.setNumInputBufferRows(this.numInputBufferRows);
               }
               if(key.equals("id")){
                    doc.addField("id",Integer.valueOf(value));
               }else{
                    if (value.equals("(null)")){
                         value = "";
                    }
                    setOper = new LinkedHashMap<String,Object>();
                    setOper.put("set",value);
                    if(!doc.keySet().contains(newkey)){
                         doc.addField(newkey, setOper);
                    }    
               }
          }
          table.save(doc);
     }

3.代碼存在內存泄露問題
1)對象的聲明,放在循環外,並調整outbuffer的大小
現象:yarn map/reduce  java heap滿致使job hangnginx

錯誤日誌:web

2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
        at java.lang.StringBuilder.<init>(StringBuilder.java:68)
        at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:621)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:87)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:92)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:540)
        at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:177)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
              at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)

2)try...catch....finally的使用(在finally中 clear buffer)
一開始沒有增長finally,致使在異常發生時buffer會大於設置,最終致使job內存用滿,hang住。

4.異常的處理
要求一個solrserver出錯,或者solr暫時不響應時程序不能退出,默認狀況下異常向上拋出,最終致使job失敗
好比:
apache

Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content type application/octet-stream but got text/html. <html>
<head><title>504 Gateway Time-out</title></head>
<body bgcolor="white">
<center><h1>504 Gateway Time-out</h1></center>
<hr><center>nginx/1.6.2</center>
</body>
</html>

 防止異常的拋出會形成runtime error致使job失敗,catch異常後不作處理
tomcat

     public void flush(){
          try {
               if (!outputBuffer.isEmpty()) {
                    server.add(outputBuffer);
               }
          } catch(Exception e){
               LOG.warn("solrtable add error,Exception log is " + e);
          }finally{
               outputBuffer.clear(); //在finally中清除buffer,不然會致使buffer在異常拋出時一直遞增致使jvm oom的問題
          }
     }

5.commit問題,調用close方法時,只有最後一個solrtable會close,開始時使用每插入一行就commit的方式,可是這種性能不好(大約50%的下降),後來在solrserver端控制commit
solrconfig.xml:
bash

     <autoCommit>
       <!--<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>-->
         <maxDocs>15000</maxDocs> //當內存索引數量達到指定值的時候,將內存的索引DUMP到硬盤中,並通知searcher類加載新的索引
        <maxTime>1000</maxTime> //每隔指定的時間段,自動的COMMIT內存中的索引數據,並通知Searcher類加載新的索引,以最早達到條件執行爲準
       <openSearcher>true</openSearcher>  //設置爲false時,雖然commit會致使index的變動flush到磁盤上,可是客戶端不會看到更新
     </autoCommit>
   
     <autoSoftCommit>
       <maxTime>${solr.autoSoftCommit.maxTime:10000}</maxTime>
     </autoSoftCommit>

這裏autoCommit是指hard commit,若是不使用autoCommit也能夠在add document時帶上commitWithin的參數autoSoftCommit和autoCommit相似,可是它是一個solf類型的commit,能夠確保數據可見可是沒有把數據flush到磁盤,機器crash會致使數據丟失。
save也致使性能損耗,save會消耗6ms左右的時間,須要放到一個list中進行save操做(batch操做)


6.outbuffer的問題
初始的代碼,由於對用solrtable來講只有一個入口(solrcloud時也同樣),這樣solrtable只有一個實例,這裏用到了靜態變量,每一個solrtable不能按本身的buffer進行操做
改爲非靜態變量,而且使用靜態代碼塊初始化table和server,放到一個hashmap中,用的時候去取,保證只有幾個的實例。不然若是在使用時進行實例化,每次的對象都不一樣,致使buffer一直爲1。

7.close的問題
若是設置了buffer,可能會致使不能flush
併發

public void save(SolrInputDocument doc) {
     outputBuffer.add(doc); //使用save放到buffer list中
     if (outputBuffer.size() >= numOutputBufferRows) { //只有list的大小>=設置的buffer大小時纔會觸發flush的操做
         flush();
     }
}

而flush中會調用server.add(outputBuffer)操做。filesink關閉時調用SolrWriter.close
調用SolrTable的commit(commit中調用flush和server.commit),發現只有最後一個table實例會調用commit.
解決方法,在SolrWriter.close中循環調用SolrTable.commit方法:
app

public void close(boolean abort) throws IOException {
     if (!abort) {
         Map<Integer,SolrTable> maps = new SolrServerCustom().solrTable;
         for(Map.Entry<Integer, SolrTable> entry:maps.entrySet()){
             entry.getValue().commit();
         }
     } else {
         table.rollback();
     }
}

8.鎖的問題,從nginx端看到大量的302 ,solr日誌看到有鎖的問題,調整參數,在solr啓動時釋放鎖
solr端日誌:
jvm

userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked for write for core userinfo

解決:solrconfig.xml中設置

<unlockOnStartup>true</unlockOnStartup>

緣由:

org.apache.solr.core.SolrCore初始化時使用IndexWriter.isLocked(dir)判斷是否加鎖,若是已經加了鎖,則分爲兩種狀況,一種是在solrconfig.xml中配置了unlockOnStartup,會嘗試unlock,若是沒有配置unlockStartup,則會拋出Index locked for write for core異常

根據堆棧能夠看對應代碼:
org.apache.solr.core.SolrCore 構造函數中會調用initIndex方法:

  void initIndex(boolean reload) throws IOException {
      String indexDir = getNewIndexDir();
      boolean indexExists = getDirectoryFactory().exists(indexDir);
      boolean firstTime;
      synchronized (SolrCore.class) {
        firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
      }
      boolean removeLocks = solrConfig.unlockOnStartup; // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); 默認爲false
      initIndexReaderFactory();
      if (indexExists && firstTime && !reload) {
       
        Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
            getSolrConfig().indexConfig.lockType);
        try {
          if (IndexWriter.isLocked(dir)) {
            if (removeLocks) {
              log.warn(
                  logid
                      + "WARNING: Solr index directory '{}' is locked.  Unlocking...",
                  indexDir);
              IndexWriter.unlock(dir); //解鎖
            } else {
              log.error(logid
                  + "Solr index directory '{}' is locked.  Throwing exception",
                  indexDir);
              throw new LockObtainFailedException(
                  "Index locked for write for core " + name);
            }
           
          }
        } finally {
          directoryFactory.release(dir);
        }
      }
      // Create the index if it doesn't exist.
      if(!indexExists) {
        log.warn(logid+"Solr index directory '" + new File(indexDir) + "' doesn't exist."
                + " Creating new index...");
        SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
                                                        getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
        writer.close();
      }
  }

9.tomcat的配置致使的問題,每臺機器兩個solr實例,其中一個一直不能啓動(在實例化core時會嘗試獲取鎖,這裏獲取鎖失敗,能夠手動刪除write.lock)
最終發現是兩個tomcat寫到了一個solr目錄裏面

錯誤日誌:

Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write.lock
     at org.apache.lucene.store.Lock.obtain(Lock.java:89)
     at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)
     at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)
     at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)
     at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)
     at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)
     at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)
     ... 12 more

10.部分job運行緩慢,其中一個job運行了11個小時。。緣由:數據寫入時發生在mapoperator或者reduceoperator中,多少個map或者reduce就是多少個併發線程寫入。job只有一個reduce,致使寫入緩慢,調整reduce的數量到100(set mapreduce.job.reduces=100)後,性能大幅度提高,3kw數據導入時間由40916s降低到993s。

相關文章
相關標籤/搜索