HBase 的讀取操作
分別讀取 rowkey 和幾個列:p:t、p:c、f:ts
其中 CharSetUtil 是爲了檢測 HBase 中位元組流的字元編碼,避免出現中文亂碼
類 ReadFromHbase.java:
package apiTest;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Scans every row of the {@code enterprise_webpage} HBase table and logs the
 * row key plus the columns {@code p:t} (title), {@code p:c} (content) and
 * {@code f:ts} (fetch time).
 *
 * <p>Row keys, titles and contents are decoded with the charset reported by
 * {@link CharSetUtil#getStreamCharSet}, using {@code utf-8} as the default.
 */
public class ReadFromHbase {

    private static final Logger hbaseLogger = Logger.getLogger("org.apache");
    private static final Logger logger = Logger.getLogger(ReadFromHbase.class);

    private static final String TABLE_NAME = "enterprise_webpage";
    private static final String DEFAULT_CHARSET = "utf-8";

    /** Scanner timeout handed to the HBase client: 6 hours, expressed in milliseconds. */
    private static final long SCANNER_TIMEOUT = 21600000L;
    /** Value stored under the HBase client scanner-caching key. */
    private static final long SCANNER_CACHING = 100L;

    // Column coordinates, encoded once instead of once per row.
    private static final byte[] FAMILY_P = Bytes.toBytes("p");
    private static final byte[] FAMILY_F = Bytes.toBytes("f");
    private static final byte[] QUALIFIER_TITLE = Bytes.toBytes("t");
    private static final byte[] QUALIFIER_CONTENT = Bytes.toBytes("c");
    private static final byte[] QUALIFIER_FETCH_TIME = Bytes.toBytes("ts");

    private static final String ROW_KEY_BANNER = "========= rowKey ========";
    private static final String TITLE_BANNER =
            "===================================== title =====================================";
    private static final String CONTENT_BANNER =
            "===================================== content =====================================";
    private static final String FETCH_TIME_BANNER =
            "===================================== fetchTime =====================================";

    /**
     * Scans the whole table and logs row key, title, content and fetch date of
     * each row.
     *
     * <p>A row whose title, content or fetch time is missing or empty is
     * abandoned at that point; columns that come later in the sequence are not
     * logged for it. Any exception raised while handling a single row is logged
     * together with the row key (when already decoded) and the scan moves on.
     *
     * <p>Both the table handle and the scanner are closed when the method
     * exits, whether normally or through an exception.
     *
     * @throws IOException if the table or the scanner cannot be opened or closed
     */
    public static void getAllRows() throws IOException {
        hbaseLogger.setLevel(Level.ERROR);

        // HBaseConfiguration.create() picks up the settings from hbase-site.xml.
        Configuration conf = HBaseConfiguration.create();
        conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SCANNER_TIMEOUT);
        conf.setLong(HConstants.HBASE_CLIENT_SCANNER_CACHING, SCANNER_CACHING);
        // To read from a local HBase instance, enable the following line:
        // conf.set("hbase.zookeeper.quorum", "localhost");

        // Only used by this invocation, so one formatter serves every row.
        SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

        try (HTable table = new HTable(conf, Bytes.toBytes(TABLE_NAME))) {
            System.out.println("aaaaaaaaaaaaaaa");
            logger.info("-------------------new Scan-------------------");

            try (ResultScanner resultScanner = table.getScanner(new Scan())) {
                Iterator<Result> results = resultScanner.iterator();
                int pagenum = 0;
                while (results.hasNext()) {
                    // Remembered so that a failure can be attributed to a row.
                    String rowKey = null;
                    try {
                        ++pagenum;
                        logger.info("已經抓取到Hbase的第" + pagenum + "條數據");
                        Result result = results.next();

                        // Row key
                        logger.info(ROW_KEY_BANNER);
                        rowKey = decode(result.getRow());
                        logger.info(rowKey);
                        logger.info(ROW_KEY_BANNER);

                        // Title (p:t)
                        logger.info(TITLE_BANNER);
                        byte[] titleBytes = result.getValue(FAMILY_P, QUALIFIER_TITLE);
                        if (isBlank(titleBytes)) {
                            continue;
                        }
                        logger.info(decode(titleBytes));
                        logger.info(TITLE_BANNER);

                        // Content (p:c)
                        logger.info(CONTENT_BANNER);
                        byte[] contentBytes = result.getValue(FAMILY_P, QUALIFIER_CONTENT);
                        if (isBlank(contentBytes)) {
                            continue;
                        }
                        logger.info(decode(contentBytes));
                        logger.info(CONTENT_BANNER);

                        // Fetch time (f:ts): the cell is read as an 8-byte long and
                        // interpreted as epoch milliseconds by java.util.Date.
                        logger.info(FETCH_TIME_BANNER);
                        byte[] timeBytes = result.getValue(FAMILY_F, QUALIFIER_FETCH_TIME);
                        if (isBlank(timeBytes)) {
                            continue;
                        }
                        logger.info(dayFormat.format(new Date(bytes2Long(timeBytes))));
                        logger.info(FETCH_TIME_BANNER);
                    } catch (Exception e) {
                        // Best effort: one bad row must not abort the whole scan.
                        logger.error(e.toString(), e);
                        logger.error("Failed while processing row: " + rowKey);
                    }
                }
            }
            logger.info("-----------------close scanner--------------------");
        }
    }

    /**
     * Decodes {@code raw} using the charset detected for it by
     * {@link CharSetUtil}, with {@code utf-8} as the default.
     *
     * @throws IOException if the reported charset name is not supported
     *         ({@link java.io.UnsupportedEncodingException})
     */
    private static String decode(byte[] raw) throws IOException {
        String charsetName =
                CharSetUtil.getStreamCharSet(new ByteArrayInputStream(raw), DEFAULT_CHARSET);
        return new String(raw, charsetName);
    }

    /** Returns {@code true} when a cell value is absent or has no bytes. */
    private static boolean isBlank(byte[] value) {
        return value == null || value.length == 0;
    }

    /**
     * Assembles a {@code long} from the first eight bytes of {@code byteNum},
     * most significant byte first.
     *
     * @param byteNum source bytes; must contain at least eight elements
     * @return the assembled value
     * @throws ArrayIndexOutOfBoundsException if fewer than eight bytes are given
     * @throws NullPointerException if {@code byteNum} is {@code null}
     */
    public static long bytes2Long(byte[] byteNum) {
        long num = 0;
        for (int ix = 0; ix < 8; ++ix) {
            num <<= 8;
            // Mask to 0..255 so that negative bytes do not sign-extend.
            num |= (byteNum[ix] & 0xff);
        }
        return num;
    }

    /** Command-line entry point: runs a full table scan and logs any I/O failure. */
    public static void main(String[] args) {
        try {
            ReadFromHbase.getAllRows();
        } catch (IOException e) {
            logger.error("Scanning table " + TABLE_NAME + " failed", e);
        }
    }
}
工具類 CharSetUtil.java:
package apiTest;

import java.io.InputStream;
import java.nio.charset.Charset;

import info.monitorenter.cpdetector.io.ASCIIDetector;
import info.monitorenter.cpdetector.io.CodepageDetectorProxy;
import info.monitorenter.cpdetector.io.JChardetFacade;
import info.monitorenter.cpdetector.io.ParsingDetector;
import info.monitorenter.cpdetector.io.UnicodeDetector;

/**
 * Charset detection helper built on the cpdetector library.
 *
 * <p>A single shared {@link CodepageDetectorProxy} is configured once with the
 * parsing, ASCII, Unicode and JChardet detectors, in that order.
 */
public class CharSetUtil {

    private static final CodepageDetectorProxy detector;

    static {
        detector = CodepageDetectorProxy.getInstance();
        detector.add(new ParsingDetector(false));
        detector.add(ASCIIDetector.getInstance());
        detector.add(UnicodeDetector.getInstance());
        detector.add(JChardetFacade.getInstance());
    }

    /**
     * Detects the charset of the bytes available on {@code inputStream}.
     *
     * <p>Detection is best effort: any failure is printed to standard error and
     * {@code defaultCharSet} is returned instead. The returned name is either a
     * charset the running JVM supports or {@code defaultCharSet} itself, so a
     * detected name can be passed straight to
     * {@code new String(byte[], String)}.
     *
     * @param inputStream stream to inspect; may be {@code null}
     * @param defaultCharSet name returned when nothing usable is detected
     * @return the detected charset name, or {@code defaultCharSet}
     */
    public static String getStreamCharSet(InputStream inputStream, String defaultCharSet) {
        if (inputStream == null) {
            return defaultCharSet;
        }

        // Number of bytes offered to the detectors; 200 is only used when
        // available() itself fails.
        int count = 200;
        try {
            count = inputStream.available();
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            Charset charset = detector.detectCodepage(inputStream, count);
            // A non-null result is not necessarily decodable: only hand back
            // names the JVM can resolve, otherwise callers fail with
            // UnsupportedEncodingException.
            // NOTE(review): cpdetector appears to report "unknown" through a
            // placeholder Charset object rather than null - confirm.
            if (charset != null && Charset.isSupported(charset.name())) {
                return charset.name();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return defaultCharSet;
    }
}