大數據處理——硬盤級的緩衝隊列和BitMap

同步博客地址 :http://blog.jerrylab.info/2016/05/dashujuchuliyingpanjidehuanchongduiliehebitmap/數組

  • 介紹

這篇文章中,Jerry 提到了因爲抓取隊列過大,Bitmap位數過多致使內存沒法裝下這兩個數據結構的問題。所以我須要一種新的,可以將硬盤擴展爲可用內存的實現方法,以便在 不花錢升級配置的狀況下持久地運行TB級數據抓取任務。由此Jerry本身造了個硬盤級緩衝的輪子。這樣,就不再用擔憂因內存不夠而沒法判重或者是沒法 調度的問題了。不過缺點也是顯而易見的,運行速度會變慢。但這樣至少給本來內存溢出的項目提供了一種可行的解決方案。緩存

  • 思路

仿 照高速緩存到內存的緩衝機制,將這種機制再現到硬盤與內存便可實現硬盤級的緩存。由此咱們設置了兩種結構。對於BitMap來講,因爲它對硬盤是採起隨機 訪問的方式,即訪問的內容和順序是不固定的,所以咱們要仿照高速緩存中的快表對硬盤實現內存中哈希表緩存。對於隊列來講,因爲它每次的存取都一定發生在隊 首或者隊尾,因此咱們只須要對隊首的輸出和隊尾的輸入作內存緩存便可。數據結構

具體的來講,咱們把操做系統中的一個文件抽象地看做是一塊連續的存儲區域,抑或是一個超大的數組(雖然在物理結構上並非這樣,但邏輯結構能夠這樣看)。所以能夠對每個字節進行編號。因而乎,這些編號就能夠看做是文件這個低速內存的地址了。ide

由 於硬盤訪問延遲很是高,因此咱們須要合理的緩衝機制。對於BitMap,咱們把文件中的每個比特當作一個數存在與否的標誌(相對應原先的版本,這些比特 是存在內存中的。在內存中,創建一個哈希表,將硬盤地址多對一地映射到哈希表的項。對最近讀寫的內容作緩存,即記錄讀寫的真實位置和值。所以若是連續讀寫 相同的位置,就不須要屢次讀取或寫入硬盤。當哈希表的項發生衝突時,即對兩個不一樣的地址讀寫,但它們映射到的確是相同的哈希表項時,咱們將髒數據(僅在內 存中修改過的數據)寫回到硬盤中。ui

對於,隊列,考慮到硬盤讀寫連續地址的速度比較快,所以咱們每次都的在頭尾讀寫指定的數據量到內存緩衝數組中,防止出現屢次讀寫小數據的事情。操作系統

  • 實現

硬盤級緩衝隊列實現code

# from blog.Jerrylab.info
# 2016/5/6
# size -- the queue max size
# filename -- the file where the queue stored 
# ele -- the max size of one element of queue
# buffer -- the output input buffer size
# the class is safe in multiple thread environment
from Queue import Queue
import thread
import time
class HDQueue:
    def __init__(self,size,ele,filename,buffer=10000):
        self.buffer=buffer
        ele+=1
        self.ele=ele
        self.filename=filename
        self.size=size
        self.fo=open(filename,"wb")
        self.fs=(size+5)*ele+512;#512 head structure
        self.iq=Queue(maxsize=buffer)
        self.oq=Queue(maxsize=buffer)
        self.fo.truncate(self.fs)
        self.tail=1
        self.head=1
        self.dsize=0
        self.lpos=-111
        self.lock=thread.allocate_lock()
    def pos(self,index):
        return 512+index*self.ele
    def read(self,index):
        self.fi.seek(self.pos(index))
        i=self.fi.readline(self.ele)
        return i.split("\t")[0]
    def write(self,index,data):
        #print self.pos(index)," ",index
        pos=self.pos(index)
        if pos!=self.lpos+self.ele:
            self.fo.seek(pos)
        data=str(data)+"\t"
        self.fo.write(data)
        for i in xrange(self.ele-len(data)):
            self.fo.write(" ")
        self.lpos=pos
    def next(self,index):
        index+=1
        if (index>=self.size):
            index=0
        return index
    def maintain(self):
        self.lock.acquire()
        while not self.iq.empty():
            self.write(self.tail,self.iq.get())
            self.tail=self.next(self.tail)
            self.dsize+=1
        self.fo.flush()
        self.fi=open(self.filename,"rb")
        if self.oq.qsize()<self.buffer/10:
            while not self.oq.full() and self.dsize>0:
                self.dsize-=1
                self.oq.put(self.read(self.head))
                self.head=self.next(self.head)
        self.fi.close()
        self.lock.release()
    def put(self,data):
        data=str(data)
        if len(data)>self.ele:
            raise "Element Len Error"
        if self.iq.full():
            self.maintain()
        self.iq.put(data)
    def get(self):
        if (self.oq.empty()):
            self.maintain()
        if self.oq.empty():
            return None
        return self.oq.get()
    def empty(self):
        if self.iq.qsize()+self.oq.qsize()+self.dsize==0:
            return True
        else:
            return False
if __name__ == '__main__':
    s=HDQueue(1000,10,"hiqueue.txt",500)
    for i in xrange(2000):
        s.put(str(i))
        #k=s.get()
        #print k,len(k),s.head
    print "put finish"
    for i in xrange(2000):
        k=s.get()
        print k,len(k),s.head

硬盤級緩衝BitMap實現blog

#blog.jerrylab.info
#2016/5/6
#the class is unsafe in multiple thread environment 
#you need to ensure the method of set and get once at a time
#number -- the bitmap supported max number from 0
import os
import thread
class HDBitSet:
    def __init__(self,number,filename,buffer=100003):
        self.filename=filename
        self.fo=open(filename,"wb")
        self.fi=open(filename,"rb")
        self.change=False
        self.size=number
        self.buffer=buffer
        self.number=int(number/256+1)*32
        self.fo.truncate(self.number)
        self.mod=buffer
        self.hash=[-1]*buffer
        self.val=[0]*buffer
        self.number=number
    def read(self,block):
        if self.change:
            self.change=False
            self.fi.close()
            self.fo.flush()
            self.fi=open(self.filename,"rb")
        self.fi.seek(block)
        data=self.fi.readline(1)
        return  ord(data[0])
    def write(self,block,data):
        self.change=True
        self.fo.seek(block)
        self.fo.write(chr(data))
    def get(self,index):
        index=int(index)
        if index>self.size:
            return True
        block=int(index/8)
        m=index-block*8
        hs=block%self.buffer
        if self.hash[hs]==block:
            val=self.val[hs]
        else:
            val=self.read(block)
            self.hash[hs]=block
            self.val[hs]=val
        return bool(val&(1<<m))
    def set(self,index):
        index=int(index)
        f=True
        if index>self.size:
            return
        block=int(index/8)
        m=index-block*8
        hs=block%self.buffer
        if self.hash[hs]==block:
            val=self.val[hs]
        else:
            if self.hash[hs]>0:
                self.write(self.hash[hs],self.val[hs])
            val=self.read(block)
            self.hash[hs]=block
            f=False
        val=val|(1<<m)
        self.val[hs]=val
        return
if __name__ == '__main__':
    s=HDBitSet(1000000,"hibitset.txt",5)
    for i in xrange(15):
        print 1<<i
    for i in xrange(500):
        s.set(i)
    for i in xrange(51):
        s.set(i)
    print "set finish"    
    for i in xrange(100):
        if s.get(i+400):
            print "True",i
        else:
            print "False",i
相關文章
相關標籤/搜索