Hive 數據導入HBase的2種方法詳解

最近常常被問到這個問題,因此簡單寫一下總結。html

Hive數據導入到HBase基本有2個方案:java

    一、HBase中建表,而後Hive中建一個外部表,這樣當Hive中寫入數據後,HBase中也會同時更新apache

    二、MapReduce讀取Hive數據,而後寫入(API或者Bulkload)到HBasejson

一、Hive 外部表

建立hbase表app

(1) 創建一個表格classes具備1個列族user函數

create 'classes','user'

(2) 查看錶的構造oop

hbase(main):005:0> describe 'classes'
DESCRIPTION ENABLED
 'classes', {NAME => 'user', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', true
  VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => '
 false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}

(3) 加入2行數據spa

put 'classes','001','user:name','jack'
put 'classes','001','user:age','20'
put 'classes','002','user:name','liza'
put 'classes','002','user:age','18'

(4) 查看classes中的數據code

hbase(main):016:0> scan 'classes'
ROW COLUMN+CELL
 001 column=user:age, timestamp=1404980824151, value=20
 001 column=user:name, timestamp=1404980772073, value=jack
 002 column=user:age, timestamp=1404980963764, value=18
 002 column=user:name, timestamp=1404980953897, value=liza

(5) 建立外部hive表,查詢驗證orm

create external table classes(id int, name string, age int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 
TBLPROPERTIES("hbase.table.name" = "classes");

select * from classes;
OK
1 jack 20
2 liza 18

(6)再添加數據到HBase

put 'classes','003','user:age','1820183291839132'
hbase(main):025:0> scan 'classes'
ROW COLUMN+CELL
 001 column=user:age, timestamp=1404980824151, value=20
 001 column=user:name, timestamp=1404980772073, value=jack
 002 column=user:age, timestamp=1404980963764, value=18
 002 column=user:name, timestamp=1404980953897, value=liza
 003 column=user:age, timestamp=1404981476497, value=1820183291839132

(7)Hive查詢,看看新數據

select * from classes;
OK
1 jack 20
2 liza 18
3 NULL NULL    --這裏是null了,由於003沒有name,因此補位Null,而age爲Null是由於超過最大值

(8)以下做爲驗證

put 'classes','004','user:name','test'
put 'classes','004','user:age','1820183291839112312'  -- 已經超int了
hbase(main):030:0> scan 'classes'
ROW COLUMN+CELL
 001 column=user:age, timestamp=1404980824151, value=20
 001 column=user:name, timestamp=1404980772073, value=jack
 002 column=user:age, timestamp=1404980963764, value=18
 002 column=user:name, timestamp=1404980953897, value=liza
 003 column=user:age, timestamp=1404981476497, value=1820183291839132
 004 column=user:age, timestamp=1404981558125, value=1820183291839112312
 004 column=user:name, timestamp=1404981551508, value=test                   
select * from classes;
1 jack 20
2 liza 18
3 NULL NULL
4 test NULL    -- 超int後也認爲是null
put 'classes','005','user:age','1231342'
hbase(main):034:0* scan 'classes'
ROW COLUMN+CELL
 001 column=user:age, timestamp=1404980824151, value=20
 001 column=user:name, timestamp=1404980772073, value=jack
 002 column=user:age, timestamp=1404980963764, value=18
 002 column=user:name, timestamp=1404980953897, value=liza
 003 column=user:age, timestamp=1404981476497, value=1820183291839132
 004 column=user:age, timestamp=1404981558125, value=1820183291839112312
 004 column=user:name, timestamp=1404981551508, value=test
 005 column=user:age, timestamp=1404981720600, value=1231342   
select * from classes;
1 jack 20
2 liza 18
3 NULL NULL
4 test NULL
5 NULL 1231342


注意點:

    一、hbase中的空cell在hive中會補null

    二、hive和hbase中不匹配的字段會補null

    三、Bytes類型的數據,建hive表示加#b

    http://stackoverflow.com/questions/12909118/number-type-value-in-hbase-not-recognized-by-hive

    http://www.aboutyun.com/thread-8023-1-1.html

    四、HBase CF to hive Map

    https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration


二、MapReduce 寫入 HBase

    MR寫入到HBase有2個經常使用方法,1是直接調用HBase Api,使用Table 、Put寫入;2是經過MR生成HFile,而後Bulkload到HBase,數據量很大的時候推薦使用

注意點:

    一、若是須要從hive的路徑中讀取一些值怎麼辦   

private String reg = "stat_date=(.*?)\\/softid=([\\d]+)/";
private String stat_date;
private String softid;
 ------------廈門map函數中寫入-------------
String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();
///user/hive/warehouse/snapshot.db/stat_all_info/stat_date=20150820/softid=201/000000_0
// 解析stat_date 和softid
Pattern pattern = Pattern.compile(reg);
Matcher matcher = pattern.matcher(filePathString);
while(matcher.find()){
	stat_date = matcher.group(1);
	softid = matcher.group(2);
}

    二、hive中的map和list怎麼處理

    hive中的分隔符主要有8種,分別是\001-----> \008

默認    ^A    \001
,       ^B    \002
:       ^C    \003

 Hive中保存的Lis,最底層的數據格式爲 jerrick,  liza, tom, jerry , Map的數據格式爲  jerrick:23, liza:18, tom:0

因此在MR讀入時須要簡單處理下,例如map須要:   "{"+ mapkey.replace("\002", ",").replace("\003", ":")+"}", 由此再轉爲JSON, toString後再保存到HBase。

    三、簡單實例,代碼刪減不少,僅可參考!

public void map(
				LongWritable key,
				Text value,
				Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) {
				String filePathString = ((FileSplit) context.getInputSplit()).getPath().toString();
				///user/hive/warehouse/snapshot.db/stat_all_info/stat_date=20150820/softid=201/000000_0
				// 解析stat_date 和softid
				Pattern pattern = Pattern.compile(reg);
				Matcher matcher = pattern.matcher(filePathString);
				while(matcher.find()){
					stat_date = matcher.group(1);
					softid = matcher.group(2);
				}
				
				rowMap.put("stat_date", stat_date);
				rowMap.put("softid", softid);
				
				String[] vals = value.toString().split("\001");
				
				try {
					Configuration conf = context.getConfiguration();
					String cf = conf.get("hbase.table.cf", HBASE_TABLE_COLUME_FAMILY);
					
					String arow = rowkey; 
					for(int index=10; index < vals.length; index++){
						byte[] row = Bytes.toBytes(arow);
						ImmutableBytesWritable k = new ImmutableBytesWritable(row);
						KeyValue kv = new KeyValue();
						if(index == vals.length-1){
							//dict need 
							logger.info("d is :" + vals[index]);
							logger.info("d is :" + "{"+vals[index].replace("\002", ",").replace("\003", ":")+"}");
							
							
							JSONObject json = new JSONObject("{"+vals[index].replace("\002", ",").replace("\003", ":")+"}");
							kv = new KeyValue(row, cf.getBytes(),Bytes.toBytes(valueKeys[index]), Bytes.toBytes(json.toString()));
						}else{
							kv = new KeyValue(row, cf.getBytes(),Bytes.toBytes(valueKeys[index]), Bytes.toBytes(vals[index]));
						}
						context.write(k, kv);
					}
					
				} catch (Exception e1) {
					context.getCounter("offile2HBase", "Map ERROR").increment(1);
					logger.info("map error:" + e1.toString());
				}
				
			context.getCounter("offile2HBase", "Map TOTAL").increment(1);

		}
	}

   四、bulkload

int jobResult = (job.waitForCompletion(true)) ? 0 : 1;
logger.info("jobResult=" + jobResult);
Boolean bulkloadHfileToHbase = Boolean.valueOf(conf.getBoolean("hbase.table.hfile.bulkload", false));
if ((jobResult == 0) && (bulkloadHfileToHbase.booleanValue())) {
	LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
	loader.doBulkLoad(outputDir, hTable);
}
相關文章
相關標籤/搜索