摘要: HBase能夠經過協處理器 Coprocessor 的 方式向Solr發出請求,Solr對於接收到的數據能夠作相關的同步:增、刪、改索引的操做,這樣就能夠同時使用HBase存儲量大和Solr檢索性能高 的優勢了,更況且HBase和Solr均可以集羣。這對海量數據存儲、檢索提供了一種方式,將存儲與索引放在不一樣的機器上,是大數據 架構的必須品。 html
關鍵詞: HBase, Solr, Coprocessor , 大數據 , 架構 java
正如個人以前的博客「 Solr與HBase架構設計 」中所述,HBase和Solr能夠經過協處理器 Coprocessor 的方式向Solr發出請求,Solr對於接收到的數據能夠作相關的同步:增、刪、改索引的操做。將存儲與索引放在不一樣的機器上,這是大數據架構的必須品,但目前還有不少不懂得此道的同窗,他們對於這種思想感到很新奇,不過,這絕對是好的方向,因此不懂得抓緊學習吧。 apache
有個朋友給個人那篇博客留言,說CDH也能夠作這樣的事情,我尚未試過,他還問我要與此相關的代碼,因而我就稍微整理了一下,做爲本篇文章的主要內容。關於CDH的事,我會盡快嘗試,有知道的同窗能夠給我留言。 架構
下面我主要講述一下,我測試對HBase和Solr的性能時,使用HBase 協處理器向HBase添加數據所編寫的相關代碼,及解釋說明。 ide
1、編寫HBase協處理器Coprocessor oop
一旦有數據postPut,就當即對Solr裏相應的Core更新。這裏使用了 ConcurrentUpdateSolrServer,它是Solr速率性能的保證,使用它不要忘記在Solr裏面配置autoCommit喲。 post
/* 性能 *版權:王安琪 學習 *描述:監視HBase,一有數據postPut就向Solr發送,本類要做爲觸發器添加到HBase 測試 *修改時間:2014-05-27 *修改內容:新增 */ package solrHbase.test;
import java.io.UnsupportedEncodingException;
import ***;
public class SorlIndexCoprocessorObserver extends BaseRegionObserver {
private static final Logger LOG = LoggerFactory .getLogger(SorlIndexCoprocessorObserver.class); private static final String solrUrl = "http://192.1.11.108:80/solr/core1"; private static final SolrServer solrServer = new ConcurrentUpdateSolrServer( solrUrl, 10000, 20);
/** * 創建solr索引 * * @throws UnsupportedEncodingException */ @Override public void postPut(final ObserverContext e, final Put put, final WALEdit edit, final boolean writeToWAL) throws UnsupportedEncodingException { inputSolr(put); }
public void inputSolr(Put put) { try { solrServer.add(TestSolrMain.getInputDoc(put)); } catch (Exception ex) { LOG.error(ex.getMessage()); } } } |
注意:getInputDoc是這個HBase協處理器Coprocessor的精髓所在,它能夠把HBase內的Put裏的內容轉化成Solr須要的值。其中 String fieldName = key.substring(key.indexOf( columnFamily ) + 3, key.indexOf( " 我在這" )).trim(); 這裏有一個亂碼字符,在這裏看不到,請你們注意一下。
public static SolrInputDocument getInputDoc(Put put) { SolrInputDocument doc = new SolrInputDocument(); doc.addField("test_ID", Bytes.toString(put.getRow())); for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes(columnFamily))) { String key = Bytes.toString(c.getKey()); String value = Bytes.toString(c.getValue()); if (value.isEmpty()) { continue; } String fieldName = key.substring(key.indexOf(columnFamily) + 3, key.indexOf(" ")).trim(); doc.addField(fieldName, value); } return doc; } |
2、編寫測試程序入口代碼main
這段代碼向HBase請求建了一張表,並將模擬的數據,向HBase連續地提交數據內容,在HBase中不斷地插入數據,同時記錄時間,測試插入性能。
/* *版權:王安琪 *描述:測試HBaseInsert,HBase插入性能 *修改時間:2014-05-27 *修改內容:新增 */ package solrHbase.test;
import hbaseInput.HbaseInsert;
import ***;
public class TestHBaseMain {
private static Configuration config; private static String tableName = "angelHbase"; private static HTable table = null; private static final String columnFamily = "wanganqi";
/** * @param args */ public static void main(String[] args) { config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "192.103.101.104"); HbaseInsert.createTable(config, tableName, columnFamily); try { table = new HTable(config, Bytes.toBytes(tableName)); for (int k = 0; k < 1; k++) { Thread t = new Thread() { public void run() { for (int i = 0; i < 100000; i++) { HbaseInsert.inputData(table, PutCreater.createPuts(1000, columnFamily)); Calendar c = Calendar.getInstance(); String dateTime = c.get(Calendar.YEAR) + "-" + c.get(Calendar.MONTH) + "-" + c.get(Calendar.DATE) + "T" + c.get(Calendar.HOUR) + ":" + c.get(Calendar.MINUTE) + ":" + c.get(Calendar.SECOND) + ":" + c.get(Calendar.MILLISECOND) + "Z 寫入: " + i * 1000; System.out.println(dateTime); } } }; t.start(); } } catch (IOException e1) { e1.printStackTrace(); } }
} |
下面的是與HBase相關的操做,把它封裝到一個類中,這裏就只有建表與插入數據的相關代碼。
/* *版權:王安琪 *描述:與HBase相關操做,建表與插入數據 *修改時間:2014-05-27 *修改內容:新增 */ package hbaseInput; import ***; import org.apache.hadoop.hbase.client.Put;
public class HbaseInsert {
public static void createTable(Configuration config, String tableName, String columnFamily) { HBaseAdmin hBaseAdmin; try { hBaseAdmin = new HBaseAdmin(config); if (hBaseAdmin.tableExists(tableName)) { return; } HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); tableDescriptor.addFamily(new HColumnDescriptor(columnFamily)); hBaseAdmin.createTable(tableDescriptor); hBaseAdmin.close(); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
public static void inputData(HTable table, ArrayList puts) { try { table.put(puts); table.flushCommits(); puts.clear(); } catch (IOException e) { e.printStackTrace(); } } } |
3、編寫模擬數據Put
向HBase中寫入數據須要構造Put,下面是我構造模擬數據Put的方式,有字符串的生成,我是由mmseg提供的詞典 words.dic 中隨機讀取一些詞語鏈接起來,生成一句字符串的,下面的代碼沒有體現,不過很easy,你本身造你本身想要的數據就OK了。
public static Put createPut(String columnFamily) { String ss = getSentence(); byte [] family = Bytes. toBytes (columnFamily); byte[] rowKey = Bytes.toBytes("" + Math.abs(r.nextLong())); Put put = new Put(rowKey); put.add(family, Bytes.toBytes("DeviceID"), Bytes.toBytes("" + Math.abs(r.nextInt()))); ****** put.add(family, Bytes.toBytes(" Company_mmsegsm "), Bytes.toBytes("ss"));
return put; } |
固然在運行上面這個程序以前,須要先在Solr裏面配置好你須要的列信息,HBase、Solr安裝與配置,它們的基礎使用方法將會在以後的文章中介紹。在這裏,Solr的列配置就跟你使用createPut生成的Put搞成同樣的列名就好了,固然也可使用動態列的形式。
4、直接對Solr性能測試
若是你不想對HBase與Solr的相結合進行測試,只想單獨對Solr的性能進行測試,這就更簡單了,徹底能夠利用上面的代碼段來測試,稍微組裝一下就能夠了。
private static void sendConcurrentUpdateSolrServer(final String url, final int count) throws SolrServerException, IOException { SolrServer solrServer = new ConcurrentUpdateSolrServer(url, 10000, 20); for (int i = 0; i < count; i++) { solrServer.add(getInputDoc(PutCreater.createPut(columnFamily))); } } |