同步博客地址 :http://blog.jerrylab.info/2016/05/dashujuchuliyingpanjidehuanchongduiliehebitmap/數組
在這篇文章中,Jerry 提到了因爲抓取隊列過大,Bitmap位數過多致使內存沒法裝下這兩個數據結構的問題。所以我須要一種新的,可以將硬盤擴展爲可用內存的實現方法,以便在 不花錢升級配置的狀況下持久地運行TB級數據抓取任務。由此Jerry本身造了個硬盤級緩衝的輪子。這樣,就不再用擔憂因內存不夠而沒法判重或者是沒法 調度的問題了。不過缺點也是顯而易見的,運行速度會變慢。但這樣至少給本來內存溢出的項目提供了一種可行的解決方案。緩存
仿 照高速緩存到內存的緩衝機制,將這種機制再現到硬盤與內存便可實現硬盤級的緩存。由此咱們設置了兩種結構。對於BitMap來講,因爲它對硬盤是採起隨機 訪問的方式,即訪問的內容和順序是不固定的,所以咱們要仿照高速緩存中的快表對硬盤實現內存中哈希表緩存。對於隊列來講,因爲它每次的存取都一定發生在隊 首或者隊尾,因此咱們只須要對隊首的輸出和隊尾的輸入作內存緩存便可。數據結構
具體的來講,咱們把操做系統中的一個文件抽象地看做是一塊連續的存儲區域,抑或是一個超大的數組(雖然在物理結構上並非這樣,但邏輯結構能夠這樣看)。所以能夠對每個字節進行編號。因而乎,這些編號就能夠看做是文件這個低速內存的地址了。ide
由 於硬盤訪問延遲很是高,因此咱們須要合理的緩衝機制。對於BitMap,咱們把文件中的每個比特當作一個數存在與否的標誌(相對應原先的版本,這些比特 是存在內存中的。在內存中,創建一個哈希表,將硬盤地址多對一地映射到哈希表的項。對最近讀寫的內容作緩存,即記錄讀寫的真實位置和值。所以若是連續讀寫 相同的位置,就不須要屢次讀取或寫入硬盤。當哈希表的項發生衝突時,即對兩個不一樣的地址讀寫,但它們映射到的確是相同的哈希表項時,咱們將髒數據(僅在內 存中修改過的數據)寫回到硬盤中。ui
對於,隊列,考慮到硬盤讀寫連續地址的速度比較快,所以咱們每次都的在頭尾讀寫指定的數據量到內存緩衝數組中,防止出現屢次讀寫小數據的事情。操作系統
硬盤級緩衝隊列實現code
# from blog.Jerrylab.info # 2016/5/6 # size -- the queue max size # filename -- the file where the queue stored # ele -- the max size of one element of queue # buffer -- the output input buffer size # the class is safe in multiple thread environment from Queue import Queue import thread import time class HDQueue: def __init__(self,size,ele,filename,buffer=10000): self.buffer=buffer ele+=1 self.ele=ele self.filename=filename self.size=size self.fo=open(filename,"wb") self.fs=(size+5)*ele+512;#512 head structure self.iq=Queue(maxsize=buffer) self.oq=Queue(maxsize=buffer) self.fo.truncate(self.fs) self.tail=1 self.head=1 self.dsize=0 self.lpos=-111 self.lock=thread.allocate_lock() def pos(self,index): return 512+index*self.ele def read(self,index): self.fi.seek(self.pos(index)) i=self.fi.readline(self.ele) return i.split("\t")[0] def write(self,index,data): #print self.pos(index)," ",index pos=self.pos(index) if pos!=self.lpos+self.ele: self.fo.seek(pos) data=str(data)+"\t" self.fo.write(data) for i in xrange(self.ele-len(data)): self.fo.write(" ") self.lpos=pos def next(self,index): index+=1 if (index>=self.size): index=0 return index def maintain(self): self.lock.acquire() while not self.iq.empty(): self.write(self.tail,self.iq.get()) self.tail=self.next(self.tail) self.dsize+=1 self.fo.flush() self.fi=open(self.filename,"rb") if self.oq.qsize()<self.buffer/10: while not self.oq.full() and self.dsize>0: self.dsize-=1 self.oq.put(self.read(self.head)) self.head=self.next(self.head) self.fi.close() self.lock.release() def put(self,data): data=str(data) if len(data)>self.ele: raise "Element Len Error" if self.iq.full(): self.maintain() self.iq.put(data) def get(self): if (self.oq.empty()): self.maintain() if self.oq.empty(): return None return self.oq.get() def empty(self): if self.iq.qsize()+self.oq.qsize()+self.dsize==0: return True else: return False if __name__ == '__main__': s=HDQueue(1000,10,"hiqueue.txt",500) for i in xrange(2000): s.put(str(i)) #k=s.get() #print k,len(k),s.head print "put finish" for i in xrange(2000): k=s.get() print k,len(k),s.head
硬盤級緩衝BitMap實現blog
#blog.jerrylab.info #2016/5/6 #the class is unsafe in multiple thread environment #you need to ensure the method of set and get once at a time #number -- the bitmap supported max number from 0 import os import thread class HDBitSet: def __init__(self,number,filename,buffer=100003): self.filename=filename self.fo=open(filename,"wb") self.fi=open(filename,"rb") self.change=False self.size=number self.buffer=buffer self.number=int(number/256+1)*32 self.fo.truncate(self.number) self.mod=buffer self.hash=[-1]*buffer self.val=[0]*buffer self.number=number def read(self,block): if self.change: self.change=False self.fi.close() self.fo.flush() self.fi=open(self.filename,"rb") self.fi.seek(block) data=self.fi.readline(1) return ord(data[0]) def write(self,block,data): self.change=True self.fo.seek(block) self.fo.write(chr(data)) def get(self,index): index=int(index) if index>self.size: return True block=int(index/8) m=index-block*8 hs=block%self.buffer if self.hash[hs]==block: val=self.val[hs] else: val=self.read(block) self.hash[hs]=block self.val[hs]=val return bool(val&(1<<m)) def set(self,index): index=int(index) f=True if index>self.size: return block=int(index/8) m=index-block*8 hs=block%self.buffer if self.hash[hs]==block: val=self.val[hs] else: if self.hash[hs]>0: self.write(self.hash[hs],self.val[hs]) val=self.read(block) self.hash[hs]=block f=False val=val|(1<<m) self.val[hs]=val return if __name__ == '__main__': s=HDBitSet(1000000,"hibitset.txt",5) for i in xrange(15): print 1<<i for i in xrange(500): s.set(i) for i in xrange(51): s.set(i) print "set finish" for i in xrange(100): if s.get(i+400): print "True",i else: print "False",i