HBase (9): HBaseAndMapReduce (3)

HBaseAndMapReduce3:
     Writing data from an HDFS file into HBase:
         For large files that need to be stored in HBase, the idea is to upload the file to HDFS first, read <key, value> pairs from it in the map phase, and then write those pairs into HBase in the reduce phase.
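
As a minimal sketch of that first step, the file can be copied into HDFS programmatically; the local path /tmp/friend.txt and the HDFS target path below are assumptions for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: copy a local data file into HDFS before running the MapReduce job.
// The local path and the HDFS target path are illustrative assumptions.
public class UploadToHdfs {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.226.129:9000");
		try (FileSystem fs = FileSystem.get(conf)) {
			fs.copyFromLocalFile(new Path("/tmp/friend.txt"),
					new Path("/hbasemapreduce1/friend.txt"));
		}
	}
}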


    Here the input is a fixed, hard-coded file in HDFS; the reducer then writes the resulting key-value pairs into HBase.
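
Because the job below never calls job.setMapperClass(), Hadoop's default identity Mapper is used: KeyValueTextInputFormat splits each input line at the first tab into a Text key and a Text value, and the mapper forwards each pair unchanged to the reducer. An explicit equivalent, shown purely for illustration, would look like this:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustration only: the identity mapper the job implicitly relies on.
// KeyValueTextInputFormat has already split each line at the first tab,
// e.g. key = "hadoop", value = "Berg-OSChina,BergBerg".
public class IdentityTextMapper extends Mapper<Text, Text, Text, Text> {
	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(key, value); // pass the pair through unchanged
	}
}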

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class HBaseAndMapReduce3 {

	public static void main(String[] args) throws Exception {
		System.exit(run());
	}

	public static int run() throws Exception {
		Configuration conf = new Configuration();
		conf = HBaseConfiguration.create(conf);
		conf.set("hbase.zookeeper.quorum", "192.168.226.129");

		Job job = Job.getInstance(conf, "findFriend");
		job.setJarByClass(HBaseAndMapReduce3.class);

		job.setInputFormatClass(KeyValueTextInputFormat.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		
		FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/hbasemapreduce1/2016051818564427/part-r-00000"));
		
		// Configure the reduce phase to write into the HBase table "friend".
		TableMapReduceUtil.initTableReducerJob("friend", FindFriendReducer.class, job);
		// Ensure the target table exists before the job is submitted.
		checkTable(conf);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	private static void checkTable(Configuration conf) throws Exception {
		// Create the target table with a "person" family if it does not exist.
		try (Connection con = ConnectionFactory.createConnection(conf);
				Admin admin = con.getAdmin()) {
			TableName tn = TableName.valueOf("friend");
			if (!admin.tableExists(tn)) {
				HTableDescriptor htd = new HTableDescriptor(tn);
				HColumnDescriptor hcd = new HColumnDescriptor("person");
				htd.addFamily(hcd);
				admin.createTable(htd);
				System.out.println("Table did not exist; created it.");
			}
		}
	}

	public static class FindFriendReducer extends
			TableReducer<Text, Text, ImmutableBytesWritable> {
		@Override
		protected void reduce(
				Text key,
				Iterable<Text> values,
				Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			
			// Text.getBytes() exposes the reused internal buffer, which may
			// carry stale bytes beyond getLength() from a previous, longer
			// key or value; convert via toString() to get the exact bytes.
			byte[] rowKey = Bytes.toBytes(key.toString());
			Put put = new Put(rowKey);
			put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"),
					Bytes.toBytes(values.iterator().next().toString()));
			context.write(new ImmutableBytesWritable(rowKey), put);
		}
	}
}

// Contents of the source data file:

hadoop	Berg-OSChina,BergBerg
hbase	OSChina,BergBerg
zookeeper	OSChina,BergBerg

// After the job completes, a full-table scan from the HBase shell confirms that the HDFS file contents were stored in HBase:

hbase(main):003:0> scan 'friend'
ROW                             COLUMN+CELL                                                                              
 hadoop                         column=person:nickname, timestamp=1463748372584, value=Berg-OSChina,BergBerg             
 hbase                          column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerg                  
 zookeeper                      column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerg                  
3 row(s) in 0.2850 seconds
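
The same verification can be done from Java. Below is a minimal read-back sketch for one of the rows written above; the class name VerifyFriend is an assumption, while the table, column family, and qualifier follow the job's schema.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: read back the "hadoop" row from the "friend" table.
public class VerifyFriend {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "192.168.226.129");
		try (Connection con = ConnectionFactory.createConnection(conf);
				Table table = con.getTable(TableName.valueOf("friend"))) {
			Result r = table.get(new Get(Bytes.toBytes("hadoop")));
			System.out.println(Bytes.toString(
					r.getValue(Bytes.toBytes("person"), Bytes.toBytes("nickname"))));
			// Expected output: Berg-OSChina,BergBerg
		}
	}
}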