HBaseMapReduce3:
將HDFS文件內容數據寫入存儲到HBase中:
對一些大的文件,須要存入HBase中,其思想是先把文件傳到HDFS上,利用map階段讀取<key,value>對,可在reduce把這些鍵值對上傳到HBase中。java
這裏已是固定指定HDFS中的某一文件,而後在reduce中把這些鍵值對寫入到HBase中。
數據庫
public class HBaseAndMapReduce3 { public static void main(String[] args) throws Exception { System.exit(run()); } public static int run() throws Exception { Configuration conf = new Configuration(); conf = HBaseConfiguration.create(conf); conf.set("hbase.zookeeper.quorum", "192.168.226.129"); Job job = Job.getInstance(conf, "findFriend"); job.setJarByClass(HBaseAndMapReduce3.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); DateFormat df = new SimpleDateFormat( "yyyyMMddHHmmssS" ); FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/hbasemapreduce1/2016051818564427/part-r-00000")); // 把數據寫入Hbase數據庫 TableMapReduceUtil.initTableReducerJob("friend",FindFriendReducer.class, job); checkTable(conf); return job.waitForCompletion(true) ? 0 : 1; } private static void checkTable(Configuration conf) throws Exception { Connection con = ConnectionFactory.createConnection(conf); Admin admin = con.getAdmin(); TableName tn = TableName.valueOf("friend"); if (!admin.tableExists(tn)){ HTableDescriptor htd = new HTableDescriptor(tn); HColumnDescriptor hcd = new HColumnDescriptor("person"); htd.addFamily(hcd); admin.createTable(htd); System.out.println("表不存在,新建立表成功...."); } } public static class FindFriendReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { @Override protected void reduce( Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { Put put = new Put(key.getBytes()); put.addColumn(Bytes.toBytes("person"), Bytes.toBytes("nickname"), values.iterator().next().getBytes()); context.write(new ImmutableBytesWritable(key.getBytes()), put); } } }
//原數據文件中的內容:ide
hadoop Berg-OSChina,BergBerg hbase OSChina,BergBerg zookeeper OSChina,BergBerg
///將HDFS中文件內容存入HBase中,經過客戶端全表掃描知:oop
hbase(main):003:0> scan 'friend' ROW COLUMN+CELL hadoop column=person:nickname, timestamp=1463748372584, value=Berg-OSChina,BergBerg hbasep column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg zookeeper column=person:nickname, timestamp=1463748372584, value=OSChina,BergBerggBerg 3 row(s) in 0.2850 seconds