Python 通過 Thrift 接口連接 HBase 讀取存儲數據

介紹:

HBase:開源的分佈式數據庫

資料介紹:http://www.oschina.net/p/hbase

Thrift:一個軟件框架,用來進行可擴展且跨語言的服務開發。最初由 Facebook 開發,在 Hadoop 生態中(如 HBase)被用作提供跨語言服務的工具;

資料介紹:http://dongxicheng.org/search-engine/thrift-guide/

官方使用手冊:http://download.csdn.net/detail/wyjzt999/5141006 (從安裝到使用都很全面)

我們項目裏客戶端是用 Python 開發的,因此需要 Thrift 提供服務端,通過 Thrift 對 HBase 進行數據讀寫操作。性能非常不錯,並且可以在 Hadoop 集羣上做並行擴展,穩定性高;Facebook 內部通訊也是採用 Thrift 來做的。

 

首先學習一下 HBase 的表結構:

Row Key

行鍵(Row key)可以是任意字符串(最大長度是 64KB,實際應用中長度一般爲 10-100 bytes),在 HBase 內部,row key 保存爲字節數組。

列族 (column family)

HBase 表中的每個列都歸屬於某個列族。列族是表的 schema 的一部分(而列不是),必須在使用表之前定義。列名都以列族作爲前綴,例如 courses:history、courses:math 都屬於 courses 這個列族。

時間戳

HBase 中通過 row 和 column 確定的一個存儲單元稱爲 cell。每個 cell 都保存着同一份數據的多個版本,版本通過時間戳來索引。時間戳的類型是 64 位整型。時間戳可以由 HBase 在數據寫入時自動賦值,此時時間戳是精確到毫秒的當前系統時間;時間戳也可以由客戶端顯式賦值。如果應用程序要避免數據版本衝突,就必須自己生成具有唯一性的時間戳。每個 cell 中,不同版本的數據按照時間倒序排序,即最新的數據排在最前面。

爲了避免數據存在過多版本造成的管理(包括存儲和索引)負擔,HBase 提供了兩種數據版本回收方式:一是保存數據的最後 n 個版本,二是保存最近一段時間內的版本(比如最近七天)。用戶可以針對每個列族進行設置。

對 HBase 而言,表結構設計會對系統的性能以及開銷造成很大的影響。


 

 1. 首先建立與 Thrift server 端的連接

from thrift import Thrift
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

# Address and port of the HBase Thrift server -- placeholder values,
# replace them with your own server.
host = 'localhost'
port = 9090
# Raw socket to the Thrift server.
transport = TSocket.TSocket(host, port)
# Optional timeout (TSocket.setTimeout takes milliseconds).
transport.setTimeout(5000)
# Transport wrapper around the socket (TFramedTransport or TBufferedTransport).
trans = TTransport.TBufferedTransport(transport)
# Wire protocol used on top of the transport.
protocol = TBinaryProtocol.TBinaryProtocol(trans)
# HBase Thrift client bound to that protocol.
client = Hbase.Client(protocol)
# Open the connection through the wrapper the protocol actually uses
# (the same way CenterDb._open does further below).
trans.open()

2. 然後就可以做具體的操作,比如查表、刪表、插入 Row Key 等等
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
from hbase.ttypes import IOError, AlreadyExists

# List the existing tables.
client.getTableNames()

# Create a new table.
_TABLE = "keyword"
# Column family "data" keeps the 10 most recent versions of each cell;
# the family name is given with a trailing ':'.
demo = ColumnDescriptor(name='data:', maxVersions=10)
client.createTable(_TABLE, [demo])

# Row key used by the writes and reads below.
row = '00001'

# Two cells in family "data": data:url and data:word.
tmp1 = [Mutation(column="data:url", value="www.baidu.com")]
tmp2 = [Mutation(column="data:word", value="YaGer")]
# Write each cell: (table name, row key, mutations).
client.mutateRow(_TABLE, row, tmp1)
client.mutateRow(_TABLE, row, tmp2)


# Read data back with the raw Thrift client.
# Up to 10 versions of one cell (Failover.getByMaxver in section 3 wraps
# this same getVer call).
client.getVer(_TABLE, row, 'data:word', 10)
# All columns of the row (Failover.getColumns in section 3 wraps this
# same getRow call).
client.getRow(_TABLE, row)



3. 支持 Thrift 並行擴展和失效轉移(Failover)機制

#file name:hbaseconn.py
#! /usr/bin/env python  
# -*- coding: utf-8 -*-
#提供建立連接的方法和取數據操作的方法
import logging
import traceback
import time,sys
from unittest import TestCase, main
import socket

from thrift import Thrift  
from thrift.transport import TSocket  
from thrift.transport import TTransport  
from thrift.protocol import TBinaryProtocol  
from hbase import Hbase  

from hbase.ttypes import IOError as HbaseIOError, AlreadyExists
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation


_TABLE = 'thrift_check'
_ROW = 'test_for_thrift'
_DATA = 'data:word'
_VALUE = 'Flag'

class DbConnError(Exception):
    """Error type for failed database connections.

    Intended for timeouts or any other failure reported by the low-level
    connection API.  NOTE(review): nothing in this snippet raises it.
    """

class Connection:
    """Plain record bundling one Thrift transport/client pair with its server address."""

    def __init__(self, trans, client, addr, port):
        # Transport (opened/closed by callers) and the HBase client using it.
        self.trans, self.client = trans, client
        # Server address; the host is stored under the name ``hp``.
        self.hp, self.port = addr, port

class CenterDb:
    """Helpers for talking to HBase through Thrift clients.

    The classmethods open Thrift connections and run the small probe-table
    operations used by the health checks; an instance wraps a single
    (transport, client) pair.

    NOTE(review): ``logerr`` and ``dbg`` are used below but are neither
    defined nor imported in this snippet -- presumably provided elsewhere;
    confirm.
    """

    @classmethod
    def open(cls, host_port):
        """Open one connection per ``(host, port)`` pair.

        Args:
            host_port: iterable of ``(host, port)`` pairs.

        Returns:
            List of ``Connection`` objects in the same order as *host_port*.
            A connection whose ``open()`` failed is still included (its
            transport stays closed; the failure is only logged).  The list is
            also stored on the class as ``tc_list``, replacing any previous one.
        """
        cls.tc_list = []
        for hp in host_port:
            trans, client = cls._open(*hp)
            cls.tc_list.append(Connection(trans, client, hp[0], hp[1]))

        return cls.tc_list

    @classmethod
    def _open(cls, host, port):
        """Build a buffered, binary-protocol HBase client for host:port and try to open it.

        Returns:
            ``(trans, client)`` -- returned even when opening fails, so the
            caller can retry ``trans.open()`` later.
        """
        transport = TSocket.TSocket(host, port)
        transport.setTimeout(5000)  # milliseconds
        trans = TTransport.TBufferedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(trans)
        client = Hbase.Client(protocol)
        try:
            trans.open()
        except TSocket.TTransportException:
            # %s rather than %d so a port given as a string cannot make the
            # error handler itself raise; output is identical for int ports.
            logerr('CenterDb(Hbase) Open error(%s, %s)' % (host, port))
        return trans, client

    @classmethod
    def _initTable(cls, client):
        """Create the probe table and write the probe cell.

        Returns:
            True when the table was created (and the cell written) or when it
            already existed.  Any other error propagates to the caller.
        """
        dat = ColumnDescriptor(name='data', maxVersions=1)
        tmp = [Mutation(column=_DATA, value=_VALUE)]
        try:
            client.createTable(_TABLE, [dat])
        except AlreadyExists:
            # Table was set up by an earlier run; nothing more to do.
            return True
        client.mutateRow(_TABLE, _ROW, tmp)
        dbg("Create Table For Thrift Test Success!")
        return True

    @classmethod
    def _get(cls, client):
        """Probe read of one version of the check cell; True unless the call raises."""
        client.getVer(_TABLE, _ROW, _DATA, 1)
        return True

    @classmethod
    def _reconnect(cls, trans):
        """Close and reopen *trans*; errors from ``open()`` propagate."""
        trans.close()
        trans.open()

    def __init__(self, transport, client):
        self.t = transport
        self.c = client

    def __str__(self):
        # Both fields are objects, so format them with %s (the previous %d
        # raised TypeError for any client object).
        return 'CenterDb.open(%s, %s)' % (self.t, self.c)

    def __del__(self):
        # getattr: if __init__ never ran (or raised), there is no .t to close.
        trans = getattr(self, 't', None)
        if trans is not None:
            trans.close()

    def getColumns(self, table, row):
        """Return the ``columns`` mapping of *row* in *table*, or {} if the row is absent.

        Uses this instance's own client; CenterDb has no ``_failover`` helper
        (that lives on thriftmanage.Failover), so the previous call to
        ``self._failover`` always raised AttributeError.
        """
        tr_list = self.c.getRow(table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns



#file name: thriftmanage.py
#! /usr/bin/python
# -*- coding: utf-8 -*-
#提供管理 Thrift 連接的方法
#建一個線程循環檢測 Thrift 的可用性,非連接池
import logging
import time,sys,random
import threading
from hbase import Hbase  
from hbase.ttypes import IOError as HbaseIOError
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation
import hbaseconn
from hbaseconn import CenterDb


class Failover(threading.Thread):
    """Thread that keeps this scheduler's own Thrift connection healthy and
    fails over to other servers while it is down (not a connection pool).

    Connections to every server in *serverlist* are opened up front.  The one
    at index ``serverid % len(serverlist)`` is ``my_conn``; ``cur_conn`` is
    the connection the query helpers currently use.  When a query hits a
    connection error, ``_failover`` moves ``cur_conn`` to a random healthy
    peer; ``run`` then restarts ``my_conn`` and switches back once it works.

    NOTE(review): ``logerr`` and ``dbg`` are used but neither defined nor
    imported in this snippet -- presumably provided elsewhere; confirm.
    NOTE(review): ``cur_conn``/``switched_flag`` are written both by the
    query helpers and by ``run`` without a lock.
    """

    def __init__(self, serverid, serverlist):
        threading.Thread.__init__(self)

        self.serverlist = serverlist  # sequence of (host, port) pairs
        self.serverid = serverid

        self.lenlist = len(serverlist)

        self.conn_list = []         # Connection objects, same order as serverlist
        self.invalid_con_list = []  # NOTE(review): not used in this snippet
        # cur_conn: connection the query helpers are using right now.
        # my_conn:  connection this scheduler should normally use.
        self.cur_conn = None
        self.my_conn = None

        self._makeConn()
        self._getConn()
        self._init_for_check()

        # Set by _failover after moving away from my_conn; run() then
        # restarts my_conn and switches back.
        self.switched_flag = False

    def _makeConn(self):
        """Open a connection to every server in serverlist."""
        self.conn_list = CenterDb.open(self.serverlist)

    def _getConn(self):
        """Pick my_conn by serverid (also the initial cur_conn); the rest become other_conn."""
        self.cur_conn = self.my_conn = self.conn_list[(int(self.serverid) % self.lenlist)]

        self.other_conn = self.conn_list[:]
        self.other_conn.remove(self.my_conn)

        return True

    def _init_for_check(self):
        """Make sure the probe table exists, then open a dedicated connection for health checks."""
        try:
            if not CenterDb._initTable(self.my_conn.client):
                dbg("Error In Create Table For Thrift Test!")
        except Exception as e:
            dbg("init_for_check thrift:%s" % e)

        # Separate connection to my_conn's server, used only by the checker
        # loop (run/_restart).
        self.check_conn = CenterDb.open(((self.my_conn.hp, self.my_conn.port),))[0]

    def _switch(self):
        """Point cur_conn at a randomly chosen healthy peer.

        Makes at most ``3 * len(serverlist)`` attempts.  Each attempt checks a
        random peer; if the check fails, the peer's transport is reopened and
        checked once more.  Every attempt counts toward the limit, including
        one that raised, so the loop always terminates.

        Returns:
            True if cur_conn was switched, False if there are no peers or none
            became healthy.
        """
        if not self.other_conn:
            return False
        max_tries = 3 * self.lenlist
        for attempt in range(1, max_tries + 1):
            tmp_conn = random.choice(self.other_conn)
            try:
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    dbg('Schd%s _switch cur_conn: %s' % (self.serverid, self.cur_conn.hp))
                    return True
                logerr('Schd%s _switch for %d times' % (self.serverid, attempt))
                # Close and reopen this transport, then check it once more.
                CenterDb._reconnect(tmp_conn.trans)
                if self._checker(tmp_conn):
                    self.cur_conn = tmp_conn
                    return True
            except Exception as e:
                # Previously swallowed silently without counting the attempt,
                # which could loop forever.
                logerr('Schd%s _switch attempt %d failed: %s' % (self.serverid, attempt, e))
        return False

    def _failover(self, oper, *args, **kwargs):
        """Call ``cur_conn.client.<oper>(*args, **kwargs)``, failing over on connection errors.

        Returns:
            The call's result.  Returns ``[]`` when the server reported an
            HBase IOError, when no healthy peer could be found, or when the
            retried call on the new peer failed as well.
        """
        result = []
        try:
            return getattr(self.cur_conn.client, oper)(*args, **kwargs)

        except HbaseIOError as e:
            # Application-level error from HBase; the connection itself is kept.
            logerr("_failover : %s " % e)

        except Exception as e:
            logerr('Schd%s _failover : Connect to %s thrift Server closed! Choose another......Reason:%s ' % \
                (self.serverid, self.cur_conn.hp, e))
            self.cur_conn.trans.close()
            self.switched_flag = True
            if self._switch():
                logerr('Schd%s _failover : Now using %s server' % (self.serverid, self.cur_conn.hp))
                try:
                    result = getattr(self.cur_conn.client, oper)(*args, **kwargs)
                except Exception as retry_err:
                    # Keep the "empty result on failure" contract instead of
                    # letting the second failure escape.
                    logerr('Schd%s _failover : retry on %s failed: %s' % \
                        (self.serverid, self.cur_conn.hp, retry_err))
            else:
                logerr('Schd%s _failover : Switch 3 rotate Find No Healthy Thrift server !' % self.serverid)
        return result

    def getByMaxver(self, table, row, column, max_ver):
        """Get up to *max_ver* versions of one cell.

        Args:
            table: table name
            row: row key
            column: column name ("family:qualifier")
            max_ver: maximum number of versions to retrieve

        Returns:
            List of ``(value, timestamp)`` tuples, or [] when nothing was
            returned.  Calls slower than 15 ms are logged.
        """
        start_time = time.time()
        cell_list = self._failover('getVer', table, row, column, max_ver)
        if not cell_list:
            return []

        take = time.time() - start_time
        if take > 0.015:
            logerr('Hbase over 15ms:take %s ms' % str("%.3f" % (take*1000)))

        return [(cell.value, cell.timestamp) for cell in cell_list]

    def getColumns(self, table, row):
        """Return the ``columns`` mapping of *row* in *table*, or {} if nothing was returned."""
        tr_list = self._failover('getRow', table, row)
        if not tr_list:
            return {}
        return tr_list[0].columns

    def _checker(self, conn):
        """Return True if *conn*'s transport is open and a probe read succeeds; never raises."""
        try:
            if conn.trans.isOpen():
                CenterDb._get(conn.client)
                return True
            return False

        except Exception as e:
            logerr('Schd%s  _checker : Connect to %s closed! Please Restart now..... Reason: %s ' % \
                (self.serverid, conn.hp, e))
            return False

    def _restart(self):
        """Reopen my_conn and check_conn every 2 s until a probe read succeeds.

        Blocks until recovery and then returns True.
        NOTE(review): closes my_conn's transport even when cur_conn is still
        my_conn, so concurrent queries may fail while this runs.
        """
        while True:
            self.my_conn.trans.close()
            self.check_conn.trans.close()
            try:
                time.sleep(2)
                self.my_conn.trans.open()
                self.check_conn.trans.open()
                if self.my_conn.trans.isOpen() and self.check_conn.trans.isOpen():
                    CenterDb._get(self.check_conn.client)
                    return True
            except Exception as e:
                logerr('Schd%s _restart : Connect to %s is not restart yet ... Reason:%s ' % \
                    (self.serverid, self.my_conn.hp, e))
                self.my_conn.trans.close()
                self.check_conn.trans.close()

    def run(self):
        """Once per second, probe check_conn.

        On failure, or after _failover has switched away, restart my_conn and
        make it the current connection again.
        """
        while True:
            time.sleep(1)
            if self._checker(self.check_conn) and not self.switched_flag:
                continue
            if self._restart():
                logerr('Schd%s Connection from  %s to my:%s Recovered ! ' % \
                    (self.serverid, self.cur_conn.hp, self.my_conn.hp))
                self.cur_conn = self.my_conn
                self.switched_flag = False


使用 Thrift 生成的代碼中提供的方法有:
void enableTable(Bytes tableName)
void disableTable(Bytes tableName)
bool isTableEnabled(Bytes tableName)
void compact(Bytes tableNameOrRegionName)
void majorCompact(Bytes tableNameOrRegionName)
list<Text> getTableNames()
map<Text, ColumnDescriptor> getColumnDescriptors(Text tableName)
list<TRegionInfo> getTableRegions(Text tableName)
void createTable(Text tableName, list<ColumnDescriptor> columnFamilies)
void deleteTable(Text tableName)
list<TCell> get(Text tableName, Text row, Text column)
list<TCell> getVer(Text tableName, Text row, Text column, i32 numVersions)
list<TCell> getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions)
list<TRowResult> getRow(Text tableName, Text row)
list<TRowResult> getRowWithColumns(Text tableName, Text row, list<Text> columns)
list<TRowResult> getRowTs(Text tableName, Text row, i64 timestamp)
list<TRowResult> getRowWithColumnsTs(Text tableName, Text row, list<Text> columns, i64 timestamp)
list<TRowResult> getRows(Text tableName, list<Text> rows)
list<TRowResult> getRowsWithColumns(Text tableName, list<Text> rows, list<Text> columns)
list<TRowResult> getRowsTs(Text tableName, list<Text> rows, i64 timestamp)
list<TRowResult> getRowsWithColumnsTs(Text tableName, list<Text> rows, list<Text> columns, i64 timestamp)
void mutateRow(Text tableName, Text row, list<Mutation> mutations)
void mutateRowTs(Text tableName, Text row, list<Mutation> mutations, i64 timestamp)
void mutateRows(Text tableName, list<BatchMutation> rowBatches)
void mutateRowsTs(Text tableName, list<BatchMutation> rowBatches, i64 timestamp)
i64 atomicIncrement(Text tableName, Text row, Text column, i64 value)
void deleteAll(Text tableName, Text row, Text column)
void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp)
void deleteAllRow(Text tableName, Text row)
void deleteAllRowTs(Text tableName, Text row, i64 timestamp)
ScannerID scannerOpenWithScan(Text tableName, TScan scan)
ScannerID scannerOpen(Text tableName, Text startRow, list<Text> columns)
ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow, list<Text> columns)
ScannerID scannerOpenWithPrefix(Text tableName, Text startAndPrefix, list<Text> columns)
ScannerID scannerOpenTs(Text tableName, Text startRow, list<Text> columns, i64 timestamp)
ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow, list<Text> columns, i64 timestamp)
list<TRowResult> scannerGet(ScannerID id)
list<TRowResult> scannerGetList(ScannerID id, i32 nbRows)
void scannerClose(ScannerID id)
參考:http://yannramin.com/2008/07/19/using-facebook-thrift-with-python-and-hbase/

相關文章
相關標籤/搜索