部分業務須要使用HBase的數據進行多維度分析,咱們採用了將部分數據同步到Solr,經過Solr進行多維度查詢返回對應的Rowkey,再從HBase批量獲取數據。所以咱們使用了一個比較成熟的方案Lily HBase Indexer來同步二級索引到Solr。可是使用的時候出現了Solr丟失數據的問題。基本上天天Solr都會比HBase少幾千條數據。java
因爲咱們使用的是CDH集羣,下面全部操做都是基於該環境git
到每一個節點的/var/log/hbase-solr
和/var/log/solr
查看了日誌,都沒發現寫入失敗的記錄github
因爲日誌沒有發現錯誤,猜想是Solr的數據在緩存中沒提交上去。
在solr的collecttion目錄下的conf/solrconfig.xml文件,將Solr的硬提交激活,操做以下緩存
<autoCommit>
<maxTime>${solr.autoCommit.maxTime:60000}</maxTime>
<openSearcher>true</openSearcher>
</autoCommit>
而後保存配置,將修改update 到Solr集羣。而後測試仍舊出現上述問題app
目前是沒看到問題出在哪裏了,所以只能去網上搜索一下具體緣由了。網上有這麼兩個帖子
hbase-indexer solr numFound different from hbase table rows size異步
HBase Indexer致使Solr與HBase數據不一致問題解決性能
他們都提到了修改morphline-hbase-mapper.xml,添加read-row
以下:
測試
從新刷新hbase-indexer配置ui
此次發現數目對了,可是字段缺了spa
因爲設置了read-row以後數據不會再次從HBase中獲取,所以只會讀取WAL。假如修改了部分字段,HBaseIndexer就會提交相應的字段上去。例如
HBase中有name和age兩個字段
put 'HBase_Indexer_Test','001','cf1:name','xiaoming'
put 'HBase_Indexer_Test','002','cf1:name','xiaohua'
此時的數據爲
而後執行
put 'HBase_Indexer_Test','001','cf1:age','12'
最後只能看到
說明這種模式只從WAL獲取數據,而且將獲取的數據覆蓋到了Solr裏面。
那麼這樣看來只能修改HBase indexer的代碼了
Lily HBase Indexer的代碼是託管在github 上的,若是是單獨安裝的請直接訪問NGDATA的這個工程:http://ngdata.github.io/hbase-indexer/
若是是使用的CDH版本,請訪問:https://github.com/cloudera/hbase-indexer
我這裏使用CDH 5.7.0版本進行測試。在releases選項中能夠找到對應版本號的包,下載解壓以後能夠看到一個Maven工程。能夠看到它包含以下模塊
在./hbase-indexer-engine/src/main/java/com/ngdata/hbaseindexer/indexer/Indexer.java
文件中有一個calculateIndexUpdates方法,其中有以下代碼:
Result result = rowData.toResult();
if(conf.getRowReadMode()==RowReadMode.DYNAMIC){
if(!mapper.containsRequiredData(result)){
result = readRow(rowData);
}
}
boolean rowDeleted = result.isEmpty();
privateResult readRow(RowData rowData)throwsIOException{
TimerContext timerContext = rowReadTimer.time();
try{
HTableInterface table = tablePool.getTable(rowData.getTable());
try{
Getget= mapper.getGet(rowData.getRow());
return table.get(get);
}finally{
table.close();
}
}finally{
timerContext.stop();
}
}
從代碼中能夠看出其執行的流程圖以下:
假如咱們使用默認的Dynamic模式寫入了大量的數據,那麼意味着有部分數據會在WAL生成後一段時間內沒法「落地」,那麼就可能出現下面的狀況:
知道了問題在哪裏以後,咱們嘗試修改他的源碼。因爲HBase將預寫日誌的內容寫到HBase region中會有必定的滯後性,所以咱們能夠認爲預寫日誌中的內容老是最新的數據。假設咱們有一條rowkey =001的數據以下:
列名 | 值 |
---|---|
Rowkey | 001 |
cf1:A | a |
cf1:B | b |
cf1:C | c |
咱們將C的值改爲D。因爲夾雜在不少條數據中,可能日誌中拿到了C = 'd',可是HBase中仍舊是'c',咱們須要將HBase的數據拿出來,再將預寫日誌中的數據覆蓋它,便有了下面的代碼
privateResult readRow(RowData rowData)throwsIOException{
TimerContext timerContext = rowReadTimer.time();
try{
HTableInterface table = tablePool.getTable(rowData.getTable());
try{
Get get = mapper.getGet(rowData.getRow());
return merge(table.get(get), rowData.toResult());
//return table.get(get);
}finally{
table.close();
}
}finally{
timerContext.stop();
}
}
privateResult merge(Result data,Result wal)throwsIOException{
//若是data爲空,則直接返回WAL的數據
if(data.isEmpty()){
return wal;
}
/* //若是rowkey不相同,則返回wal的數據
if (!Bytes.toString(data.getRow()).equals(Bytes.toString(wal.getRow()))) {
return wal;
}*/
TreeMap<String,Cell> cellMap =newTreeMap<String,Cell>();
CellScanner dataScanner = data.cellScanner();
CellScanner walScanner = wal.cellScanner();
while(dataScanner.advance()){
Cell cell = dataScanner.current();
String cf =Bytes.toString(CellUtil.cloneFamily(cell));
String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
String key = cf +"->"+ cq;
cellMap.put(key, cell);
}
while(walScanner.advance()){
Cell cell = walScanner.current();
String cf =Bytes.toString(CellUtil.cloneFamily(cell));
String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
String key = cf +"->"+ cq;
cellMap.put(key, cell);
}
ArrayList<Cell> cells =newArrayList<Cell>();
cells.addAll(cellMap.values());
returnResult.create(cells);
}
值得一提的是,HBase返回的result中,列的排序是按照"列族名+列名"的字典排序。好比表中有["cf1:name","cf2:cellphone","cf1:age"] 三個列,那麼返回的時候會排列成["cf1:age","cf1:name","cf2:cellphone"]。在建立新的Result對象的時候也必須遵循這樣的規則,所以這裏使用了treemap。不要問我爲何,我特麼調了一成天才發現這個問題。
進入hbase-indexer-engine的工程,執行mvn clean install -DskipTests
進行打包,稍等片刻便好了
在target下面有一個hbase-indexer-engine-1.5-cdh5.7.0.jar文件(這裏的版本號對應本身的環境),將這個jar文件分發到集羣的hbase-indexer的目錄下,CDH版本放在在/opt/cloudera/parcels/CDH/jars/下便可。
而後重啓服務進行測試。
數據跑了一天,Solr中對應的條數和HBase的同樣。
所以咱們修改的代碼是有效的。
上面咱們是合併了數據而後所有覆蓋到Solr的,若是HBase存在大量的Update操做,那麼勢必每次列數都會和映射到Solr裏面的列不一致,所以每次都會從HBase中get一次數據,這樣確定會影響性能。那麼咱們可否使用ReadRow.Never模式 + Solr的原子更新
的方式來實現呢?