Event is one of the simplest mechanisms for inter-thread communication in Python, used mainly for thread synchronization.
An Event maintains an internal flag. While the flag is False, a thread that reaches event.wait() blocks; while it is True, event.wait() returns without blocking.
(Note: while a thread is blocked in wait(), setting the flag to True from anywhere makes it resume.)
set() sets the flag to True and wakes every thread blocked in wait()
clear() sets the flag to False
is_set() (isSet() is the older alias) returns the flag's current state, True or False
wait(timeout) blocks while the flag is False and returns once it becomes True (or the optional timeout expires); the flag starts out False
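A minimal sketch of these semantics, using only the standard library (the printed values follow directly from the rules above):

```python
import threading

event = threading.Event()
print(event.is_set())           # False: the internal flag starts out False
print(event.wait(timeout=0.1))  # blocks ~0.1s, then returns False (timed out)
event.set()
print(event.is_set())           # True
print(event.wait())             # flag is True, so this returns immediately with True
event.clear()
print(event.is_set())           # False again
```

Note that wait() returns the flag's state, so a timed wait() returning False is how a caller detects a timeout.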
Example: waiting for a notification
import threading
import time

event = threading.Event()

def chihuoguo(name):
    # Wait for the event; this thread enters the blocked state
    print('%s started' % threading.current_thread().getName())
    print('Buddy %s is seated and ready to eat!' % name)
    time.sleep(1)
    event.wait()
    # Resumes once the event is set
    print('%s received the notification.' % threading.current_thread().getName())
    print('Buddy %s starts eating!' % name)

threads = []
thread1 = threading.Thread(target=chihuoguo, args=("a", ))
thread2 = threading.Thread(target=chihuoguo, args=("b", ))
threads.append(thread1)
threads.append(thread2)
for thread in threads:
    thread.start()

time.sleep(0.1)
# Send the event notification
print('Main thread tells the buddies to start eating!')
event.set()
Example: notifying each other
import threading
import time

event = threading.Event()

def producer():
    print('Waiting for someone to come buy baozi....')
    event.wait()
    # event.clear()
    print(event.is_set())
    print('chef: sb is coming for baozi...')
    print('chef: making a baozi for sb...')
    time.sleep(5)
    print('chef: your baozi is ready...')
    event.set()

def consumer():
    print('chenchao: going to buy baozi....')
    event.set()
    time.sleep(2)
    print('chenchao: waiting for baozi to be ready...')
    print(event.wait())
    print('chenchao: mmm, delicious....')

p = threading.Thread(target=producer, args=())
c = threading.Thread(target=consumer, args=())
p.start()
c.start()
Output:
Waiting for someone to come buy baozi....
chenchao: going to buy baozi....
True
chef: sb is coming for baozi...
chef: making a baozi for sb...
chenchao: waiting for baozi to be ready...
True
chenchao: mmm, delicious....
chef: your baozi is ready...
The code above implements a producer-consumer pattern, but it clearly has a bug: the consumer eats the baozi before it is made.
Tracing through it carefully shows that the wait() in the consumer never blocks, because the flag is already True at that point (the consumer itself called set()).
Solutions:
1. Use a second Event (event2) to block the consumer.
2. Reset the flag to False as soon as the producer's wait() returns (just uncomment the event.clear() line in the producer).
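A sketch of the first fix: give each direction of notification its own Event, so the consumer waits on a flag that only the producer sets (the names customer_here and baozi_ready are chosen here for illustration):

```python
import threading
import time

customer_here = threading.Event()  # set by the consumer on arrival
baozi_ready = threading.Event()    # set by the producer when done

def producer():
    customer_here.wait()           # block until a customer shows up
    time.sleep(1)                  # make the baozi
    baozi_ready.set()              # notify: the baozi is done

def consumer():
    customer_here.set()            # announce arrival
    baozi_ready.wait()             # really blocks until the baozi is ready
    print('eating only after the baozi is ready')

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()
```

Because the consumer never sets baozi_ready itself, its wait() cannot fall through early the way the single-event version did.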
Note 1
import threading

event = threading.Event()
print(1)
print(event.wait())  # the wait() inside print() blocks the thread here too
print(2)
Note 2
import time
import threading

def myfunc():
    while 1:
        time.sleep(1)
        print(1)

event = threading.Event()
ts = []
for i in range(12):
    if len(ts) > 2:
        event.wait()  # the main thread blocks here; threads already started keep running
    t = threading.Thread(target=myfunc)
    t.start()
    ts.append(t)
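The point of Note 2 can also be seen in a small, terminating sketch (the timings here are illustrative): a wait() in the main thread only blocks the main thread, while child threads keep executing.

```python
import threading
import time

event = threading.Event()
results = []

def child():
    time.sleep(0.2)
    results.append('child ran')  # runs even while the main thread is blocked

threading.Thread(target=child).start()
event.wait(timeout=0.5)  # main thread blocks here; the child keeps running
print(results)           # the child appended while we were waiting
```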
Validating proxy IPs with multiple threads
Problem: a machine cannot spawn threads without limit; every machine has its own cap.
# Probe the maximum number of threads this machine can start
import threading
import time

def myfunc():
    time.sleep(2)

count = 0
while 1:
    count += 1
    print(count)
    t = threading.Thread(target=myfunc)
    t.start()
Past the limit, thread creation fails with an error (RuntimeError in Python 3, thread.error in Python 2):

RuntimeError: can't start new thread
The idea: set a maximum thread count. When the number of live threads exceeds the limit, block and stop spawning new ones; keep tracking the count, and once it falls below a lower threshold, unblock and resume spawning.
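As an aside, the same throttling can be expressed more directly with threading.Semaphore (a sketch; MAX_THREADS and the worker body are illustrative, not the article's code): acquire a slot before starting each thread, and have the worker release it when it finishes, so the spawning loop blocks automatically at the cap.

```python
import threading
import time

MAX_THREADS = 5                         # illustrative cap
sem = threading.Semaphore(MAX_THREADS)  # counts free worker "slots"

def worker(n):
    try:
        time.sleep(0.1)  # stand-in for real work, e.g. probing one proxy
    finally:
        sem.release()    # give the slot back, even if the work raised

threads = []
for i in range(20):
    sem.acquire()        # blocks while MAX_THREADS workers hold slots
    t = threading.Thread(target=worker, args=(i, ))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
```

Since every thread acquires before starting and releases only on exit, at most MAX_THREADS workers can be running at any moment, with no manual counter or lock.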
import re
import threading
import time
import urllib.request

class MyTestProxy(object):

    def __init__(self):
        self.sFile = 'ip.txt'       # source file: one proxy per line
        self.dFile = 'alive.txt'    # destination file for working proxies
        self.url = 'https://www.qiushibaike.com/text/'
        self.threadmax = 500        # maximum thread count
        self.threadmin = 400        # lower threshold
        self.timeout = 3
        self.regex = re.compile('qiushibaike.com')
        self.aliveList = []
        self.event = threading.Event()   # throttles thread creation
        self.event2 = threading.Event()  # signals that all work is done
        self.lock = threading.Lock()
        self.run()

    def run(self):
        with open(self.sFile, 'r') as fp:
            lines = fp.readlines()
        self.ts = 0  # current thread count
        while lines:
            if self.ts > self.threadmax:
                self.event.clear()
                self.event.wait()  # block once past the configured limit
            line = lines.pop()
            t = threading.Thread(target=self.linkWithProxy, args=(line, ))
            t.start()
            self.lock.acquire()
            self.ts += 1  # ts is updated by other threads too, so lock it
            self.lock.release()
        self.event2.wait()  # store the results once everything is done
        with open(self.dFile, 'w') as fp:
            for item in self.aliveList:
                fp.write(item)

    def act(self):
        # decrement when a worker finishes; runs concurrently, so lock it
        self.lock.acquire()
        self.ts -= 1
        self.lock.release()
        print(self.ts)
        if self.ts < self.threadmin:
            self.event.set()   # below the lower threshold: unblock the spawner
        if self.ts == 0:
            self.event2.set()

    def linkWithProxy(self, line):  # the crawler
        server = line.strip()
        protocol = line.split(':')[0]
        opener = urllib.request.build_opener(
            urllib.request.ProxyHandler({protocol: server}))
        urllib.request.install_opener(opener)
        try:
            response = urllib.request.urlopen(self.url, timeout=self.timeout)
        except Exception:
            return
        else:
            try:
                body = response.read().decode('utf-8', 'ignore')
                if self.regex.search(body):
                    print('%s connect success' % server)
                    print(response.geturl())
                    self.aliveList.append(line)
            except Exception:
                return
        finally:
            self.act()

if __name__ == '__main__':
    start = time.perf_counter()
    tp = MyTestProxy()
    print(time.perf_counter() - start)
It runs reasonably fast.
References:
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html