介紹:
Hbase:開源的分佈式數據庫
資料介紹:http://www.oschina.net/p/hbase
Thrift:一個軟件框架,用來進行可擴展且跨語言的服務的開發。最初由Facebook開發,作爲Hadoop的一個工具,提供跨語言服務開發;
資料介紹:http://dongxicheng.org/search-engine/thrift-guide/
官方使用手冊:http://download.csdn.net/detail/wyjzt999/5141006 (從安裝到使用都很全面)
我們項目裏客戶端是用python開發,所以需要Thrift提供server端,通過thrift對Hbase進行數據讀寫操作,性能非常不錯,而且可以在Hadoop集羣上做並行拓展,穩定性高,Facebook內部通訊也是採用thrift來做;
首先學習一下Hbase的表結構:
Row Key
行鍵 (Row key) 可以是任意字符串(最大長度是 64KB,實際應用中長度一般爲 10-100 bytes),在hbase內部,row key保存爲字節數組。
列族 (column family)
hbase表中的每個列,都歸屬於某個列族。列族是表的schema的一部分(而列不是),必須在使用表之前定義。列名都以列族作爲前綴。例如courses:history , courses:math 都屬於 courses 這個列族。
時間戳
HBase中通過row和columns確定的一個存貯單元稱爲cell。每個 cell都保存着同一份數據的多個版本。版本通過時間戳來索引。時間戳的類型是 64位整型。時間戳可以由hbase(在數據寫入時自動)賦值,此時時間戳是精確到毫秒的當前系統時間。時間戳也可以由客戶顯式賦值。如果應用程序要避免數據版本衝突,就必須自己生成具有唯一性的時間戳。每個 cell中,不同版本的數據按照時間倒序排序,即最新的數據排在最前面。
爲了避免數據存在過多版本造成的管理 (包括存貯和索引)負擔,hbase提供了兩種數據版本回收方式。一是保存數據的最後n個版本,二是保存最近一段時間內的版本(比如最近七天)。用戶可以針對每個列族進行設置。
對Hbase而言,表結構設計會對系統的性能以及開銷造成很大的差別;
1.首先建立與thrift server端的連接
from thrift import Thrift
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

# Address and port of the Thrift server. These are example values (9090 is
# the usual HBase Thrift port); replace them with your own deployment's.
host = 'localhost'
port = 9090

# Raw socket to the server.
transport = TSocket.TSocket(host, port)
# A timeout can be set on the socket (value is in milliseconds).
transport.setTimeout(5000)
# Choose the transport layer (TFramedTransport or TBufferedTransport).
trans = TTransport.TBufferedTransport(transport)
# Choose the wire protocol.
protocol = TBinaryProtocol.TBinaryProtocol(trans)
# Build the client on top of the protocol.
client = Hbase.Client(protocol)
# Open the connection.
transport.open()
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
from hbase.ttypes import IOError, AlreadyExists

# List the names of the existing tables.
client.getTableNames()

# Create a new table.
_TABLE = "keyword"
# Column family "data" keeps the 10 most recent versions of each cell; the
# family name must be followed by a colon.
demo = ColumnDescriptor(name='data:', maxVersions=10)
client.createTable(_TABLE, [demo])

# Write two columns, data:url and data:word, into a single row.
row = '00001'
tmp1 = [Mutation(column="data:url", value="www.baidu.com")]
tmp2 = [Mutation(column="data:word", value="YaGer")]
# mutateRow(table name, row key, list of mutations)
client.mutateRow(_TABLE, row, tmp1)
client.mutateRow(_TABLE, row, tmp2)

# Read data back from the table.
# Fetch up to 10 versions of one cell in a single call.
client.getVer(_TABLE, row, 'data:word', 10)
# Fetch every column stored for the row.
client.getRow(_TABLE, row)
3.支持thrift並行拓展和失效轉移Failover機制
# file name: hbaseconn.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Provides helpers for opening Thrift connections and for fetching data.
import logging
import traceback
import time, sys
from unittest import TestCase, main
import socket

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import IOError as HbaseIOError, AlreadyExists
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation

# Table / row / column / value used for the Thrift health-check record.
_TABLE = 'thrift_check'
_ROW = 'test_for_thrift'
_DATA = 'data:word'
_VALUE = 'Flag'

_LOG = logging.getLogger(__name__)


def logerr(msg):
    """Log *msg* at ERROR level."""
    _LOG.error(msg)


def dbg(msg):
    """Log *msg* at DEBUG level."""
    _LOG.debug(msg)


class DbConnError(Exception):
    """Conn Db exception.

    Timeout or any except for connection from low layer api
    """
    pass


class Connection(object):
    """Bundle of one Thrift transport, its client and the server address."""

    def __init__(self, trans, client, addr, port):
        self.trans = trans
        self.client = client
        self.hp = addr
        self.port = port


class CenterDb(object):
    """Helpers for opening and probing HBase Thrift connections."""

    @classmethod
    def open(cls, host_port):
        """Open one connection per (host, port) pair.

        Args:
            host_port: iterable of (host, port) pairs.

        Returns:
            A list of Connection objects, one per pair, in input order. The
            list is also stored on the class as ``tc_list``. A connection
            whose transport failed to open is still included in the list.
        """
        cls.tc_list = []
        for hp in host_port:
            trans, client = cls._open(*hp)
            cls.tc_list.append(Connection(trans, client, hp[0], hp[1]))
        return cls.tc_list

    @classmethod
    def _open(cls, host, port):
        """Build a buffered binary-protocol client and try to open it.

        A failure to open is logged and not raised, so the returned
        transport may be closed.

        Returns:
            A (transport, client) tuple.
        """
        transport = TSocket.TSocket(host, port)
        transport.setTimeout(5000)
        trans = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(trans)
        client = Hbase.Client(protocol)
        try:
            trans.open()
        except TTransport.TTransportException:
            logerr('CenterDb(Hbase) Open error(%s, %d)' % (host, port))
        return trans, client

    @classmethod
    def _initTable(cls, client):
        """Create the health-check table and write its single test cell.

        Returns:
            True when the table was created or already exists. Any other
            exception from the client propagates to the caller.
        """
        dat = ColumnDescriptor(name='data', maxVersions=1)
        tmp = [Mutation(column=_DATA, value=_VALUE)]
        try:
            client.createTable(_TABLE, [dat])
            client.mutateRow(_TABLE, _ROW, tmp)
            dbg("Create Table For Thrift Test Success!")
            return True
        except AlreadyExists:
            return True

    @classmethod
    def _get(cls, client):
        """Read the health-check cell; returns True if the call did not raise."""
        client.getVer(_TABLE, _ROW, _DATA, 1)
        return True

    @classmethod
    def _reconnect(cls, trans):
        """Close and re-open *trans*."""
        trans.close()
        trans.open()

    def __init__(self, transport, client):
        self.t = transport
        self.c = client

    def __str__(self):
        # Both fields are objects, so both are formatted with %s.
        return 'CenterDb.open(%s, %s)' % (self.t, self.c)

    def __del__(self):
        # __init__ may not have completed, so the attribute can be missing.
        transport = getattr(self, 't', None)
        if transport is not None:
            transport.close()

    def getColumns(self, table, row):
        """Return the columns of *row* in *table*, or {} when nothing is found.

        The row is read through this instance's own client.
        """
        tr_list = self.c.getRow(table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns
# file name: thriftmanage.py
#! /usr/bin/python
# -*- coding: utf-8 -*-
# Provides helpers for managing Thrift connections.
# A thread loops to check that the Thrift server is usable; this is not a
# connection pool.
import logging
import time, sys, random
import threading

from hbase import Hbase
from hbase.ttypes import IOError as HbaseIOError
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation
import hbaseconn
from hbaseconn import CenterDb

_LOG = logging.getLogger(__name__)


def logerr(msg):
    """Log *msg* at ERROR level."""
    _LOG.error(msg)


def dbg(msg):
    """Log *msg* at DEBUG level."""
    _LOG.debug(msg)


class Failover(threading.Thread):
    """Run HBase Thrift calls with fail-over across a list of servers.

    The thread body (``run``) polls the preferred server once per second and
    switches ``cur_conn`` back to it after it has recovered.
    """

    def __init__(self, serverid, serverlist):
        """Open connections to every server and pick the preferred one.

        Args:
            serverid: identifier of this scheduler; ``int(serverid)`` modulo
                the number of servers selects the preferred connection.
            serverlist: sequence of (host, port) pairs.
        """
        threading.Thread.__init__(self)
        self.serverlist = serverlist
        self.serverid = serverid
        self.lenlist = len(serverlist)
        self.conn_list = []
        self.invalid_con_list = []
        # cur_conn : the connection operations are using right now
        # my_conn  : the connection this scheduler server should use
        self.cur_conn = None
        self.my_conn = None
        self.switched_flag = False
        self._makeConn()
        self._getConn()
        self._init_for_check()

    def _makeConn(self):
        """Open a connection to every server in ``serverlist``."""
        self.conn_list = CenterDb.open(self.serverlist)

    def _getConn(self):
        """Select ``my_conn``/``cur_conn`` and collect the rest in ``other_conn``."""
        self.cur_conn = self.my_conn = self.conn_list[int(self.serverid) % self.lenlist]
        self.other_conn = self.conn_list[:]
        self.other_conn.remove(self.my_conn)
        return True

    def _init_for_check(self):
        """Create the health-check table and open the dedicated check connection."""
        try:
            if not CenterDb._initTable(self.my_conn.client):
                dbg("Error In Create Table For Thrift Test!")
        except Exception as e:
            dbg("init_for_check thrift:%s" % e)
        # A separate connection used only by the checking thread.
        self.check_conn = CenterDb.open(((self.my_conn.hp, self.my_conn.port),))[0]

    def _switch(self):
        """Pick a working connection from ``other_conn`` at random.

        Gives up after ``3 * lenlist`` failed attempts.

        Returns:
            True when ``cur_conn`` now points at a connection that passed
            the check, False otherwise.
        """
        if not self.other_conn:
            return False
        trycount = 0
        while True:
            try:
                if trycount == 3 * self.lenlist:  # try 3*length times
                    return False
                tmp_conn = random.choice(self.other_conn)
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    dbg('Schd%s _switch cur_conn: %s' % (self.serverid, self.cur_conn.hp))
                    return True
                trycount += 1
                logerr('Schd%s _switch for %d times' % (self.serverid, trycount))
                # Close this transport and try once more on a fresh one.
                CenterDb._reconnect(tmp_conn.trans)
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    return True
            except Exception as e:
                # The candidate cannot be used; log and try another one.
                logerr('Schd%s _switch : %s' % (self.serverid, e))
                continue

    def _failover(self, oper, *args, **kwargs):
        """Call ``oper`` on the current client, switching servers on failure.

        An HbaseIOError is logged and yields an empty list. Any other
        exception closes the current transport and triggers ``_switch``;
        the call is then retried once on the new connection, and an
        exception raised by that retry propagates to the caller.

        Returns:
            The result of the client call, or [] when no result was obtained.
        """
        result = []
        try:
            result = getattr(self.cur_conn.client, oper)(*args, **kwargs)
            return result
        except HbaseIOError as e:
            logerr("_failover : %s " % e)
        except Exception as e:
            logerr('Schd%s _failover : Connect to %s thrift Server closed! Choose another......Reason:%s ' %
                   (self.serverid, self.cur_conn.hp, e))
            self.cur_conn.trans.close()
            self.switched_flag = True
            if self._switch():
                logerr('Schd%s _failover : Now using %s server' % (self.serverid, self.cur_conn.hp))
                result = getattr(self.cur_conn.client, oper)(*args, **kwargs)
            else:
                logerr('Schd%s _failover : Switch 3 rotate Find No Healthy Thrift server !' % self.serverid)
        return result

    def getByMaxver(self, table, row, column, max_ver):
        """Get cell list by ver

        Get cell list no more than max versions

        Args:
            table: table name
            row: row key
            column: column name
            max_ver: max version to retrieve cell

        Returns:
            A list of (value, timestamp) tuples, or [] when nothing is found.
        """
        start_time = time.time()
        cell_list = self._failover('getVer', table, row, column, max_ver)
        if not cell_list:
            return []
        take = time.time() - start_time
        if take > 0.015:
            logerr('Hbase over 15ms:take %s ms' % str("%.3f" % (take * 1000)))
        return [(cell.value, cell.timestamp) for cell in cell_list]

    def getColumns(self, table, row):
        """Return the columns of *row* in *table*, or {} when nothing is found."""
        tr_list = self._failover('getRow', table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns

    def _checker(self, conn):
        """Return True when *conn* is open and the health-check read succeeds."""
        try:
            if conn.trans.isOpen():
                CenterDb._get(conn.client)
                return True
            return False
        except Exception as e:
            logerr('Schd%s _checker : Connect to %s closed! Please Restart now..... Reason: %s ' %
                   (self.serverid, conn.hp, e))
            return False

    def _restart(self):
        """Re-open ``my_conn`` and ``check_conn`` until both work again.

        Loops forever, sleeping 2 seconds between attempts, and returns True
        once both transports are open and the health-check read succeeds.
        """
        while True:
            self.my_conn.trans.close()
            self.check_conn.trans.close()
            try:
                time.sleep(2)
                self.my_conn.trans.open()
                self.check_conn.trans.open()
                if self.my_conn.trans.isOpen() and self.check_conn.trans.isOpen():
                    CenterDb._get(self.check_conn.client)
                    return True
            except Exception as e:
                logerr('Schd%s _restart : Connect to %s is not restart yet ... Reason:%s ' %
                       (self.serverid, self.my_conn.hp, e))
                self.my_conn.trans.close()
                self.check_conn.trans.close()

    def run(self):
        """Poll the preferred server every second and switch back once it recovers."""
        while True:
            time.sleep(1)
            if self._checker(self.check_conn) and (not self.switched_flag):
                continue
            if self._restart():
                logerr('Schd%s Connection from %s to my:%s Recovered ! ' %
                       (self.serverid, self.cur_conn.hp, self.my_conn.hp))
                self.cur_conn = self.my_conn
                self.switched_flag = False
使用thrift生成的代碼中提供的方法有: void enableTable(Bytes tableName) void disableTable(Bytes tableName) bool isTableEnabled(Bytes tableName) void compact(Bytes tableNameOrRegionName) void majorCompact(Bytes tableNameOrRegionName) getTableNames() getColumnDescriptors(Text tableName) getTableRegions(Text tableName) void createTable(Text tableName, columnFamilies) void deleteTable(Text tableName) get(Text tableName, Text row, Text column) getVer(Text tableName, Text row, Text column, i32 numVersions) getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions) getRow(Text tableName, Text row) getRowWithColumns(Text tableName, Text row, columns) getRowTs(Text tableName, Text row, i64 timestamp) getRowWithColumnsTs(Text tableName, Text row, columns, i64 timestamp) getRows(Text tableName, rows) getRowsWithColumns(Text tableName, rows, columns) getRowsTs(Text tableName, rows, i64 timestamp) getRowsWithColumnsTs(Text tableName, rows, columns, i64 timestamp) void mutateRow(Text tableName, Text row, mutations) void mutateRowTs(Text tableName, Text row, mutations, i64 timestamp) void mutateRows(Text tableName, rowBatches) void mutateRowsTs(Text tableName, rowBatches, i64 timestamp) i64 atomicIncrement(Text tableName, Text row, Text column, i64 value) void deleteAll(Text tableName, Text row, Text column) void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp) void deleteAllRow(Text tableName, Text row) void deleteAllRowTs(Text tableName, Text row, i64 timestamp) ScannerID scannerOpenWithScan(Text tableName, TScan scan) ScannerID scannerOpen(Text tableName, Text startRow, columns) ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow, columns) ScannerID scannerOpenWithPrefix(Text tableName, Text startAndPrefix, columns) ScannerID scannerOpenTs(Text tableName, Text startRow, columns, i64 timestamp) ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow, columns, i64 timestamp) scannerGet(ScannerID id) 
scannerGetList(ScannerID id, i32 nbRows) void scannerClose(ScannerID id) 參考:http://yannramin.com/2008/07/19/using-facebook-thrift-with-python-and-hbase/