讀hbase

hbase 的讀取操作(Java)

分別讀取 rowkey 和幾個列:p:t、p:c、f:ts

其中 CharSetUtil 是爲了檢測 hbase 中字節流的字符編碼,避免出現中文亂碼

類 ReadFromHbase.java:

package apiTest;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Demo client that scans every row of the HBase table {@code enterprise_webpage}
 * and logs, for each row, the row key and the columns {@code p:t} (title),
 * {@code p:c} (content) and {@code f:ts} (fetch time).
 *
 * <p>Text values are decoded with the charset guessed by {@code CharSetUtil},
 * falling back to UTF-8, to avoid garbled Chinese text.
 */
public class ReadFromHbase {
	private static final String TABLE_NAME = "enterprise_webpage";
	private static final String DEFAULT_CHARSET = "utf-8";

	private static final byte[] FAMILY_PAGE = Bytes.toBytes("p");
	private static final byte[] FAMILY_FETCH = Bytes.toBytes("f");
	private static final byte[] QUALIFIER_TITLE = Bytes.toBytes("t");
	private static final byte[] QUALIFIER_CONTENT = Bytes.toBytes("c");
	private static final byte[] QUALIFIER_FETCH_TIME = Bytes.toBytes("ts");

	private static final String ROW_KEY_BANNER = "========= rowKey ========";
	private static final String TITLE_BANNER =
			"===================================== title =====================================";
	private static final String CONTENT_BANNER =
			"===================================== content =====================================";
	private static final String FETCH_TIME_BANNER =
			"===================================== fetchTime =====================================";

	private static Logger hbaseLogger = Logger.getLogger("org.apache");
	private static Logger logger = Logger.getLogger(ReadFromHbase.class);

	/**
	 * Runs a full-table scan and logs the row key, title, content and fetch date
	 * of every row.
	 *
	 * <p>A column that is missing or empty is reported with a warning and the
	 * remaining columns of that row are still logged. A failure while handling
	 * one row is logged together with its row key and the scan continues with
	 * the next row. The scanner and the table are always closed, even when the
	 * scan aborts.
	 *
	 * @throws IOException if the table cannot be opened, scanned or closed
	 */
	public static void getAllRows() throws IOException {
		// Silence everything below ERROR from the "org.apache" logger hierarchy.
		hbaseLogger.setLevel(Level.ERROR);

		// HBaseConfiguration.create() picks up the settings from hbase-site.xml.
		Configuration conf = HBaseConfiguration.create();
		conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 21600000);
		conf.setLong(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100);
		// To read from a local HBase instance, enable the following line:
		// conf.set("hbase.zookeeper.quorum", "localhost");

		HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
		try {
			logger.info("-------------------new Scan-------------------");
			ResultScanner resultScanner = table.getScanner(new Scan());
			try {
				logRows(resultScanner.iterator());
			} finally {
				resultScanner.close();
				logger.info("-----------------close scanner--------------------");
			}
		} finally {
			table.close();
		}
	}

	/** Logs every row delivered by the iterator; a failing row is reported and skipped. */
	private static void logRows(Iterator<Result> results) {
		// Created once for the whole scan; it is only used from this thread.
		// Formats in the JVM's default time zone.
		SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
		int pagenum = 0;

		while (results.hasNext()) {
			String rowKey = null;
			try {
				++pagenum;
				logger.info("已經抓取到Hbase的第" + pagenum + "條數據");
				Result result = results.next();

				rowKey = decode(result.getRow());
				logSection(ROW_KEY_BANNER, rowKey);

				logTextColumn(result, FAMILY_PAGE, QUALIFIER_TITLE, TITLE_BANNER, "p:t", rowKey);
				logTextColumn(result, FAMILY_PAGE, QUALIFIER_CONTENT, CONTENT_BANNER, "p:c", rowKey);

				byte[] timeBytes = result.getValue(FAMILY_FETCH, QUALIFIER_FETCH_TIME);
				if (isEmpty(timeBytes)) {
					logger.warn("row " + rowKey + " has no value for column f:ts");
				} else {
					// The stored long is handed to Date(long), i.e. treated as epoch milliseconds.
					Date fetchTime = new Date(bytes2Long(timeBytes));
					logSection(FETCH_TIME_BANNER, dayFormat.format(fetchTime));
				}
			} catch (Exception e) {
				// Best effort: one unreadable row must not abort the whole scan.
				logger.error("failed to read row #" + pagenum + " (rowKey=" + rowKey + ")", e);
			}
		}
	}

	/** Decodes one text column and logs it between its banners, or warns if it is absent. */
	private static void logTextColumn(Result result, byte[] family, byte[] qualifier,
			String banner, String columnName, String rowKey) throws IOException {
		byte[] raw = result.getValue(family, qualifier);
		if (isEmpty(raw)) {
			logger.warn("row " + rowKey + " has no value for column " + columnName);
			return;
		}
		logSection(banner, decode(raw));
	}

	/** Converts raw bytes to text using the charset guessed by CharSetUtil (UTF-8 as fallback). */
	private static String decode(byte[] raw) throws IOException {
		String charsetName = CharSetUtil.getStreamCharSet(new ByteArrayInputStream(raw), DEFAULT_CHARSET);
		return new String(raw, charsetName);
	}

	/** Logs a value framed by an opening and a closing banner line. */
	private static void logSection(String banner, String value) {
		logger.info(banner);
		logger.info(value);
		logger.info(banner);
	}

	/** Returns true when the cell value is missing or has no bytes. */
	private static boolean isEmpty(byte[] value) {
		return value == null || value.length == 0;
	}

	/**
	 * Interprets the first eight bytes of the array as a big-endian {@code long}.
	 *
	 * @param byteNum array holding at least eight bytes; extra bytes are ignored
	 * @return the decoded value
	 * @throws IllegalArgumentException if {@code byteNum} is null or shorter than eight bytes
	 */
	public static long bytes2Long(byte[] byteNum) {
		if (byteNum == null || byteNum.length < 8) {
			throw new IllegalArgumentException("expected at least 8 bytes but got "
					+ (byteNum == null ? "null" : String.valueOf(byteNum.length)));
		}
		long num = 0;
		for (int ix = 0; ix < 8; ++ix) {
			num <<= 8;
			// Mask to drop sign extension before merging the byte in.
			num |= (byteNum[ix] & 0xff);
		}
		return num;
	}

	/** Entry point: scans the table and logs any I/O failure. */
	public static void main(String[] args) {
		try {
			ReadFromHbase.getAllRows();
		} catch (IOException e) {
			logger.error("scan of table " + TABLE_NAME + " failed", e);
		}
	}
}

 

工具類 CharSetUtil.java:

package apiTest;

import java.io.InputStream;
import java.nio.charset.Charset;

import info.monitorenter.cpdetector.io.ASCIIDetector;
import info.monitorenter.cpdetector.io.CodepageDetectorProxy;
import info.monitorenter.cpdetector.io.JChardetFacade;
import info.monitorenter.cpdetector.io.ParsingDetector;
import info.monitorenter.cpdetector.io.UnicodeDetector;

	/**
	 * Helper that guesses the character encoding of a byte stream using the
	 * cpdetector library.
	 */
	public class CharSetUtil {
		private static final CodepageDetectorProxy detector;
		static{
			// Register the individual detectors on the shared proxy once, at class load.
			detector=CodepageDetectorProxy.getInstance();
			detector.add(new ParsingDetector(false));
			detector.add(ASCIIDetector.getInstance());
			detector.add(UnicodeDetector.getInstance());
			detector.add(JChardetFacade.getInstance());
		}

		/**
		 * Guesses the charset of the given stream.
		 *
		 * <p>Detection is best effort: any failure is printed to stderr and the
		 * default is returned instead of propagating the exception.
		 *
		 * @param inputStream    stream to inspect; may be {@code null}
		 * @param defaultCharSet name returned when the stream is {@code null}, when
		 *                       detection fails, or when the detected charset cannot
		 *                       be used by this JVM
		 * @return the name of the detected charset, or {@code defaultCharSet}
		 */
		public static String getStreamCharSet(InputStream inputStream,String defaultCharSet){
			if(inputStream==null){
				return defaultCharSet;
			}
			// Number of bytes handed to the detector; 200 is kept only if available() fails.
			int count=200;
			try{
				count=inputStream.available();
			}catch(Exception e){
				e.printStackTrace();
			}
			try {
				Charset charset=detector.detectCodepage(inputStream, count);
				// NOTE(review): cpdetector appears to return placeholder charsets (unknown /
				// unsupported) when detection is inconclusive. Their names are not usable with
				// new String(bytes, name), so only names the JVM supports are handed back.
				if(charset!=null && Charset.isSupported(charset.name())){
					return charset.name();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			return defaultCharSet;
		}
	}
相關文章
相關標籤/搜索