在此以前咱們使用Mysql做爲數據源,但發現這數據增加速度太快,而且因爲種種緣由,所以必須使用HBase,因此咱們要把Mysql表裏面的數據遷移到HBase中,在這裏我就不講解、不爭論爲何要使用HBase,HBase是什麼了,喜歡的就認真看下去,總有些地方是有用的 java
咱們要作的3大步驟: mysql
新建HBase表格。 web
把MYSQL數據遷移到HBase中。 sql
在Java Web項目中讀取HBase的數據。 數據庫
先介紹一下必要的一些環境: apache
HBase的版本:0.98.8-hadoop2 windows
所需的依賴包: api
commons-codec-1.7.jar commons-collections-3.2.1.jar commons-configuration-1.6.jar commons-lang-2.6.jar commons-logging-1.1.3.jar guava-12.0.1.jar hadoop-auth-2.5.0.jar hadoop-common-2.5.0.jar hbase-client-0.98.8-hadoop2.jar hbase-common-0.98.8-hadoop2.jar hbase-protocol-0.98.8-hadoop2.jar htrace-core-2.04.jar jackson-core-asl-1.9.13.jar jackson-mapper-asl-1.9.13.jar log4j-1.2.17.jar mysql-connector-java-5.1.7-bin.jar netty-3.6.6.Final.jar protobuf-java-2.5.0.jar slf4j-api-1.7.5.jar slf4j-log4j12-1.7.5.jar zookeeper-3.4.6.jar
若是在你的web項目中有些包已經存在,保留其中一個就行了,省得報奇怪的錯誤就麻煩了。 app
步驟1:建表 分佈式
在此以前,我在Mysql中的業務數據表一共有6個,其結構重複性過高了,首先看看我在HBase裏面的表結構:
表名 | kpi | |||||||||||||||
key | fid+tid+date | |||||||||||||||
簇(family) | base | gpower | userate | consum | time | |||||||||||
描述 | 基礎信息 | 發電量相關指標 | 可利用率 | 自耗電量 | 累計運行小時數 | 檢修小時數 | 利用小時數 | |||||||||
列(qualifier) | fid | tid | date | power | windspeed | unpower | theory | coup | time | power | num | cpower | gpower | runtime | checktime | usetime |
描述 | 風場ID | 風機號 | 日期 | 發電量 | 風速 | 棄風電量 | 理論電量 | 耦合度 | 故障時間 | 故障損失電量 | 故障臺次 | 當天自耗電量 | 當天發電量 | 當天併網秒數 | 當天檢修秒數 | 當天利用秒數 |
這個表中咱們有5個family,其中base Family是對應6個mysql表中的key列, gpower、userate、consum分別對應一個表,time對應3個表。
這個kpi表的rowkey設計是base中的3個qualifier,分別從3個維度查詢數據,這樣的設計已經能夠知足咱們的需求了。
具體在HBase中如何建表如何搭建環境本身參考我寫的【手把手教你配置HBase徹底分佈式環境】這篇文章吧。
步驟2:把MySQL數據遷移到HBase
這時我用gpower對應的mysql表來作演示吧,其餘表的道理都同樣。(這裏可能有人會說爲何不用第三方插件直接數據庫對數據庫遷移,這裏我統一回答一下,我不會,我也不須要。)
okay,首先咱們來看看代碼先吧:
import java.io.File; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Level; import org.apache.log4j.Logger; public class GpowerTransfer{ private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";//這裏是你HBase的分佈式集羣結點,用逗號分開。 private static final String CLIENT_PORT = "2181";//端口 private static Logger log = Logger.getLogger(GpowerTransfer.class); /** * @param args */ public static void main(String[] args) { BasicConfigurator.configure(); log.setLevel(Level.DEBUG); String tableName = "kpi";//HBase表名稱 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", QUOREM); conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT); try { File workaround = new File("."); System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath()); new File("./bin").mkdirs(); new File("./bin/winutils.exe").createNewFile();//這幾段奇怪的代碼在windows跑的時候不加有時候分報錯,在web項目中能夠不要,但單獨的java程序仍是加上去吧,知道什麼緣由的小夥伴能夠告訴我一下,不勝感激。 HBaseAdmin admin = new HBaseAdmin(conf); if(admin.tableExists(tableName)){ Class.forName("com.mysql.jdbc.Driver");//首先將mysql中的數據讀取出來,而後再插入到HBase中 String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8"; String username = "********"; String password = "********"; Connection con = DriverManager.getConnection(url, username, password); PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower"); ResultSet rs = pstmt.executeQuery(); HTable table = new HTable(conf, tableName); log.debug(tableName + ":start copying data to hbase..."); List<Put> list = new ArrayList<Put>(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); String base = "base";//family名稱 String gpower = "gpower";//family名稱 String[] qbase = {"fid","tid","date"};//qualifier名稱 String[] qgpower = {"power","windspeed","unpower","theory","coup"};//qualifier名稱 while(rs.next()){ String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid")<10?("0"+rs.getInt("turbineid")):rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));//拼接rowkey Put put = new Put(Bytes.toBytes(rowKey));//新建一條記錄,而後下面對相應的列進行賦值 put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));//base:fid put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid")+""));//base:tid put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime")+""));//base:date put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value")+""));//gpower:power put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed")+""));//gpower:windspeed put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue")+""));//gpower:unvalue put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory")+""));//gpower:theory put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup")+""));//gpower:coup list.add(put); } table.put(list);//這裏真正對錶進行插入操做 log.debug(tableName + ":completed data copy!"); table.close();//這裏要很是注意一點,若是你頻繁地對錶進行打開跟關閉,性能將會直線降低,可能跟集羣有關係。 }else{ admin.close(); log.error("table '" + tableName + "' not exisit!"); throw new IllegalArgumentException("table '" + tableName + "' not exisit!"); } admin.close(); } catch (Exception e) { e.printStackTrace(); } } }
在put語句進行add的時候要特別注意:對於int、float、Date等等非String類型的數據,要記得將其轉換成String類型,這裏我直接用+""解決了,不然在你讀取數據的時候就會遇到麻煩了。
步驟3:Java Web項目讀取HBase裏面的數據
ok,咱們成功地把數據遷移到HBase,咱們剩下的任務就是在Web應用中讀取數據了。
首先咱們要確保Web項目中已經把必要的Jar包添加到ClassPath了,下面我對一些HBase的鏈接作了小封裝:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HBaseAdmin; /** * @author a01513 * */ public class HBaseConnector { private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60"; private static final String CLIENT_PORT = "2181"; private HBaseAdmin admin; private Configuration conf; public HBaseAdmin getHBaseAdmin(){ getConfiguration(); try { admin = new HBaseAdmin(conf); } catch (Exception e) { e.printStackTrace(); } return admin; } public Configuration getConfiguration(){ if(conf == null){ conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", QUOREM); conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT); } return conf; }
這裏的代碼基本上跟遷移的那部分代碼同樣,因爲我在其餘地方都要重用這些代碼,就裝在一個地方省得重複寫了。
我在Service層作了一下測試,下面看看具體的讀取過程:
private final String tableName = "kpi"; @Override public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) { List<GenPowerEntity> list = new ArrayList<GenPowerEntity>(); HBaseConnector hbaseConn = new HBaseConnector(); HBaseAdmin admin = hbaseConn.getHBaseAdmin(); try { if(admin.tableExists(tableName)){ HTable table = new HTable(hbaseConn.getConfiguration(), tableName); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes("base")); scan.addFamily(Bytes.toBytes("gpower")); scan.addFamily(Bytes.toBytes("userate")); String startRowKey = new String(); String stopRowKey = new String(); if("".equals(start) && !"".equals(end)){ stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end; scan.setStopRow(Bytes.toBytes(stopRowKey)); }else if(!"".equals(start) && "".equals(end)){ startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start; scan.setStartRow(Bytes.toBytes(startRowKey)); }else if(!"".equals(start) && !"".equals(end)){ startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start; stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end; scan.setStartRow(Bytes.toBytes(startRowKey)); scan.setStopRow(Bytes.toBytes(stopRowKey)); }else{ table.close(); admin.close(); return null; } ResultScanner rsc = table.getScanner(scan); Iterator<Result> it = rsc.iterator(); List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>(); List<UseRateEntity> ulist = new ArrayList<UseRateEntity>(); String tempRowKey = "";//這個臨時rowkey是用來判斷一行數據是否已經讀取完了的。 GenPowerEntity gpower = new GenPowerEntity(); UseRateEntity userate = new UseRateEntity(); while(it.hasNext()){ for(Cell cell: it.next().rawCells()){ String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8"); String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8"); String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8"); String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");//假如咱們當時插入HBase的時候沒有把int、float等類型的數據轉換成String,這裏就會亂碼了,而且用Bytes.toInt()這個方法還原也沒有用,哈哈 System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value); if("".equals(tempRowKey)) tempRowKey = rowKey; if(!rowKey.equals(tempRowKey)){ slist.add(gpower); ulist.add(userate); gpower = null; userate = null; gpower = new GenPowerEntity(); userate = new UseRateEntity(); tempRowKey = rowKey; } switch(family){ case "base": switch(qualifier){ case "fid": gpower.setFarmid(value); userate.setFarmid(value); break; case "tid": gpower.setTurbineid(Integer.parseInt(value)); userate.setTurbineid(Integer.parseInt(value)); break; case "date": SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); Date date = null; try { date = sdf.parse(value); } catch (ParseException e) { e.printStackTrace(); } gpower.setVtime(date); userate.setVtime(date); break; } break; case "gpower": switch(qualifier){ case "power": gpower.setValue(Float.parseFloat(value)); break; case "windspeed": gpower.setWindspeed(Float.parseFloat(value)); break; case "unpower": gpower.setUnvalue(Float.parseFloat(value)); break; case "theory": gpower.setTvalue(Float.parseFloat(value)); break; case "coup": gpower.setCoup(Float.parseFloat(value)); break; } break; case "userate": switch(qualifier){ case "num": userate.setFnum(Integer.parseInt(value)); break; case "power": userate.setFpower(Float.parseFloat(value)); break; case "time": userate.setFvalue(Float.parseFloat(value)); break; } break; } } } rsc.close(); table.close(); admin.close(); ...... } } catch (IOException e) { e.printStackTrace(); } return list; }
這是我在Service層中用做測試的一個方法,業務邏輯代碼能夠直接無視(已經用.....代替了,哈哈),至此咱們的全部工做完成,對於更深刻的應用,還要靠本身去認真挖掘學習了。