實戰:在Java Web 項目中使用HBase

在此以前咱們使用Mysql做爲數據源,但發現這數據增加速度太快,而且因爲種種緣由,所以必須使用HBase,因此咱們要把Mysql表裏面的數據遷移到HBase中,在這裏我就不講解、不爭論爲何要使用HBase,HBase是什麼了,喜歡的就認真看下去,總有些地方是有用的 java

咱們要作的3大步驟: mysql


  1. 新建HBase表格。 web

  2. 把MYSQL數據遷移到HBase中。 sql

  3. 在Java Web項目中讀取HBase的數據。 數據庫


先介紹一下必要的一些環境: apache

HBase的版本:0.98.8-hadoop2 windows

所需的依賴包api

commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.3.jar
guava-12.0.1.jar
hadoop-auth-2.5.0.jar
hadoop-common-2.5.0.jar
hbase-client-0.98.8-hadoop2.jar
hbase-common-0.98.8-hadoop2.jar
hbase-protocol-0.98.8-hadoop2.jar
htrace-core-2.04.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.7-bin.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
zookeeper-3.4.6.jar

若是在你的web項目中有些包已經存在,保留其中一個就行了,省得報奇怪的錯誤就麻煩了。 app


步驟1:建表 分佈式

在此以前,我在Mysql中的業務數據表一共有6個,其結構重複性過高了,首先看看我在HBase裏面的表結構:

表名 kpi
key fid+tid+date
簇(family) base gpower userate consum time
描述 基礎信息 發電量相關指標 可利用率 自耗電量 累計運行小時數 檢修小時數 利用小時數
列(qualifier) fid tid date power windspeed unpower theory coup time power num cpower gpower runtime checktime usetime
描述 風場ID 風機號 日期 發電量 風速 棄風電量 理論電量 耦合度 故障時間 故障損失電量 故障臺次 當天自耗電量 當天發電量 當天併網秒數 當天檢修秒數 當天利用秒數

這個表中咱們有5個family,其中base Family是對應6個mysql表中的key列, gpower、userate、consum分別對應一個表,time對應3個表。

這個kpi表的rowkey設計是base中的3個qualifier,分別從3個維度查詢數據,這樣的設計已經能夠知足咱們的需求了。

具體在HBase中如何建表如何搭建環境本身參考我寫的【手把手教你配置HBase徹底分佈式環境】這篇文章吧。


步驟2:把MySQL數據遷移到HBase


這時我用gpower對應的mysql表來作演示吧,其餘表的道理都同樣。(這裏可能有人會說爲何不用第三方插件直接數據庫對數據庫遷移,這裏我統一回答一下,我不會,我也不須要。)

okay,首先咱們來看看代碼先吧:

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class GpowerTransfer{
	
	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";//這裏是你HBase的分佈式集羣結點,用逗號分開。
	private static final String CLIENT_PORT = "2181";//端口
	private static Logger log = Logger.getLogger(GpowerTransfer.class);
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		BasicConfigurator.configure();
		log.setLevel(Level.DEBUG);
		String tableName = "kpi";//HBase表名稱
		Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", QUOREM);   
        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        try { File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();//這幾段奇怪的代碼在windows跑的時候不加有時候分報錯,在web項目中能夠不要,但單獨的java程序仍是加上去吧,知道什麼緣由的小夥伴能夠告訴我一下,不勝感激。
			HBaseAdmin admin = new HBaseAdmin(conf);
			if(admin.tableExists(tableName)){
				Class.forName("com.mysql.jdbc.Driver");//首先將mysql中的數據讀取出來,而後再插入到HBase中
				String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8";
				String username = "********";
				String password = "********";
				Connection con = DriverManager.getConnection(url, username, password);
				PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower");
				ResultSet rs = pstmt.executeQuery();
				HTable table = new HTable(conf, tableName);
				log.debug(tableName + ":start copying data to hbase...");
				List<Put> list = new ArrayList<Put>();
				SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
				String base = "base";//family名稱
				String gpower = "gpower";//family名稱
				String[] qbase = {"fid","tid","date"};//qualifier名稱
				String[] qgpower = {"power","windspeed","unpower","theory","coup"};//qualifier名稱
				while(rs.next()){
				    String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid")<10?("0"+rs.getInt("turbineid")):rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));//拼接rowkey
				    Put put = new Put(Bytes.toBytes(rowKey));//新建一條記錄,而後下面對相應的列進行賦值
				    put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));//base:fid
				    put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid")+""));//base:tid
				    put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime")+""));//base:date
				    put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value")+""));//gpower:power
				    put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed")+""));//gpower:windspeed
				    put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue")+""));//gpower:unvalue
				    put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory")+""));//gpower:theory
				    put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup")+""));//gpower:coup
				    list.add(put);
				}
				table.put(list);//這裏真正對錶進行插入操做
				log.debug(tableName + ":completed data copy!");
				table.close();//這裏要很是注意一點,若是你頻繁地對錶進行打開跟關閉,性能將會直線降低,可能跟集羣有關係。
			}else{
				admin.close();
				log.error("table '" + tableName + "' not exisit!");
				throw new IllegalArgumentException("table '" + tableName + "' not exisit!");
			}
			admin.close();
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}
}



在put語句進行add的時候要特別注意:對於int、float、Date等等非String類型的數據,要記得將其轉換成String類型,這裏我直接用+""解決了,不然在你讀取數據的時候就會遇到麻煩了。

步驟3:Java Web項目讀取HBase裏面的數據

ok,咱們成功地把數據遷移到HBase,咱們剩下的任務就是在Web應用中讀取數據了。

首先咱們要確保Web項目中已經把必要的Jar包添加到ClassPath了,下面我對一些HBase的鏈接作了小封裝:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * @author a01513
 *
 */
public class HBaseConnector {
	
	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
	private static final String CLIENT_PORT = "2181";
	private HBaseAdmin admin;
	private Configuration conf;
	
	
	public HBaseAdmin getHBaseAdmin(){
		getConfiguration();
        try {
			admin = new HBaseAdmin(conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return admin; 
	}
	
	public Configuration getConfiguration(){
		if(conf == null){
			conf = HBaseConfiguration.create();
	        conf.set("hbase.zookeeper.quorum", QUOREM);   
	        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);   
		}
		return conf;
	}



這裏的代碼基本上跟遷移的那部分代碼同樣,因爲我在其餘地方都要重用這些代碼,就裝在一個地方省得重複寫了。

我在Service層作了一下測試,下面看看具體的讀取過程:

private final String tableName = "kpi";
@Override
	public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) {
		List<GenPowerEntity> list = new ArrayList<GenPowerEntity>();
		HBaseConnector hbaseConn = new HBaseConnector();
		HBaseAdmin admin = hbaseConn.getHBaseAdmin();
		try {
			if(admin.tableExists(tableName)){
				HTable table = new HTable(hbaseConn.getConfiguration(), tableName);
				Scan scan = new Scan();
				scan.addFamily(Bytes.toBytes("base"));
				scan.addFamily(Bytes.toBytes("gpower"));
				scan.addFamily(Bytes.toBytes("userate"));
				String startRowKey = new String();
				String stopRowKey = new String();
				if("".equals(start) && !"".equals(end)){
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else if(!"".equals(start) && "".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					scan.setStartRow(Bytes.toBytes(startRowKey));
				}else if(!"".equals(start) && !"".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStartRow(Bytes.toBytes(startRowKey));
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else{
					table.close();
					admin.close();
					return null;
				}
				ResultScanner rsc = table.getScanner(scan);
				Iterator<Result> it = rsc.iterator();
				List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>();
				List<UseRateEntity> ulist = new ArrayList<UseRateEntity>();
				String tempRowKey = "";//這個臨時rowkey是用來判斷一行數據是否已經讀取完了的。
				GenPowerEntity gpower = new GenPowerEntity();
				UseRateEntity userate = new UseRateEntity();
				while(it.hasNext()){
					for(Cell cell: it.next().rawCells()){
						String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8");
						String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8");
						String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8");
						String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");//假如咱們當時插入HBase的時候沒有把int、float等類型的數據轉換成String,這裏就會亂碼了,而且用Bytes.toInt()這個方法還原也沒有用,哈哈
						System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value);
						if("".equals(tempRowKey))
							tempRowKey = rowKey;
						if(!rowKey.equals(tempRowKey)){
							slist.add(gpower);
							ulist.add(userate);
							gpower = null;
							userate = null;
							gpower = new GenPowerEntity();
							userate = new UseRateEntity();
							tempRowKey = rowKey;
						}
						switch(family){
						case "base":
							switch(qualifier){
							case "fid":
								gpower.setFarmid(value);
								userate.setFarmid(value);
								break;
							case "tid":
								gpower.setTurbineid(Integer.parseInt(value));
								userate.setTurbineid(Integer.parseInt(value));
								break;
							case "date":
								SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
								Date date = null;
								try {
									date = sdf.parse(value);
								} catch (ParseException e) {
									e.printStackTrace();
								}
								gpower.setVtime(date);
								userate.setVtime(date);
								break;
							}
							break;
						case "gpower":
							switch(qualifier){
							case "power":
								gpower.setValue(Float.parseFloat(value));
								break;
							case "windspeed":
								gpower.setWindspeed(Float.parseFloat(value));
								break;
							case "unpower":
								gpower.setUnvalue(Float.parseFloat(value));
								break;
							case "theory":
								gpower.setTvalue(Float.parseFloat(value));
								break;
							case "coup":
								gpower.setCoup(Float.parseFloat(value));
								break;
							}
							break;
						case "userate":
							switch(qualifier){
							case "num":
								userate.setFnum(Integer.parseInt(value));
								break;
							case "power":
								userate.setFpower(Float.parseFloat(value));
								break;
							case "time":
								userate.setFvalue(Float.parseFloat(value));
								break;
							}
							break;
						}
						
					}
				}
				rsc.close();
				table.close();
				admin.close();
				......
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return list;
	}



這是我在Service層中用做測試的一個方法,業務邏輯代碼能夠直接無視(已經用.....代替了,哈哈),至此咱們的全部工做完成,對於更深刻的應用,還要靠本身去認真挖掘學習了。

相關文章
相關標籤/搜索