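After some work, the hive2solr job finally runs stably. It inserts data into Solr from Hive, mainly by implementing the RecordWriter interface and overriding its write and close methods. The problems encountered along the way are listed below:
1. Data overwriting: use atomic updates
Reference: http://caiguangguang.blog.51cto.com/1652935/1599137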
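As a rough illustration of what an atomic update looks like, the sketch below builds the nested-map shape in plain Java: the id stays a plain value, and every other field is wrapped as {"set": value} so that only that field is replaced. The class and method names are hypothetical; in the actual job the same wrapped map is passed to SolrInputDocument.addField inside write.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: it builds the nested-map
// shape of an atomic update without depending on SolrJ.
class AtomicUpdateSketch {
    // Wraps a value as {"set": value}, the modifier that replaces one field.
    static Map<String, Object> setOp(Object value) {
        Map<String, Object> op = new LinkedHashMap<>();
        op.put("set", value);
        return op;
    }

    // Builds an update for one document: a plain "id" plus wrapped fields.
    static Map<String, Object> buildUpdate(int id, Map<String, String> fields) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", id);
        for (Map.Entry<String, String> e : fields.entrySet()) {
            doc.put(e.getKey(), setOp(e.getValue()));
        }
        return doc;
    }
}
```

Sending a field without the "set" wrapper would instead replace the whole document, which is the overwrite problem this item describes.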
2. Repeated construction of SolrServer and SolrTable objects: build them once in a static initializer and reuse them afterwards
Construction:
public static Map<Integer,SolrServer> solrServers = new HashMap<Integer,SolrServer>();
public static Map<Integer,SolrTable> solrTables = new HashMap<Integer,SolrTable>();
public static String[] iparray;
public static String ipstring;
public static String collec;

static {
    LOG.warn("in SolrServerCustom start initialize ip maps");
    ipstring = "xxxx,xxxxxx";
    collec = "userinfo";
    LOG.warn("in SolrServerCustom ipstring and collec: " + ipstring + "," + collec);
    iparray = ipstring.split(",");
    Arrays.sort(iparray);
    for (int i = 0; i < iparray.length; i++) {
        String urlx = "http://" + iparray[i] + "/solr/" + collec;
        solrServers.put(i, new HttpSolrServer(urlx));
        solrTables.put(i, new SolrTable(String.valueOf(i)));
    }
    LOG.warn("in SolrServerCustom end initialize ip maps,maps size " + solrServers.size());
    LOG.warn("in SolrServerCustom end initialize ip mapsx,mapsx size " + solrTables.size());
}
Usage:
public void write(Writable w) throws IOException {
    MapWritable map = (MapWritable) w;
    SolrInputDocument doc = new SolrInputDocument();
    String key;
    String value;
    String newkey;
    int idx;
    for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {
        key = entry.getKey().toString();
        newkey = this.tableName + "." + key;
        value = entry.getValue().toString();
        if (key.equals("id")) {
            // Pick the target server from the static SolrServerCustom.solrServers map
            idx = SolrUtil.getIntServer(value, SolrServerCustom.solrServers);
            // Reuse the matching table from the static SolrServerCustom.solrTables map
            table = SolrServerCustom.solrTables.get(idx);
            table.setNumInputBufferRows(this.numInputBufferRows);
            doc.addField("id", Integer.valueOf(value));
        } else {
            if (value.equals("(null)")) {
                value = "";
            }
            // Wrap the value as {"set": value} so that Solr performs an atomic update
            setOper = new LinkedHashMap<String,Object>();
            setOper.put("set", value);
            if (!doc.keySet().contains(newkey)) {
                doc.addField(newkey, setOper);
            }
        }
    }
    table.save(doc);
}
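The post does not show SolrUtil.getIntServer. One plausible way to map an id to a server index is to hash the id and take it modulo the number of servers, as in the sketch below. This is an assumption for illustration, not the original implementation, and the class and method names are hypothetical.

```java
// Hypothetical stand-in for SolrUtil.getIntServer: the original
// implementation is not shown, so this is only one possible approach.
class ShardRouterSketch {
    // Maps a document id to an index in [0, serverCount).
    static int pickServer(String id, int serverCount) {
        if (serverCount <= 0) {
            throw new IllegalArgumentException("no servers configured");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(id.hashCode(), serverCount);
    }
}
```

Whatever the actual rule is, it must be deterministic: the same id has to reach the same server on every run, otherwise an atomic update could land on a server that does not hold the document.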
3. Memory leaks in the code
1) Declare objects outside the loop, and adjust the size of the output buffer
Symptom: the Java heap of the YARN map/reduce task fills up and the job hangs
Error log:
2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
    at java.lang.StringBuilder.<init>(StringBuilder.java:68)
    at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:621)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:87)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:92)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:540)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:177)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)
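2) Use try...catch...finally (clear the buffer in finally)
At first there was no finally block, so whenever an exception occurred the buffer grew beyond its configured size. Eventually the job ran out of memory and hung.
4. Exception handling
The requirement is that the program must not exit when a single Solr server fails or Solr temporarily stops responding. By default the exception propagates upward and eventually fails the job.
For example: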
Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content type application/octet-stream but got text/html.
<html>
<head><title>504 Gateway Time-out</title></head>
<body bgcolor="white">
<center><h1>504 Gateway Time-out</h1></center>
<hr><center>nginx/1.6.2</center>
</body>
</html>
To keep the exception from propagating as a runtime error and failing the job, catch it and do nothing beyond logging a warning:
public void flush() {
    try {
        if (!outputBuffer.isEmpty()) {
            server.add(outputBuffer);
        }
    } catch (Exception e) {
        LOG.warn("solrtable add error,Exception log is " + e);
    } finally {
        // Clear the buffer in finally; otherwise it keeps growing whenever
        // an exception is thrown and the JVM eventually runs out of memory
        outputBuffer.clear();
    }
}
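The effect of the finally block can be checked without a Solr server. The sketch below is a simplified stand-in for SolrTable, with a hypothetical Sink interface in place of server.add and a counter that is not in the original code. It shows that the buffer stays bounded even when every send fails.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for SolrTable. Sink and failedBatches are
// hypothetical additions for illustration only.
class BufferedSinkSketch {
    // Stands in for the remote call (server.add in the original code).
    interface Sink {
        void add(List<String> docs) throws Exception;
    }

    private final List<String> buffer = new ArrayList<>();
    private final Sink sink;
    private final int maxRows;
    private int failedBatches = 0;

    BufferedSinkSketch(Sink sink, int maxRows) {
        this.sink = sink;
        this.maxRows = maxRows;
    }

    void save(String doc) {
        buffer.add(doc);
        if (buffer.size() >= maxRows) {
            flush();
        }
    }

    void flush() {
        try {
            if (!buffer.isEmpty()) {
                sink.add(new ArrayList<>(buffer));
            }
        } catch (Exception e) {
            failedBatches++; // swallowed: this batch is dropped, not retried
        } finally {
            buffer.clear();  // runs on success and on failure alike
        }
    }

    int bufferedRows() { return buffer.size(); }
    int failedBatches() { return failedBatches; }
}
```

Swallowing the exception keeps the job alive, but the failed batch is silently lost. Counting the failures, as the sketch does, at least makes that loss visible when the job ends.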
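5. Commit problem: when close is called, only the last SolrTable is closed. At first the job committed after every inserted row, but performance was poor (roughly a 50% drop), so commits were later controlled on the Solr server side
solrconfig.xml: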
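<autoCommit>
  <!--<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>-->
  <!-- When the number of documents in the in-memory index reaches this value, dump the in-memory index to disk and notify the searcher to load the new index -->
  <maxDocs>15000</maxDocs>
  <!-- Automatically commit the in-memory index at this interval (in milliseconds) and notify the searcher to load the new index; whichever condition is reached first triggers the commit -->
  <maxTime>1000</maxTime>
  <!-- When set to false, a commit still flushes index changes to disk, but clients do not see the update -->
  <openSearcher>true</openSearcher>
</autoCommit>
<autoSoftCommit>
  <maxTime>${solr.autoSoftCommit.maxTime:10000}</maxTime>
</autoSoftCommit>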
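Here autoCommit means a hard commit. If you do not use autoCommit, you can also pass the commitWithin parameter when adding documents. autoSoftCommit is similar to autoCommit, but it is a soft commit: it makes the data visible without flushing it to disk, so a machine crash will lose data.
save also costs performance: each save takes about 6 ms, so documents need to be collected in a list and saved together (batch operation)
6. Output buffer problem
In the original code, SolrTable had only one entry point (the same holds with SolrCloud), so there was only a single SolrTable instance. The buffer was a static variable, so the individual SolrTable objects could not each work with their own buffer
The buffer was changed to a non-static variable, and a static initializer block now creates the table and server objects and stores them in a HashMap. They are fetched from the map when needed, so only a handful of instances ever exist. If the objects were instantiated at the point of use instead, every call would get a different object and the buffer size would always stay at 1.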
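The combination described here, a registry built once in a static block plus a per-instance buffer, can be sketched as follows. The Table class is a hypothetical stand-in for SolrTable, and the count of two tables is arbitrary.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Table is a hypothetical stand-in for SolrTable.
class TableRegistrySketch {
    static class Table {
        // Instance field: each table accumulates its own rows.
        final List<String> buffer = new ArrayList<>();
    }

    // Built once per JVM; every later lookup returns the same instances.
    static final Map<Integer, Table> TABLES = new HashMap<>();

    static {
        for (int i = 0; i < 2; i++) {
            TABLES.put(i, new Table());
        }
    }
}
```

Because lookups return the same object every time, rows added through TABLES.get(0) on different calls accumulate in one buffer, while TABLES.get(1) keeps a separate one.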
7. close problem
If a buffer is configured, the remaining rows may never be flushed
public void save(SolrInputDocument doc) {
    // save only appends the document to the buffer list
    outputBuffer.add(doc);
    // flush is triggered only when the list size reaches the configured buffer size
    if (outputBuffer.size() >= numOutputBufferRows) {
        flush();
    }
}
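flush in turn calls server.add(outputBuffer). When the file sink is closed it calls SolrWriter.close,
which calls SolrTable's commit (commit calls flush and server.commit). It turned out that only the last table instance had commit called on it.
The fix is to loop over all tables in SolrWriter.close and call SolrTable.commit on each: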
public void close(boolean abort) throws IOException {
    if (!abort) {
        // Commit every table, not just the one used by the last write call
        Map<Integer,SolrTable> maps = SolrServerCustom.solrTables;
        for (Map.Entry<Integer, SolrTable> entry : maps.entrySet()) {
            entry.getValue().commit();
        }
    } else {
        table.rollback();
    }
}
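The loop in close can be exercised in isolation with the sketch below. Table is a hypothetical stand-in for SolrTable whose commit just moves buffered rows into a list. The per-table try/catch is an addition not present in the original close, included on the assumption that one failing server should not stop the remaining tables from being committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Table is a hypothetical stand-in for SolrTable.
class CloseAllSketch {
    static class Table {
        final List<String> buffer = new ArrayList<>();
        final List<String> sent = new ArrayList<>(); // stands in for the remote index

        void commit() {
            sent.addAll(buffer); // flush whatever is still buffered
            buffer.clear();
        }
    }

    // Commits every table and returns how many of them failed.
    static int commitAll(Map<Integer, Table> tables) {
        int failures = 0;
        for (Table t : tables.values()) {
            try {
                t.commit();
            } catch (RuntimeException e) {
                failures++; // keep going so the other tables are still committed
            }
        }
        return failures;
    }
}
```

Without the loop, any table other than the last one used would keep its partially filled buffer, and those rows would never reach Solr.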
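8. Lock problem: nginx showed a large number of 302 responses and the Solr log showed lock errors. The fix was to adjust a parameter so that Solr releases the lock at startup
Solr-side log: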
userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked for write for core userinfo
Fix: set the following in solrconfig.xml
<unlockOnStartup>true</unlockOnStartup>
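Cause:
When org.apache.solr.core.SolrCore initializes, it uses IndexWriter.isLocked(dir) to check whether the index is locked. If it is, there are two cases: if unlockOnStartup is configured in solrconfig.xml, Solr tries to unlock the index; if unlockOnStartup is not configured, it throws the "Index locked for write for core" exception
The corresponding code can be found from the stack trace:
The org.apache.solr.core.SolrCore constructor calls the initIndex method: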
void initIndex(boolean reload) throws IOException {
    String indexDir = getNewIndexDir();
    boolean indexExists = getDirectoryFactory().exists(indexDir);
    boolean firstTime;
    synchronized (SolrCore.class) {
        firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
    }
    // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); defaults to false
    boolean removeLocks = solrConfig.unlockOnStartup;
    initIndexReaderFactory();
    if (indexExists && firstTime && !reload) {
        Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
            getSolrConfig().indexConfig.lockType);
        try {
            if (IndexWriter.isLocked(dir)) {
                if (removeLocks) {
                    log.warn(
                        logid + "WARNING: Solr index directory '{}' is locked. Unlocking...",
                        indexDir);
                    IndexWriter.unlock(dir); // release the lock
                } else {
                    log.error(logid + "Solr index directory '{}' is locked. Throwing exception",
                        indexDir);
                    throw new LockObtainFailedException(
                        "Index locked for write for core " + name);
                }
            }
        } finally {
            directoryFactory.release(dir);
        }
    }
    // Create the index if it doesn't exist.
    if (!indexExists) {
        log.warn(logid + "Solr index directory '" + new File(indexDir) + "' doesn't exist."
            + " Creating new index...");
        SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir,
            getDirectoryFactory(), true, getLatestSchema(), solrConfig.indexConfig,
            solrDelPolicy, codec);
        writer.close();
    }
}
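9. Problem caused by the Tomcat configuration: each machine runs two Solr instances, and one of them could never start (instantiating a core tries to acquire the lock, which failed here; write.lock can be deleted manually)
The root cause turned out to be two Tomcat instances writing to the same Solr directory
Error log: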
Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write.lock
    at org.apache.lucene.store.Lock.obtain(Lock.java:89)
    at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)
    at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)
    at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)
    at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)
    at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)
    at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)
    ... 12 more
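10. Some jobs ran slowly; one of them took 11 hours. Cause: the writes happen inside the map operator or the reduce operator, so the number of concurrent writer threads equals the number of map or reduce tasks. This job had only one reduce task, which made the writes slow. After raising the number of reduce tasks to 100 (set mapreduce.job.reduces=100), performance improved greatly: the time to import 30 million rows (3kw) dropped from 40916 s to 993 s.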
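To put the numbers above in perspective, the arithmetic can be worked through with a small helper. This assumes 3kw means 30,000,000 rows; the class and method names are hypothetical.

```java
// Hypothetical helper for the arithmetic only; the figures come from the text above.
class SpeedupSketch {
    // Ratio of the old runtime to the new runtime.
    static double speedup(double beforeSeconds, double afterSeconds) {
        return beforeSeconds / afterSeconds;
    }

    // Average write throughput over the whole job.
    static double rowsPerSecond(long rows, double seconds) {
        return rows / seconds;
    }
}
```

With these figures, speedup(40916, 993) is about 41.2, and throughput rises from about 733 rows per second to about 30,211 rows per second. The gain is well below 100x even though the number of reduce tasks went from 1 to 100, so the writers are presumably not the only limit; the post does not say where the remaining time goes.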