Hbase 實戰之穀粒微博

Hbase 實戰之穀粒微博

 

1 需求分析

1) 微博內容的瀏覽,數據庫表設計
2) 用戶社交體現:關注用戶,取關用戶
3) 拉取關注的人的微博內容
 
 

 

2 代碼實現

2.1 代碼設計總覽:

1) 建立命名空間以及表名的定義
2) 建立微博內容表
3) 建立用戶關係表
4) 建立用戶微博內容接收郵件表
5) 發佈微博內容
6) 添加關注用戶
7) 移除(取關)用戶
8) 獲取關注的人的微博內容
9) 測試
 

 2.2 com.atlxl.weibo.Constant 代碼

package com.atlxl.weibo;

public class Constant {


    //命名空間
    public static final String NAMESPACE = "weibo";

    //內容表
    public static final String CONTENT = "weibo:content";

    //用戶關係表
    public static final String RELATIONS = "weibo:relations";

    //收件箱表
    public static final String INBOX = "weibo:inbox";


}

 

 

2.3 com.atlxl.weibo.WeiBoUtil 代碼

package com.atlxl.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class WeiBoUtil {

    private  static Configuration configuration = HBaseConfiguration.create();
    static {
        configuration.set("hbase.zookeeper.quorum", "192.168.192.102" );
    }

    //建立命名空間
    public static void createNamespace(String ns) throws IOException {

        //建立鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();

        //建立NS描述器
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build();

        //建立操做
        admin.createNamespace(namespaceDescriptor);

        //關閉資源
        admin.close();
        connection.close();
    }


    //建立表
    public static void createTable(String tableName, int versions, String... cfs) throws IOException {

        //建立鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();

        //建立表描述器
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

        //循環添加列族
        for (String cf : cfs) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
            hColumnDescriptor.setMaxVersions(versions);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }

        //建立表操做
        admin.createTable(hTableDescriptor);

        //關閉資源
        admin.close();
        connection.close();

    }


    /**
     * 1.更新微博內容表數據
     * 2.更新收件箱表的數據
     *      --獲取當前操做人的fans
     *      --去往收件箱表依次更新數據
     *
     *
     * @param uid
     * @param content
     * @throws IOException
     */
    //發佈微博
    public static void createDate(String uid, String content) throws IOException {

        //獲取鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);

        //獲取三張操做表的對象
        Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT));
        Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));

        //拼接RK
        long ts = System.currentTimeMillis();
        String rowKey = uid + "_" + ts;

        //生成Put對象
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));

        //往內容表添加數據
        contTable.put(put);

        //獲取關係表中的fans
        Get get = new Get(Bytes.toBytes(uid));
        get.addFamily(Bytes.toBytes("fans"));
        Result result = relaTable.get(get);
        Cell[] cells = result.rawCells();

        if (cells.length <= 0) {
            return;
        }
        //更新fans收件箱表
        List<Put> puts = new ArrayList<Put>();
        for (Cell cell : cells) {
            byte[] cloneQualifier = CellUtil.cloneQualifier(cell);
            Put inboxPut = new Put(cloneQualifier);
            inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(rowKey));
            puts.add(inboxPut);

        }
        inboxTable.put(puts);

        //關閉資源
        inboxTable.close();
        relaTable.close();
        contTable.close();

        connection.close();

    }


    /**
     * 1.在用戶關係表
     *      --添加操做人的attends
     *      --添加被操做人的fans
     * 2.在收件箱中
     *      --在微博內容表中獲取被關注者的3條數據(rowkey)
     *      --在收件箱表中添加操做人的關注者信息
     *
     * @param uid
     * @param uids
     */
    //關注用戶
    public static void addAttend(String uid, String... uids) throws IOException {

        //獲取鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);

        //獲取三張操做表的對象
        Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT));
        Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));

        //建立操做者的Put對象
        Put relaPut = new Put(Bytes.toBytes(uid));

        ArrayList<Put> puts = new ArrayList<Put>();

        for (String s : uids) {
            relaPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s));

            //建立被關注者的Put對象
            Put fansPut = new Put(Bytes.toBytes(s));
            fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
            puts.add(fansPut); //添加被操做人的fans
        }

        puts.add(relaPut); //添加操做人的attends

        relaTable.put(puts);


        Put inboxPut = new Put(Bytes.toBytes(uid));
        //獲取內容表中被關注者的rowkey
        for (String s : uids) {
            Scan scan = new Scan(Bytes.toBytes(s),Bytes.toBytes(s + "|"));
            ResultScanner results = contTable.getScanner(scan);

            for (Result result : results) {
                String rowKey = Bytes.toString(result.getRow());
                String[] split = rowKey.split("_");
                byte[] row = result.getRow();
                inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row);

            }

        }

        inboxTable.put(inboxPut);

        inboxTable.close();
        relaTable.close();
        contTable.close();

        connection.close();

    }


    /**
     * 1.用戶關係表
     *      --刪除操做者關注列族的待取關用戶
     *      --刪除待取關用戶fans列族的操做者
     *
     * 2.收件箱表
     *      --刪除操做者的待取關用戶信息
     *
     */
    //取關用戶
    public static void delAttend(String uid, String... uids) throws IOException {

        //獲取鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);

        //獲取表對象
        Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));

        //建立操做者的刪除對象
        Delete relaDel = new Delete(Bytes.toBytes(uid));

        ArrayList<Delete> deletes = new ArrayList<Delete>();

        for (String s : uids) {

            //建立被取關者刪除對象
            Delete fansDel = new Delete(Bytes.toBytes(s));
            fansDel.addColumns(Bytes.toBytes("fans"), Bytes.toBytes(uid));
            deletes.add(fansDel);

            relaDel.addColumns(Bytes.toBytes("attends"), Bytes.toBytes(s));
        }
        deletes.add(relaDel);

        //執行刪除操做
        relaTable.delete(deletes);


        //刪除收件箱表的相關內容
        Delete inboxDel = new Delete(Bytes.toBytes(uid));
        for (String s : uids) {
            inboxDel.addColumns(Bytes.toBytes("info"), Bytes.toBytes(s));
        }

        //執行收件箱表刪除操做
        inboxTable.delete(inboxDel);

        //關閉資源
        inboxTable.close();
        relaTable.close();

        connection.close();
    }


    //獲取微博內容(初始化頁面)
    public static void getInit(String uid) throws IOException {

        //獲取鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);


        //獲取表對象(2個)
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));
        Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT));


        //獲取收件箱表數據
        Get get = new Get(Bytes.toBytes(uid)); //收件箱表get對象
        get.setMaxVersions(); //設置獲取最大版本的數據

        Result result = inboxTable.get(get);

        ArrayList<Get> gets = new ArrayList<Get>();

        Cell[] cells = result.rawCells();
        //遍歷返回內容並將其封裝成內容的get對象
        for (Cell cell : cells) {
            Get contGet = new Get(CellUtil.cloneValue(cell));
            gets.add(contGet);
        }


        //根據收件箱表獲取值去往內容表獲取實際微博內容
        Result[] results = contTable.get(gets);
        for (Result result1 : results) {
            Cell[] cells1 = result1.rawCells();
            //遍歷並打印
            for (Cell cell : cells1) {
                System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }


        //關閉資源
        inboxTable.close();
        contTable.close();

        connection.close();
    }


    //獲取微博內容(查看某我的全部內容)
    public static void getData(String uid) throws IOException {

        //獲取鏈接
        Connection connection = ConnectionFactory.createConnection(configuration);

        //獲取表對象
        Table table = connection.getTable(TableName.valueOf(Constant.CONTENT));

        //掃描(過濾器)
        Scan scan = new Scan();

//        過濾器集合
//        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
//        filterList.addFilter();

        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));

        scan.setFilter(rowFilter);
        ResultScanner results = table.getScanner(scan);

        //遍歷打印
        for (Result result : results) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }

        //關閉資源
        table.close();

        connection.close();

    }


}

 

 

 

2.4 com.atlxl.weibo.Weibo 代碼

package com.atlxl.weibo;

import java.io.IOException;

public class Weibo {

    public static void init() throws IOException {
        //建立命名空間
        WeiBoUtil.createNamespace(Constant.NAMESPACE);

        //建立內容表
        WeiBoUtil.createTable(Constant.CONTENT, 1, "info");


        //建立用戶關係表
        WeiBoUtil.createTable(Constant.RELATIONS, 1, "attends","fans");


        //建立收件箱表(多版本)
        WeiBoUtil.createTable(Constant.INBOX, 2,"info");

    }

    public static void main(String[] args) throws IOException {

//        //測試
//        init();


        //1001,1002發佈微博
//        WeiBoUtil.createDate("1001", "今每天氣晴朗!");
//        WeiBoUtil.createDate("1002", "今每天氣下雨!");

        //1001關注1002和1003
//        WeiBoUtil.addAttend("1001", "1002","1003");


        //獲取1001初始化頁面信息
//        WeiBoUtil.getInit("1001");


        //1003發佈微博
//        WeiBoUtil.createDate("1003", "今天你過的怎麼樣!");
        WeiBoUtil.createDate("1003", "好好學習!");
        WeiBoUtil.createDate("1003", "每天向上!");

//        System.out.println("*****************************************");

        //獲取1001初始化頁面信息
        WeiBoUtil.getInit("1001");

        //取關
        WeiBoUtil.delAttend("1001", "1002");

        System.out.println("*****************************************");

        WeiBoUtil.getInit("1001");


    }

}
相關文章
相關標籤/搜索