Big Data: HBase

Author: Lijbhtml

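HBase is a distributed, column-oriented, open-source database. The technology derives from the Google paper "Bigtable: A Distributed Storage System for Structured Data" by Fay Chang et al. Just as Bigtable builds on the distributed data storage provided by the Google File System, HBase provides Bigtable-like capabilities on top of Hadoop. HBase began as a subproject of Apache Hadoop and is now a top-level Apache project. Unlike a typical relational database, HBase is suited to storing unstructured data, and its storage model is column-based rather than row-based.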

How are HBase and HDFS related?

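HDFS can store massive amounts of data, but it is not good at managing individual records efficiently (query, update, delete, insert), and it does not support random reads and writes over large data sets. HBase is a NoSQL database built on top of HDFS. It manages the data stored on HDFS efficiently, supports random reads and writes over massive data sets, and provides row-level data management.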

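HBase features

  • Large: a single HBase table typically holds hundreds of millions of rows by millions of columns, and each column can keep thousands of versions.
  • Sparse: HBase has no fixed table schema. A row can contain any number of columns, and columns that are absent take up no space, which improves disk utilization.
  • Untyped: HBase has no data types; every value is stored as a byte array.
  • Column-oriented: the biggest difference from conventional databases lies in how records are organized at the storage layer. Most databases store data row by row, which gives poor I/O utilization for queries that read only a few columns. HBase stores data by column, which greatly improves I/O utilization.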
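The properties listed above (sparse rows, versioned cells, byte-array values) can be pictured as nested sorted maps. The following is a minimal sketch of that logical model only; it says nothing about how HBase lays data out on disk. The class `SparseTableSketch` and its methods are hypothetical names, and row and column keys are plain strings here purely for readability.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model: row key -> column -> timestamp -> value bytes. */
final class SparseTableSketch {
    private final NavigableMap<String, NavigableMap<String, NavigableMap<Long, byte[]>>> rows =
            new TreeMap<String, NavigableMap<String, NavigableMap<Long, byte[]>>>();

    /** Stores one cell version. Columns that are never written occupy no space. */
    void put(String row, String column, long timestamp, byte[] value) {
        rows.computeIfAbsent(row, r -> new TreeMap<String, NavigableMap<Long, byte[]>>())
            // Versions are kept newest first, so the first entry is the latest one.
            .computeIfAbsent(column, c -> new TreeMap<Long, byte[]>(Comparator.reverseOrder()))
            .put(timestamp, value);
    }

    /** Returns the newest version of a cell, or null if the cell does not exist. */
    byte[] getLatest(String row, String column) {
        NavigableMap<String, NavigableMap<Long, byte[]>> columns = rows.get(row);
        if (columns == null) {
            return null;
        }
        NavigableMap<Long, byte[]> versions = columns.get(column);
        return versions == null ? null : versions.firstEntry().getValue();
    }

    /** Number of columns actually present in a row (rows may differ from each other). */
    int columnCount(String row) {
        NavigableMap<String, NavigableMap<Long, byte[]>> columns = rows.get(row);
        return columns == null ? 0 : columns.size();
    }

    public static void main(String[] args) {
        SparseTableSketch t = new SparseTableSketch();
        t.put("1", "cf1:name", 100L, "zhangsan".getBytes(StandardCharsets.UTF_8));
        t.put("1", "cf1:age", 100L, "18".getBytes(StandardCharsets.UTF_8));
        t.put("1", "cf1:age", 200L, "20".getBytes(StandardCharsets.UTF_8));
        t.put("2", "cf1:name", 150L, "lisi".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(t.getLatest("1", "cf1:age"), StandardCharsets.UTF_8));
        System.out.println(t.columnCount("1") + " vs " + t.columnCount("2"));
    }
}
```

Row "1" carries two columns and row "2" only one, which is what "sparse" means in practice: no placeholder is stored for a missing column.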

Row storage and column storage

Row storage

Column storage

Setting up the HBase environment

  • Make sure Hadoop (HDFS) is running properly; HADOOP_HOME must be set.

  • Install ZooKeeper (it coordinates the HBase services)

    [root@CentOS ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
    [root@CentOS ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg

    tickTime=2000
    dataDir=/root/zkdata
    clientPort=2181

    [root@CentOS ~]# mkdir /root/zkdata
    [root@CentOS zookeeper-3.4.6]# ./bin/zkServer.sh start zoo.cfg
    JMX enabled by default
    Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@CentOS zookeeper-3.4.6]# ./bin/zkServer.sh status zoo.cfg
    JMX enabled by default
    Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Mode: standalone
    [root@centos ~]# jps
    1612 SecondaryNameNode
    1348 NameNode
    1742 QuorumPeerMain //zookeeper
    1437 DataNode

  • Install HBase

    [root@centos ~]# tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/
    [root@centos ~]# vi /usr/hbase-1.2.4/conf/hbase-site.xml

    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://CentOS:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>CentOS</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>

    [root@centos ~]# vi /usr/hbase-1.2.4/conf/regionservers

    centos

    [root@centos ~]# vi .bashrc

    HBASE_MANAGES_ZK=false
    HBASE_HOME=/usr/hbase-1.2.4
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    CLASSPATH=.
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin
    export JAVA_HOME
    export CLASSPATH
    export PATH
    export HADOOP_HOME
    export HBASE_HOME
    export HBASE_MANAGES_ZK

  • Start HBase

    [root@centos ~]# start-hbase.sh
    [root@centos ~]# jps
    1612 SecondaryNameNode
    2102 HRegionServer //handles the actual reads and writes of table data
    1348 NameNode
    2365 Jps
    1978 HMaster //similar to a NameNode: manages table metadata and the RegionServers
    1742 QuorumPeerMain
    1437 DataNode

The web UI is then available at: http://centos:16010

HBase Shell commands

  • Connect to HBase

    [root@centos ~]# hbase shell
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/hbase-1.2.4/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell
    Version 1.2.4, rUnknown, Wed Feb 15 18:58:00 CST 2017

    hbase(main):001:0>

  • Check the system status

    hbase(main):001:0> status
    1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load

  • Check the current version

    hbase(main):006:0> version
    1.2.4, rUnknown, Wed Feb 15 18:58:00 CST 2017

Namespace operations (a namespace plays the role of a database)

  • List the namespaces

    hbase(main):003:0> list_namespace
    NAMESPACE
    default
    hbase

  • Create a namespace

    hbase(main):006:0> create_namespace 'baizhi',{'author'=>'zs'}
    0 row(s) in 0.3260 seconds

  • List the tables in a namespace

    hbase(main):004:0> list_namespace_tables 'hbase'
    TABLE
    meta
    namespace

  • Show the details of a namespace

    hbase(main):008:0> describe_namespace 'baizhi'
    DESCRIPTION
    {NAME => 'baizhi', author => 'zs'}
    1 row(s) in 0.0550 seconds

  • Alter a namespace

    hbase(main):010:0> alter_namespace 'baizhi',{METHOD => 'set','author'=> 'wangwu'}
    0 row(s) in 0.2520 seconds
    hbase(main):011:0> describe_namespace 'baizhi'
    DESCRIPTION
    {NAME => 'baizhi', author => 'wangwu'}
    1 row(s) in 0.0030 seconds
    hbase(main):012:0> alter_namespace 'baizhi',{METHOD => 'unset',NAME => 'author'}
    0 row(s) in 0.0550 seconds
    hbase(main):013:0> describe_namespace 'baizhi'
    DESCRIPTION
    {NAME => 'baizhi'}
    1 row(s) in 0.0080 seconds

  • Drop a namespace

    hbase(main):020:0> drop_namespace 'baizhi'
    0 row(s) in 0.0730 seconds

HBase does not allow dropping a namespace that still contains tables.

Table operations (DDL)

  • Create a table

    hbase(main):023:0> create 't_user','cf1','cf2'
    0 row(s) in 1.2880 seconds

    => Hbase::Table - t_user
    hbase(main):024:0> create 'baizhi:t_user',{NAME=>'cf1',VERSIONS=>3},{NAME=>'cf2',TTL=>3600}
    0 row(s) in 1.2610 seconds

    => Hbase::Table - baizhi:t_user

  • Show the details of a table

    hbase(main):026:0> describe 'baizhi:t_user'
    Table baizhi:t_user is ENABLED
    baizhi:t_user
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'cf1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    {NAME => 'cf2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '3600 SECONDS (1 HOUR)', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    2 row(s) in 0.0240 seconds

  • Check whether a table exists

    hbase(main):030:0> exists 't_user'
    Table t_user does exist
    0 row(s) in 0.0250 seconds

  • enable/is_enabled/enable_all (disable, disable_all and is_disabled work the same way)

    hbase(main):036:0> enable 't_user'
    0 row(s) in 0.0220 seconds

    hbase(main):037:0> is_enabled 't_user'
    true
    0 row(s) in 0.0090 seconds
    hbase(main):035:0> enable_all 't_.*'
    t_user
    Enable the above 1 tables (y/n)?
    y
    1 tables successfully enabled

  • Drop a table (it must be disabled first)

    hbase(main):038:0> disable 't_user'
    0 row(s) in 2.2930 seconds

    hbase(main):039:0> drop 't_user'
    0 row(s) in 1.2670 seconds

  • List all user tables (tables in the system namespace hbase are not shown)

    hbase(main):042:0> list 'baizhi:.*'
    TABLE
    baizhi:t_user
    1 row(s) in 0.0050 seconds

    => ["baizhi:t_user"]
    hbase(main):043:0> list
    TABLE
    baizhi:t_user
    1 row(s) in 0.0050 seconds

  • Get a reference to a table

    hbase(main):002:0> t=get_table 'baizhi:t_user'
    0 row(s) in 0.0440 seconds

  • Alter table parameters

    hbase(main):008:0> alter 'baizhi:t_user',{ NAME => 'cf2', TTL => 60 }
    Updating all regions with the new schema...
    1/1 regions updated.
    Done.
    0 row(s) in 2.7000 seconds
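The VERSIONS and TTL settings used in the DDL commands above can be read as a retention rule: keep at most VERSIONS of the newest cells, and ignore any cell older than TTL seconds. The sketch below models that rule in plain Java under simplifying assumptions; it is not HBase's implementation, and `RetentionSketch` and `visible` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the VERSIONS and TTL settings of a column family. */
final class RetentionSketch {

    /**
     * Returns the cell timestamps (milliseconds) that remain visible, newest first.
     * A cell is treated as expired once it is older than ttlSeconds.
     */
    static List<Long> visible(List<Long> timestamps, int maxVersions, long ttlSeconds, long nowMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(Collections.reverseOrder());
        List<Long> kept = new ArrayList<>();
        for (long ts : sorted) {
            boolean expired = ts < nowMillis - ttlSeconds * 1000L;
            if (!expired && kept.size() < maxVersions) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> writes = Arrays.asList(1_000L, 2_000L, 3_000L, 4_000L);
        // VERSIONS => 3 with a generous TTL: the oldest of the four writes is hidden.
        System.out.println(visible(writes, 3, 3600, 5_000L));
        // TTL => 60: once 60 seconds have passed, every cell has expired.
        System.out.println(visible(writes, 3, 60, 70_000L));
    }
}
```

With the `alter` command above, cells in `cf2` would stop being returned roughly 60 seconds after they were written.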


Table DML operations

  • put

    hbase(main):010:0> put 'baizhi:t_user',1,'cf1:name','zhangsan'
    0 row(s) in 0.2060 seconds

    hbase(main):011:0> t = get_table 'baizhi:t_user'
    0 row(s) in 0.0010 seconds

    hbase(main):012:0> t.put 1,'cf1:age','18'
    0 row(s) in 0.0500 seconds

  • get

    hbase(main):017:0> get 'baizhi:t_user',1
    COLUMN CELL
    cf1:age timestamp=1536996547967, value=21
    cf1:name timestamp=1536996337398, value=zhangsan
    2 row(s) in 0.0680 seconds
    hbase(main):019:0> get 'baizhi:t_user',1,{COLUMN =>'cf1', VERSIONS=>10}
    COLUMN CELL
    cf1:age timestamp=1536996547967, value=21
    cf1:age timestamp=1536996542980, value=20
    cf1:age timestamp=1536996375890, value=18
    cf1:name timestamp=1536996337398, value=zhangsan
    4 row(s) in 0.0440 seconds

    hbase(main):020:0> get 'baizhi:t_user',1,{COLUMN =>'cf1:age', VERSIONS=>10}
    COLUMN CELL
    cf1:age timestamp=1536996547967, value=21
    cf1:age timestamp=1536996542980, value=20
    cf1:age timestamp=1536996375890, value=18
    3 row(s) in 0.0760 seconds

    hbase(main):021:0> get 'baizhi:t_user',1,{COLUMN =>'cf1:age', TIMESTAMP => 1536996542980 }
    COLUMN CELL
    cf1:age timestamp=1536996542980, value=20
    1 row(s) in 0.0260 seconds

    hbase(main):025:0> get 'baizhi:t_user',1,{TIMERANGE => [1536996375890,1536996547967]}
    COLUMN CELL
    cf1:age timestamp=1536996542980, value=20
    1 row(s) in 0.0480 seconds

    hbase(main):026:0> get 'baizhi:t_user',1,{TIMERANGE => [1536996375890,1536996547967],VERSIONS=>10}
    COLUMN CELL
    cf1:age timestamp=1536996542980, value=20
    cf1:age timestamp=1536996375890, value=18
    2 row(s) in 0.0160 seconds
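The last two `get` calls above show that TIMERANGE is a half-open interval: the lower bound is included and the upper bound is excluded, which is why the cell written at 1536996547967 never appears. Here is a small sketch of that selection logic in plain Java, using the timestamps from the session; `TimeRangeSketch` and `inRange` are hypothetical names, not part of the HBase API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of how TIMERANGE and VERSIONS select cell versions. */
final class TimeRangeSketch {

    /** Keeps versions with minStamp <= ts < maxStamp, up to maxVersions, newest first. */
    static List<Long> inRange(List<Long> versionsNewestFirst, long minStamp, long maxStamp, int maxVersions) {
        List<Long> out = new ArrayList<>();
        for (long ts : versionsNewestFirst) {
            if (ts >= minStamp && ts < maxStamp && out.size() < maxVersions) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The three versions of cf1:age from the shell session, newest first.
        List<Long> age = Arrays.asList(1536996547967L, 1536996542980L, 1536996375890L);
        // Without VERSIONS only one version is returned.
        System.out.println(inRange(age, 1536996375890L, 1536996547967L, 1));
        // With VERSIONS => 10 every version inside the range is returned.
        System.out.println(inRange(age, 1536996375890L, 1536996547967L, 10));
    }
}
```

To include the newest cell as well, the upper bound would have to be at least one millisecond past its timestamp.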

  • scan

    hbase(main):004:0> scan 'baizhi:t_user'
    ROW COLUMN+CELL
    1 column=cf1:age, timestamp=1536996547967, value=21
    1 column=cf1:height, timestamp=1536997284682, value=170
    1 column=cf1:name, timestamp=1536996337398, value=zhangsan
    1 column=cf1:salary, timestamp=1536997158586, value=15000
    1 column=cf1:weight, timestamp=1536997311001, value=\x00\x00\x00\x00\x00\x00\x00\x05
    2 column=cf1:age, timestamp=1536997566506, value=18
    2 column=cf1:name, timestamp=1536997556491, value=lisi
    2 row(s) in 0.0470 seconds
    hbase(main):009:0> scan 'baizhi:t_user', {STARTROW => '1',LIMIT=>1}
    ROW COLUMN+CELL
    1 column=cf1:age, timestamp=1536996547967, value=21
    1 column=cf1:height, timestamp=1536997284682, value=170
    1 column=cf1:name, timestamp=1536996337398, value=zhangsan
    1 column=cf1:salary, timestamp=1536997158586, value=15000
    1 column=cf1:weight, timestamp=1536997311001, value=\x00\x00\x00\x00\x00\x00\x00\x05
    1 row(s) in 0.0280 seconds

    hbase(main):011:0> scan 'baizhi:t_user', {COLUMNS=>'cf1:age',TIMESTAMP=>1536996542980}
    ROW COLUMN+CELL
    1 column=cf1:age, timestamp=1536996542980, value=20
    1 row(s) in 0.0330 seconds
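A scan walks rows in row-key order, and HBase compares row keys as byte strings, so STARTROW and LIMIT behave like a range query over a sorted set. The sketch below imitates that with a `TreeSet<String>`, on the assumption that the keys are plain ASCII (where Java's string order agrees with byte order); `ScanSketch` and `scan` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of scan with STARTROW and LIMIT over sorted row keys. */
final class ScanSketch {

    /** Returns up to limit row keys, starting at startRow (inclusive), in sorted order. */
    static List<String> scan(TreeSet<String> rowKeys, String startRow, int limit) {
        List<String> out = new ArrayList<>();
        for (String key : rowKeys.tailSet(startRow, true)) {
            if (out.size() >= limit) {
                break;
            }
            out.add(key);
        }
        return out;
    }

    public static void main(String[] args) {
        TreeSet<String> keys = new TreeSet<>(Arrays.asList("1", "2", "10"));
        // Keys sort as text, so "10" comes before "2".
        System.out.println(scan(keys, "1", 10));
        System.out.println(scan(keys, "1", 1));
    }
}
```

Because keys sort as text rather than as numbers, numeric parts of a row key are usually zero-padded to a fixed width.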

  • delete/deleteall

    hbase(main):013:0> scan 'baizhi:t_user', {COLUMNS=>'cf1:age',VERSIONS=>3}
    ROW COLUMN+CELL
    1 column=cf1:age, timestamp=1536996547967, value=21
    1 column=cf1:age, timestamp=1536996542980, value=20
    1 column=cf1:age, timestamp=1536996375890, value=18
    2 column=cf1:age, timestamp=1536997566506, value=18
    2 row(s) in 0.0150 seconds

    hbase(main):014:0> delete 'baizhi:t_user',1,'cf1:age',1536996542980
    0 row(s) in 0.0920 seconds

    hbase(main):015:0> scan 'baizhi:t_user', {COLUMNS=>'cf1:age',VERSIONS=>3}
    ROW COLUMN+CELL
    1 column=cf1:age, timestamp=1536996547967, value=21
    2 column=cf1:age, timestamp=1536997566506, value=18
    2 row(s) in 0.0140 seconds

    hbase(main):016:0> delete 'baizhi:t_user',1,'cf1:age'
    0 row(s) in 0.0170 seconds

    hbase(main):017:0> scan 'baizhi:t_user', {COLUMNS=>'cf1:age',VERSIONS=>3}
    ROW COLUMN+CELL
    2 column=cf1:age, timestamp=1536997566506, value=18
    1 row(s) in 0.0170 seconds

    hbase(main):019:0> deleteall 'baizhi:t_user',1
    0 row(s) in 0.0200 seconds

    hbase(main):020:0> get 'baizhi:t_user',1
    COLUMN CELL
    0 row(s) in 0.0200 seconds
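In the session above, `delete` with timestamp 1536996542980 removed both the version written at that timestamp and the older one, leaving only the newest. That is consistent with the shell treating the timestamp as an upper bound ("delete every version at or before this time") in this HBase version. The sketch below models just that observed behaviour; `DeleteSketch` and `deleteUpTo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the timestamped delete observed in the shell session. */
final class DeleteSketch {

    /** Returns the versions that survive deleting everything at or before timestamp. */
    static List<Long> deleteUpTo(List<Long> versions, long timestamp) {
        List<Long> survivors = new ArrayList<>();
        for (long ts : versions) {
            if (ts > timestamp) {
                survivors.add(ts);
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<Long> age = Arrays.asList(1536996547967L, 1536996542980L, 1536996375890L);
        // Only the version written after the given timestamp is left.
        System.out.println(deleteUpTo(age, 1536996542980L));
    }
}
```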

  • truncate

    hbase(main):022:0> truncate 'baizhi:t_user'
    Truncating 'baizhi:t_user' table (it may take a while):
     - Disabling table...
     - Truncating table...
    0 row(s) in 4.0040 seconds

HBase Java API

  • Maven dependencies

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-protocol</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.4</version>
    </dependency>

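  • Create the Connection and Admin objects

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.junit.After;
    import org.junit.Before;

    private Connection conn;
    private Admin admin;

    @Before
    public void before() throws IOException {
        Configuration config = HBaseConfiguration.create();
        // HMaster and HRegionServer both register themselves in ZooKeeper,
        // so the client only needs the ZooKeeper quorum to find them
        config.set("hbase.zookeeper.quorum", "centos");
        conn = ConnectionFactory.createConnection(config);
        admin = conn.getAdmin();
    }

    @After
    public void after() throws IOException {
        admin.close();
        conn.close();
    }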

  • Create a namespace

    NamespaceDescriptor nd = NamespaceDescriptor.create("zpark")
            .addConfiguration("author", "zhangsan")
            .build();
    admin.createNamespace(nd);

  • Create a table

    // Shell equivalent: create 'zpark:t_user',{NAME=>'cf1',VERSIONS=>3},{NAME=>'cf2',TTL=>10}
    TableName tname = TableName.valueOf("zpark:t_user");
    // Build the table descriptor
    HTableDescriptor t_user = new HTableDescriptor(tname);

    // Build the column families
    HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
    cf1.setMaxVersions(3);

    HColumnDescriptor cf2 = new HColumnDescriptor("cf2");
    cf2.setTimeToLive(10);   // TTL in seconds

    // Add the column families to the table
    t_user.addFamily(cf1);
    t_user.addFamily(cf2);

    admin.createTable(t_user);

  • Insert data

    TableName tname = TableName.valueOf("zpark:t_user");
    Table t_user = conn.getTable(tname);

    String[] company = {"www.baizhi.com", "www.sina.com"};
    for (int i = 0; i < 1000; i++) {
        String com = company[new Random().nextInt(2)];
        // Row key = company + ":" + sequence number, zero-padded to three digits
        String rowKey = com;
        if (i < 10) {
            rowKey += ":00" + i;
        } else if (i < 100) {
            rowKey += ":0" + i;
        } else if (i < 1000) {
            rowKey += ":" + i;
        }
        Put put = new Put(rowKey.getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), ("user" + i).getBytes());
        put.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(i));
        put.addColumn("cf1".getBytes(), "salary".getBytes(), Bytes.toBytes(5000 + 1000 * i));
        put.addColumn("cf1".getBytes(), "company".getBytes(), com.getBytes());

        t_user.put(put);
    }
    t_user.close();
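The if/else chain in the insert example zero-pads the sequence number so that row keys sort in numeric order. A shorter way to build the same key is `String.format` with a `%03d` conversion; the sketch below is plain Java with no HBase dependency, and `RowKeySketch` and `rowKey` are hypothetical names.

```java
/** Hypothetical helper that builds the same row keys as the if/else chain. */
final class RowKeySketch {

    /** Builds "company:NNN" with the sequence number zero-padded to three digits. */
    static String rowKey(String company, int i) {
        return String.format("%s:%03d", company, i);
    }

    public static void main(String[] args) {
        System.out.println(rowKey("www.sina.com", 2));
        // With padding, 9 sorts before 10, as expected.
        System.out.println(rowKey("www.sina.com", 9).compareTo(rowKey("www.sina.com", 10)) < 0);
        // Without padding, "10" would sort before "9".
        System.out.println("www.sina.com:10".compareTo("www.sina.com:9") < 0);
    }
}
```

The padding width fixes the largest sequence number the key design can hold in order; three digits cover the 0 to 999 range used in the example.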

  • Batch insert

    TableName tname = TableName.valueOf("zpark:t_user");
    String[] company = {"www.baizhi.com", "www.sina.com"};
    BufferedMutator mutator = conn.getBufferedMutator(tname);
    for (int i = 0; i < 1000; i++) {
        String com = company[new Random().nextInt(2)];
        String rowKey = com;
        if (i < 10) {
            rowKey += ":00" + i;
        } else if (i < 100) {
            rowKey += ":0" + i;
        } else if (i < 1000) {
            rowKey += ":" + i;
        }
        Put put = new Put(rowKey.getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), ("user" + i).getBytes());
        put.addColumn("cf1".getBytes(), "age".getBytes(), Bytes.toBytes(i));
        put.addColumn("cf1".getBytes(), "salary".getBytes(), Bytes.toBytes(5000 + 1000 * i));
        put.addColumn("cf1".getBytes(), "company".getBytes(), com.getBytes());
        // Mutations are buffered on the client and sent in batches
        mutator.mutate(put);
    }
    // close() flushes any mutations still in the buffer
    mutator.close();

  • Update data

    TableName tname = TableName.valueOf("zpark:t_user");
    Table t_user = conn.getTable(tname);

    // Writing to an existing row and column adds a new version of the cell
    Put put = new Put("www.baizhi.com:000".getBytes());
    put.addColumn("cf1".getBytes(), "name".getBytes(), ("zhangsan").getBytes());

    t_user.put(put);
    t_user.close();

  • Query a single record

    TableName tname = TableName.valueOf("zpark:t_user");
    Table t_user = conn.getTable(tname);

    // Note: the company part of each row key was picked at random on insert,
    // so this particular row may not exist in your table
    Get get = new Get("www.sina.com:002".getBytes());
    // A Result represents one row and contains n cells
    Result result = t_user.get(get);

    String name = Bytes.toString(result.getValue("cf1".getBytes(), "name".getBytes()));
    Integer age = Bytes.toInt(result.getValue("cf1".getBytes(), "age".getBytes()));
    Integer salary = Bytes.toInt(result.getValue("cf1".getBytes(), "salary".getBytes()));
    System.out.println(name + "," + age + "," + salary);

    t_user.close();
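Because HBase stores only byte arrays, the reader has to decode each value the same way the writer encoded it. `Bytes.toBytes(int)` produces four big-endian bytes, which is why the Java examples read `age` back with `Bytes.toInt`, while a value written from the shell as the text '18' is two ASCII bytes. The sketch below reproduces the encoding with `java.nio.ByteBuffer` (big-endian by default) so it runs without HBase on the classpath; `IntBytesSketch` and its methods are hypothetical stand-ins, not the HBase `Bytes` class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-in for the fixed-width big-endian encoding of numbers. */
final class IntBytesSketch {

    /** Encodes an int as 4 big-endian bytes. */
    static byte[] toBytes(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** Encodes a long as 8 big-endian bytes. */
    static byte[] toBytes(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    /** Decodes 4 big-endian bytes back into an int. */
    static int toInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        // A long 5 is eight bytes, matching the \x00...\x05 value in the scan output.
        System.out.println(Arrays.toString(toBytes(5L)));
        System.out.println(toInt(toBytes(18)));
        // The text "18" is a different, two-byte value.
        System.out.println("18".getBytes(StandardCharsets.UTF_8).length);
    }
}
```

Mixing the two encodings in one column (text from the shell, binary ints from Java) would make a fixed-width decode fail or return garbage, so it helps to pick one representation per column.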

  • Query multiple records

    TableName tname = TableName.valueOf("zpark:t_user");
    Table t_user = conn.getTable(tname);

    Scan scan = new Scan();
    // scan.setStartRow("www.baizhi.com:000".getBytes());
    // scan.setStopRow("www.taizhi.com:020".getBytes());
    Filter filter1 = new PrefixFilter("www.baizhi.com:00".getBytes());
    Filter filter2 = new PrefixFilter("www.sina.com:00".getBytes());
    // MUST_PASS_ONE: keep a row if it matches either prefix (logical OR)
    FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE, filter1, filter2);
    scan.setFilter(filter);

    ResultScanner resultScanner = t_user.getScanner(scan);
    for (Result result : resultScanner) {
        String rowKey = Bytes.toString(result.getRow());
        String name = Bytes.toString(result.getValue("cf1".getBytes(), "name".getBytes()));
        Integer age = Bytes.toInt(result.getValue("cf1".getBytes(), "age".getBytes()));
        Integer salary = Bytes.toInt(result.getValue("cf1".getBytes(), "salary".getBytes()));
        System.out.println(rowKey + " => " + name + "," + age + "," + salary);
    }
    resultScanner.close();
    t_user.close();

  • Query multiple records with a filter

    TableName tname = TableName.valueOf("zpark:t_user");
    Table t_user = conn.getTable(tname);

    Scan scan = new Scan();
    // scan.setStartRow("www.baizhi.com:000".getBytes());
    // scan.setStopRow("www.taizhi.com:020".getBytes());
    Filter filter1 = new PrefixFilter("www.baizhi.com:00".getBytes());
    Filter filter2 = new PrefixFilter("www.sina.com:00".getBytes());
    // MUST_PASS_ALL: a row must pass every filter (logical AND).
    // Note: these two prefixes are disjoint, so no row can match both and this scan
    // returns nothing; use MUST_PASS_ONE to match either prefix.
    FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter1, filter2);
    scan.setFilter(filter);

    ResultScanner resultScanner = t_user.getScanner(scan);
    for (Result result : resultScanner) {
        String rowKey = Bytes.toString(result.getRow());
        String name = Bytes.toString(result.getValue("cf1".getBytes(), "name".getBytes()));
        Integer age = Bytes.toInt(result.getValue("cf1".getBytes(), "age".getBytes()));
        Integer salary = Bytes.toInt(result.getValue("cf1".getBytes(), "salary".getBytes()));
        System.out.println(rowKey + " => " + name + "," + age + "," + salary);
    }
    resultScanner.close();
    t_user.close();
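The two scan examples differ only in the FilterList operator, and the difference is easy to miss: MUST_PASS_ONE combines the filters with OR, MUST_PASS_ALL combines them with AND. The following sketch reproduces that logic with plain string prefixes so it can be run without a cluster; `PrefixFilterSketch`, its `Operator` enum and `filter` are hypothetical names that only mirror the behaviour described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of combining prefix filters with AND or OR. */
final class PrefixFilterSketch {

    enum Operator { MUST_PASS_ALL, MUST_PASS_ONE }

    /** Keeps the row keys that pass all prefixes (AND) or at least one prefix (OR). */
    static List<String> filter(List<String> rowKeys, Operator op, String... prefixes) {
        List<String> out = new ArrayList<>();
        for (String key : rowKeys) {
            int passed = 0;
            for (String prefix : prefixes) {
                if (key.startsWith(prefix)) {
                    passed++;
                }
            }
            boolean keep = (op == Operator.MUST_PASS_ALL) ? passed == prefixes.length : passed > 0;
            if (keep) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("www.baizhi.com:001", "www.sina.com:002", "www.sina.com:010");
        // OR: rows from either company whose sequence number starts with 00.
        System.out.println(filter(keys, Operator.MUST_PASS_ONE, "www.baizhi.com:00", "www.sina.com:00"));
        // AND: no key can start with both prefixes, so nothing is kept.
        System.out.println(filter(keys, Operator.MUST_PASS_ALL, "www.baizhi.com:00", "www.sina.com:00"));
    }
}
```

MUST_PASS_ALL becomes useful when the filters test different things, for example a row-key prefix combined with a condition on a column value.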

Integrating HBase with MapReduce

public class CustomJobSubmiter extends Configured implements Tool {
    public int run(String[] args) throws Exception {

        // Create the job
        Configuration config = HBaseConfiguration.create(getConf());
        Job job = Job.getInstance(config);
        job.setJarByClass(CustomJobSubmiter.class);     // class that contains mapper

        // Set the input and output formats
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        TableMapReduceUtil.initTableMapperJob(
                "zpark:t_user",
                new Scan(),
                UserMapper.class,
                Text.class,
                CountWritable.class,   // must match the value type that UserMapper emits
                job
        );
        TableMapReduceUtil.initTableReducerJob(
                "zpark:t_user_count",
                UserReducer.class,
                job
        );
        job.setCombinerClass(UserCombiner.class);
        // Report job failure through the exit status instead of always returning 0
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CustomJobSubmiter(),args));
    }
    public static class UserMapper extends TableMapper<Text, CountWritable>{
        private Text k=new Text();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String company= Bytes.toString(value.getValue("cf1".getBytes(),"company".getBytes()));
            double salary= Bytes.toInt(value.getValue("cf1".getBytes(),"salary".getBytes()))*1.0;
            k.set(company);
            // Emit one partial aggregate per row: count = 1, total = max = min = salary
            context.write(k,new CountWritable(1,salary,salary,salary));
        }
    }
    public static class UserCombiner extends Reducer<Text, CountWritable,Text, CountWritable>{
        @Override
        protected void reduce(Text key, Iterable<CountWritable> values, Context context) throws IOException, InterruptedException {
            int total=0;
            double tatalSalary=0.0;
            double avgSalary=0.0;
            double maxSalary=0.0;
            double minSalary=Integer.MAX_VALUE;
            for (CountWritable value : values) {
                tatalSalary+=value.getTatalSalary();
                total+=value.getTotal();
                if(minSalary>value.getMinSalary()){
                    minSalary=value.getMinSalary();
                }
                if(maxSalary<value.getMaxSalary()){
                    maxSalary=value.getMaxSalary();
                }
            }
            
            context.write(key,new CountWritable(total,tatalSalary,maxSalary,minSalary));
        }
    }
    public static class UserReducer extends TableReducer<Text, CountWritable, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<CountWritable> values, Context context) throws IOException, InterruptedException {
            int total=0;
            double tatalSalary=0.0;
            double avgSalary=0.0;
            double maxSalary=0.0;
            double minSalary=Integer.MAX_VALUE;
            for (CountWritable value : values) {
                tatalSalary+=value.getTatalSalary();
                total+=value.getTotal();
                if(minSalary>value.getMinSalary()){
                    minSalary=value.getMinSalary();
                }
                if(maxSalary<value.getMaxSalary()){
                    maxSalary=value.getMaxSalary();
                }
            }
            avgSalary=tatalSalary/total;

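            // Text.getBytes() returns the backing array, which can be longer than the key,
            // so build the row key from the string value instead
            Put put=new Put(Bytes.toBytes(key.toString()));
            put.addColumn("cf1".getBytes(),"total".getBytes(),(total+"").getBytes());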
            put.addColumn("cf1".getBytes(),"tatalSalary".getBytes(),(tatalSalary+"").getBytes());
            put.addColumn("cf1".getBytes(),"maxSalary".getBytes(),(maxSalary+"").getBytes());
            put.addColumn("cf1".getBytes(),"minSalary".getBytes(),(minSalary+"").getBytes());
            put.addColumn("cf1".getBytes(),"avgSalary".getBytes(),(avgSalary+"").getBytes());

            context.write(null,put);

        }
    }
}

public class CountWritable implements Writable {
    int total=0;
    double tatalSalary=0.0;
    double maxSalary=0.0;
    double minSalary=Integer.MAX_VALUE;

    public CountWritable(int total, double tatalSalary, double maxSalary, double minSalary) {
        this.total = total;
        this.tatalSalary = tatalSalary;
        this.maxSalary = maxSalary;
        this.minSalary = minSalary;
    }

    public CountWritable() {
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(total);
        out.writeDouble(tatalSalary);
        out.writeDouble(maxSalary);
        out.writeDouble(minSalary);
    }

    public void readFields(DataInput in) throws IOException {
        total=in.readInt();
        tatalSalary=in.readDouble();
        maxSalary=in.readDouble();
        minSalary=in.readDouble();
    }
    //....
}
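The job above sends a partial aggregate (count, total, max, min) through the combiner rather than a pre-computed average. The reason is that a combiner may run zero, one or several times, so the merge step has to give the same answer however the inputs are grouped; an average of averages would not. Below is a minimal sketch of that merge in plain Java; `SalaryStats` is a hypothetical stand-in for `CountWritable`, without the Hadoop serialization methods.

```java
/** Hypothetical stand-in for CountWritable: a mergeable partial aggregate. */
final class SalaryStats {
    final int total;
    final double totalSalary;
    final double maxSalary;
    final double minSalary;

    SalaryStats(int total, double totalSalary, double maxSalary, double minSalary) {
        this.total = total;
        this.totalSalary = totalSalary;
        this.maxSalary = maxSalary;
        this.minSalary = minSalary;
    }

    /** What the mapper emits for a single row. */
    static SalaryStats of(double salary) {
        return new SalaryStats(1, salary, salary, salary);
    }

    /** What the combiner and reducer do: fold two partial aggregates into one. */
    SalaryStats merge(SalaryStats other) {
        return new SalaryStats(
                total + other.total,
                totalSalary + other.totalSalary,
                Math.max(maxSalary, other.maxSalary),
                Math.min(minSalary, other.minSalary));
    }

    /** Computed only at the very end, in the reducer. */
    double average() {
        return totalSalary / total;
    }

    public static void main(String[] args) {
        // Grouping (a+b)+c, as if a combiner had merged the first two rows.
        SalaryStats left = of(5000).merge(of(6000)).merge(of(7000));
        // Grouping a+(b+c), as if a combiner had merged the last two rows.
        SalaryStats right = of(5000).merge(of(6000).merge(of(7000)));
        System.out.println(left.average() + " " + right.average());
    }
}
```

Both groupings produce the same count, total, maximum and minimum, which is the property that makes it safe to register the merge as a combiner.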

HBase architecture

HBase high-level architecture

RegionServer architecture

Region architecture diagram

References: http://www.blogjava.net/DLevin/archive/2015/08/22/426877.html

http://www.blogjava.net/DLevin/archive/2015/08/22/426950.html

Building an HBase cluster

  • Make sure HDFS is running properly (HDFS HA)

  • Install and configure HBase

    [root@CentOSX ~]# tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/
    [root@CentOSX ~]# vi /usr/hbase-1.2.4/conf/hbase-site.xml

    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://mycluster/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>CentOSA,CentOSB,CentOSC</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>

    [root@centos ~]# vi /usr/hbase-1.2.4/conf/regionservers

    CentOSA
    CentOSB
    CentOSC

    [root@CentOSX ~]# vi .bashrc

    HBASE_MANAGES_ZK=false
    HBASE_HOME=/usr/hbase-1.2.4
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    CLASSPATH=.
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin
    export JAVA_HOME
    export CLASSPATH
    export PATH
    export HADOOP_HOME
    export HBASE_HOME
    export HBASE_MANAGES_ZK

    [root@CentOSX ~]# source .bashrc

  • Start HBase

    [root@CentOSX ~]# hbase-daemon.sh start master
    [root@CentOSX ~]# hbase-daemon.sh start regionserver
