JavaAPI與hbase的交互

JavaAPI鏈接Hbase

準備工做:

  •   確保集羣各節點服務運行正常
  •   確保zookeeper能夠正常工做
  •   已經開啓hbase-master,hbase-regionserver
  •   環境:windows7下eclipse,集羣正常工做。
    •   因爲咱們是在windows7下運行,因此咱們將服務器中的hadoop程序拷貝到本地。
    •   在系統的環境變量中設置HADOOP_HOME,而且將%HADOOP_HOME%/bin添加到path當中。
    •   接下來在運行中,執行程序的時候eclipse會報錯說找不到winutils.exe。因此咱們還要下載一個winutils.exe,我這裏提供一個github上的下載連接https://github.com/srccodes/hadoop-common-2.2.0-bin,下載之後解壓到bin下便可。(版本雖然很舊,可是可使用)

鏈接hbase並進行簡單操做:

  1. 建立JavaProject,導入須要的jar包,jar包來自於服務器上hbase中的lib文件夾下的jar文件,因此將lib直接拷貝到當前工程中,並build path(導入這些額外的包)。
  2. 同時拷貝hbase下的log4j.properties文件到項目中,在執行過程當中能夠查看到執行過程當中產生的日誌。
  3. 建立鏈接hbase須要的配置信息
    1. Java客戶端其實就是shell客戶端的一種實現,操做命令基本上就是shell客戶端命令的一個映射。
    2. Java客戶端使用的配置信息是被映射到了HbaseConfiguration的實例對象中的,使用create方法建立實例化對象的時候,會從classpath中獲取hbase-site.xml文件並進行配置文件內容的讀取。同時也會讀取hadoop的配置文件信息。這裏咱們給定zookeeper的相關配置信息便可。
    3. 流程:先經過zookeeper拿到hbase:namespace的路徑,而後從這個路徑中拿到hbase:meta表的信息,接着就拿到了用戶表的路徑   
  4. 代碼實現以下
     1 package com.hblink.demo;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.hbase.HBaseConfiguration;
     7 
     8 public class Hblink {
     9     /**
    10      * 獲取hbase的配置文件信息
    11      * 
    12      * @return
    13      * @throws IOException
    14      */
    15     public static Configuration getHBaseConfiguration() throws IOException {
    16         Configuration conf = HBaseConfiguration.create();
    17         // zookeeper的配置信息
    18         conf.set("hbase.zookeeper.quorum", "kslave5,kslave6,kslave7");// zookeeper節點信息
    19         conf.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
    20         // conf.set("dfs.socket.timeout", "180000");
    21         return conf;
    22     }
    23 }
  5. 有了配置信息之後,咱們開始經過配置信息鏈接hbase

    HBaseAdmin類:是主要進行DDL操做相關的一個接口類,主要包括命名空間管理,用戶表管理。經過該接口咱們能夠建立、刪除、獲取用戶表,也能夠進行用戶表的分割,緊縮等操做。
    HTable類:是hbase中的用戶表的一個映射的java實例,經過該類進行表數據的操做,包括數據的增刪改查,也就是在這裏咱們能夠相似shell中put,get和sacn進行數據的操做。java

    HTableDescriptor類:是hbase用戶表的具體描述信息類,通常咱們建立表獲取表信息,就是經過該類進行的。git

     1 package com.hblink.test;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.hbase.HColumnDescriptor;
     7 import org.apache.hadoop.hbase.HTableDescriptor;
     8 import org.apache.hadoop.hbase.TableName;
     9 import org.apache.hadoop.hbase.client.HBaseAdmin;
    10 
    11 import com.hblink.demo.Hblink;
    12 
    13 public class HbTest {
    14     public static void main(String[] args) throws Exception {
    15         Configuration configuration = Hblink.getHBaseConfiguration();
    16         HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
    17         try {
    18             createTestTable(hBaseAdmin);
    19         } finally {
    20             hBaseAdmin.close(); // 資源釋放
    21         }
    22     }
    23 
    24     /**
    25      * 測試建立表table
    26      * 
    27      * @throws IOException
    28      */
    29     static void createTestTable(HBaseAdmin hbAdmin) throws IOException {
    30         TableName tableName = TableName.valueOf("stock-info"); // 建立表名
    31         HTableDescriptor hDescriptor = new HTableDescriptor(tableName);
    32         hDescriptor.addFamily(new HColumnDescriptor("f"));// 給定列族
    33         hbAdmin.createTable(hDescriptor);
    34         System.out.println("建立表成功!");
    35     }
    36 }

     

  6. 接下來是數據插入
     1 package com.hblink.test;
     2 
     3 import java.io.BufferedReader;
     4 import java.io.File;
     5 import java.io.FileInputStream;
     6 import java.io.FileNotFoundException;
     7 import java.io.IOException;
     8 import java.io.InputStreamReader;
     9 
    10 import org.apache.hadoop.conf.Configuration;
    11 import org.apache.hadoop.hbase.client.HTable;
    12 import org.apache.hadoop.hbase.client.Put;
    13 import org.apache.hadoop.hbase.util.Bytes;
    14 
    15 import com.hblink.demo.Hblink;
    16 
    17 public class TableTest {
    18     public static int count = 0;
    19 
    20     public static void main(String[] args) throws IOException {
    21 
    22         HTable hTable = null;
    23         Configuration configuration = Hblink.getHBaseConfiguration();
    24 
    25         hTable = new HTable(configuration, "stock-info");
    26 
    27         testPut(hTable);//插入數據
    28 
    29         hTable.close();//釋放資源
    30 
    31     }
    32 
    33     /**
    34      * 測試往表裏插入數據
    35      * 
    36      * @param hTable
    37      * @throws IOException
    38      */
    39     static void testPut(HTable hTable) throws IOException {
    40 
    41         File file = new File("./20171120sh.txt"); //獲取本地文件
    42         InputStreamReader isr = null;
    43         try {
    44             isr = new InputStreamReader(new FileInputStream(file), "utf-8");
    45         } catch (FileNotFoundException e) {
    46             e.printStackTrace();
    47         }
    48         if (isr == null) {
    49             return;
    50         }
    51         BufferedReader br = new BufferedReader(isr);
    52         String re = "";
    53         while ((re = br.readLine()) != null) {
    54             String[] sarr = re.split(",");
    55             // System.out.println(sarr[0] + "-" + sarr[1] + "-" + sarr[2] + "-" + sarr[3] +
    56             // sarr[4] + "-" + sarr[5] + "-" + sarr[6]);
    57             // System.out.println(sarr[0]);
    58 
    59             Put put = new Put(Bytes.toBytes(sarr[0]));
    60             put.add(Bytes.toBytes("f"), Bytes.toBytes("Stock"), Bytes.toBytes(sarr[1]));
    61             put.add(Bytes.toBytes("f"), Bytes.toBytes("Date"), Bytes.toBytes(sarr[2]));
    62             put.add(Bytes.toBytes("f"), Bytes.toBytes("Top"), Bytes.toBytes(sarr[3]));
    63             put.add(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"), Bytes.toBytes(sarr[4]));
    64             put.add(Bytes.toBytes("f"), Bytes.toBytes("Volume"), Bytes.toBytes(sarr[5]));
    65             put.add(Bytes.toBytes("f"), Bytes.toBytes("Turnover"), Bytes.toBytes(sarr[6]));
    66             hTable.put(put);
    67             count++;
    68         }
    69         System.out.println("插入" + (count - 1) + "條數據成功!");
    70     }
    71 }
  7. 對錶中數據的查詢
      1 package com.hblink.test;
      2 
      3 import java.io.IOException;
      4 import java.util.Scanner;
      5 
      6 import org.apache.hadoop.conf.Configuration;
      7 import org.apache.hadoop.hbase.client.HTable;
      8 import org.apache.hadoop.hbase.client.Result;
      9 import org.apache.hadoop.hbase.client.ResultScanner;
     10 import org.apache.hadoop.hbase.client.Scan;
     11 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
     12 import org.apache.hadoop.hbase.filter.Filter;
     13 import org.apache.hadoop.hbase.filter.RegexStringComparator;
     14 import org.apache.hadoop.hbase.filter.RowFilter;
     15 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
     16 import org.apache.hadoop.hbase.util.Bytes;
     17 
     18 import com.hblink.demo.Hblink;
     19 
     20 public class ScanHbase {
     21     public static Boolean flag = true;
     22     public static String string = null;
     23 
     24     public static void main(String[] args) throws IOException {
     25         while (flag) {
     26             HTable hTable = null;
     27             Configuration configuration = Hblink.getHBaseConfiguration();
     28             hTable = new HTable(configuration, "stock-info");
     29 
     30             Scanner sc = new Scanner(System.in);
     31             System.out.print("請輸入須要查詢的股票代碼:");
     32             string = sc.next();
     33 
     34             // scanTestCell(hTable);
     35             scanTestRow(hTable);
     36             hTable.close();
     37             if (string.equals("quit")) {
     38                 flag = false;
     39             }
     40         }
     41     }
     42 
     43     /**
     44      * 經過列查詢
     45      * 
     46      * @param hTable
     47      * @throws IOException
     48      */
     49     static void scanTestCell(HTable hTable) throws IOException {
     50 
     51         // 設置過濾器
     52         SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("f"),
     53                 Bytes.toBytes("Date"), CompareOp.EQUAL, Bytes.toBytes(string));
     54         // 設置全表掃描封裝類
     55         Scan scan = new Scan();
     56         // 添加過濾器(經過股票代碼查詢)
     57         scan.setFilter(singleColumnValueFilter);
     58         // 掃描
     59         ResultScanner resultScanner = hTable.getScanner(scan);
     60         for (Result result : resultScanner) {
     61             byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date"));
     62             byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock"));
     63             byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top"));
     64             byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"));
     65             byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume"));
     66             byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover"));
     67 
     68             System.out.print(Bytes.toString(data) + ";");
     69             System.out.print(Bytes.toString(stock) + ";");
     70             if (Bytes.toString(top).equals("--")) {
     71                 System.out.print(Bytes.toString(top) + ";");
     72             } else {
     73                 System.out.print(Bytes.toInt(top) + ";");
     74             }
     75             System.out.print(Bytes.toString(change_rate) + ";");
     76             System.out.print(Bytes.toString(volume) + ";");
     77             System.out.print(Bytes.toString(turnover));
     78             System.out.println();
     79 
     80         }
     81 
     82     }
     83 
     84     /**
     85      * 經過正則--匹配行鍵
     86      * 
     87      * @param hTable
     88      * @throws IOException
     89      */
     90     static void scanTestRow(HTable hTable) throws IOException {
     91         RegexStringComparator re = new RegexStringComparator("^" + string + "");
     92         Filter filter = new RowFilter(CompareOp.EQUAL, re);
     93         Scan scan = new Scan();
     94         // 添加過濾器(經過股票代碼查詢)
     95         scan.setFilter(filter);
     96         // 掃描
     97         ResultScanner resultScanner = hTable.getScanner(scan);
     98         for (Result result : resultScanner) {
     99             byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date"));
    100             byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock"));
    101             byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top"));
    102             byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"));
    103             byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume"));
    104             byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover"));
    105 
    106             System.out.print(Bytes.toString(data) + ";");
    107             System.out.print(Bytes.toString(stock) + ";");
    108             if (Bytes.toString(top).equals("--")) {
    109                 System.out.print(Bytes.toString(top) + ";");
    110             } else {
    111                 System.out.print(Bytes.toInt(top) + ";");
    112             }
    113             System.out.print(Bytes.toString(change_rate) + ";");
    114             System.out.print(Bytes.toString(volume) + ";");
    115             System.out.print(Bytes.toString(turnover));
    116             System.out.println();
    117 
    118         }
    119     }
    120 }


ps.繼續學習中=====github

相關文章
相關標籤/搜索