package com.atlxl.weibo;

/**
 * Names of the HBase namespace and tables used by the Weibo sample application.
 *
 * <p>This class only holds constants and is not meant to be instantiated.
 */
public final class Constant {

    /** HBase namespace that contains every table of the application. */
    public static final String NAMESPACE = "weibo";

    /** Content table: stores the text of each post. */
    public static final String CONTENT = "weibo:content";

    /** User relations table: who a user follows and who follows them. */
    public static final String RELATIONS = "weibo:relations";

    /** Inbox table: per-user references to posts of the users they follow. */
    public static final String INBOX = "weibo:inbox";

    private Constant() {
        // Constants holder; no instances.
    }
}
package com.atlxl.weibo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class WeiBoUtil { private static Configuration configuration = HBaseConfiguration.create(); static { configuration.set("hbase.zookeeper.quorum", "192.168.192.102" ); } //建立命名空間 public static void createNamespace(String ns) throws IOException { //建立鏈接 Connection connection = ConnectionFactory.createConnection(configuration); Admin admin = connection.getAdmin(); //建立NS描述器 NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(ns).build(); //建立操做 admin.createNamespace(namespaceDescriptor); //關閉資源 admin.close(); connection.close(); } //建立表 public static void createTable(String tableName, int versions, String... 
cfs) throws IOException { //建立鏈接 Connection connection = ConnectionFactory.createConnection(configuration); Admin admin = connection.getAdmin(); //建立表描述器 HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); //循環添加列族 for (String cf : cfs) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf); hColumnDescriptor.setMaxVersions(versions); hTableDescriptor.addFamily(hColumnDescriptor); } //建立表操做 admin.createTable(hTableDescriptor); //關閉資源 admin.close(); connection.close(); } /** * 1.更新微博內容表數據 * 2.更新收件箱表的數據 * --獲取當前操做人的fans * --去往收件箱表依次更新數據 * * * @param uid * @param content * @throws IOException */ //發佈微博 public static void createDate(String uid, String content) throws IOException { //獲取鏈接 Connection connection = ConnectionFactory.createConnection(configuration); //獲取三張操做表的對象 Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT)); Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX)); //拼接RK long ts = System.currentTimeMillis(); String rowKey = uid + "_" + ts; //生成Put對象 Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content)); //往內容表添加數據 contTable.put(put); //獲取關係表中的fans Get get = new Get(Bytes.toBytes(uid)); get.addFamily(Bytes.toBytes("fans")); Result result = relaTable.get(get); Cell[] cells = result.rawCells(); if (cells.length <= 0) { return; } //更新fans收件箱表 List<Put> puts = new ArrayList<Put>(); for (Cell cell : cells) { byte[] cloneQualifier = CellUtil.cloneQualifier(cell); Put inboxPut = new Put(cloneQualifier); inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(rowKey)); puts.add(inboxPut); } inboxTable.put(puts); //關閉資源 inboxTable.close(); relaTable.close(); contTable.close(); connection.close(); } /** * 1.在用戶關係表 * --添加操做人的attends * --添加被操做人的fans * 2.在收件箱中 * --在微博內容表中獲取被關注者的3條數據(rowkey) * 
--在收件箱表中添加操做人的關注者信息 * * @param uid * @param uids */ //關注用戶 public static void addAttend(String uid, String... uids) throws IOException { //獲取鏈接 Connection connection = ConnectionFactory.createConnection(configuration); //獲取三張操做表的對象 Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT)); Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX)); //建立操做者的Put對象 Put relaPut = new Put(Bytes.toBytes(uid)); ArrayList<Put> puts = new ArrayList<Put>(); for (String s : uids) { relaPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s)); //建立被關注者的Put對象 Put fansPut = new Put(Bytes.toBytes(s)); fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid)); puts.add(fansPut); //添加被操做人的fans } puts.add(relaPut); //添加操做人的attends relaTable.put(puts); Put inboxPut = new Put(Bytes.toBytes(uid)); //獲取內容表中被關注者的rowkey for (String s : uids) { Scan scan = new Scan(Bytes.toBytes(s),Bytes.toBytes(s + "|")); ResultScanner results = contTable.getScanner(scan); for (Result result : results) { String rowKey = Bytes.toString(result.getRow()); String[] split = rowKey.split("_"); byte[] row = result.getRow(); inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row); } } inboxTable.put(inboxPut); inboxTable.close(); relaTable.close(); contTable.close(); connection.close(); } /** * 1.用戶關係表 * --刪除操做者關注列族的待取關用戶 * --刪除待取關用戶fans列族的操做者 * * 2.收件箱表 * --刪除操做者的待取關用戶信息 * */ //取關用戶 public static void delAttend(String uid, String... 
uids) throws IOException { //獲取鏈接 Connection connection = ConnectionFactory.createConnection(configuration); //獲取表對象 Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS)); Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX)); //建立操做者的刪除對象 Delete relaDel = new Delete(Bytes.toBytes(uid)); ArrayList<Delete> deletes = new ArrayList<Delete>(); for (String s : uids) { //建立被取關者刪除對象 Delete fansDel = new Delete(Bytes.toBytes(s)); fansDel.addColumns(Bytes.toBytes("fans"), Bytes.toBytes(uid)); deletes.add(fansDel); relaDel.addColumns(Bytes.toBytes("attends"), Bytes.toBytes(s)); } deletes.add(relaDel); //執行刪除操做 relaTable.delete(deletes); //刪除收件箱表的相關內容 Delete inboxDel = new Delete(Bytes.toBytes(uid)); for (String s : uids) { inboxDel.addColumns(Bytes.toBytes("info"), Bytes.toBytes(s)); } //執行收件箱表刪除操做 inboxTable.delete(inboxDel); //關閉資源 inboxTable.close(); relaTable.close(); connection.close(); } //獲取微博內容(初始化頁面) public static void getInit(String uid) throws IOException { //獲取鏈接 Connection connection = ConnectionFactory.createConnection(configuration); //獲取表對象(2個) Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX)); Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT)); //獲取收件箱表數據 Get get = new Get(Bytes.toBytes(uid)); //收件箱表get對象 get.setMaxVersions(); //設置獲取最大版本的數據 Result result = inboxTable.get(get); ArrayList<Get> gets = new ArrayList<Get>(); Cell[] cells = result.rawCells(); //遍歷返回內容並將其封裝成內容的get對象 for (Cell cell : cells) { Get contGet = new Get(CellUtil.cloneValue(cell)); gets.add(contGet); } //根據收件箱表獲取值去往內容表獲取實際微博內容 Result[] results = contTable.get(gets); for (Result result1 : results) { Cell[] cells1 = result1.rawCells(); //遍歷並打印 for (Cell cell : cells1) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + Bytes.toString(CellUtil.cloneValue(cell))); } } //關閉資源 inboxTable.close(); contTable.close(); connection.close(); } //獲取微博內容(查看某我的全部內容) public static 
void getData(String uid) throws IOException { //獲取鏈接 Connection connection = ConnectionFactory.createConnection(configuration); //獲取表對象 Table table = connection.getTable(TableName.valueOf(Constant.CONTENT)); //掃描(過濾器) Scan scan = new Scan(); // 過濾器集合 // FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); // filterList.addFilter(); RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_")); scan.setFilter(rowFilter); ResultScanner results = table.getScanner(scan); //遍歷打印 for (Result result : results) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { System.out.println("RK:" + Bytes.toString(CellUtil.cloneRow(cell)) + ",Content:" + Bytes.toString(CellUtil.cloneValue(cell))); } } //關閉資源 table.close(); connection.close(); } }
package com.atlxl.weibo; import java.io.IOException; public class Weibo { public static void init() throws IOException { //建立命名空間 WeiBoUtil.createNamespace(Constant.NAMESPACE); //建立內容表 WeiBoUtil.createTable(Constant.CONTENT, 1, "info"); //建立用戶關係表 WeiBoUtil.createTable(Constant.RELATIONS, 1, "attends","fans"); //建立收件箱表(多版本) WeiBoUtil.createTable(Constant.INBOX, 2,"info"); } public static void main(String[] args) throws IOException { // //測試 // init(); //1001,1002發佈微博 // WeiBoUtil.createDate("1001", "今每天氣晴朗!"); // WeiBoUtil.createDate("1002", "今每天氣下雨!"); //1001關注1002和1003 // WeiBoUtil.addAttend("1001", "1002","1003"); //獲取1001初始化頁面信息 // WeiBoUtil.getInit("1001"); //1003發佈微博 // WeiBoUtil.createDate("1003", "今天你過的怎麼樣!"); WeiBoUtil.createDate("1003", "好好學習!"); WeiBoUtil.createDate("1003", "每天向上!"); // System.out.println("*****************************************"); //獲取1001初始化頁面信息 WeiBoUtil.getInit("1001"); //取關 WeiBoUtil.delAttend("1001", "1002"); System.out.println("*****************************************"); WeiBoUtil.getInit("1001"); } }