Efficient Programming: Multithreading with Event

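Introduction to Event

An Event is one of the simplest ways for threads to communicate with each other. It is mainly used for thread synchronization.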

 

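How it works

Each Event object manages an internal flag. If the flag is False, a thread that reaches event.wait() blocks there. If the flag is True, event.wait() returns immediately and the thread keeps running.

[Note: while a thread is blocked in wait(), setting the flag to True from anywhere (that is, from any other thread) makes that wait() return, and the blocked thread continues.]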
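To make the mechanism concrete, here is a minimal sketch assuming Python 3; the names `worker` and `log` are illustrative and not from the original. The worker blocks in `wait()` until the main thread calls `set()`.

```python
import threading
import time

event = threading.Event()   # the internal flag starts out False
log = []

def worker():
    log.append('waiting')
    event.wait()            # blocks here while the flag is False
    log.append('woke up')   # reached only after another thread calls set()

t = threading.Thread(target=worker)
t.start()
time.sleep(0.2)             # give the worker time to reach wait()
log.append('setting flag')
event.set()                 # flag -> True: the blocked worker resumes
t.join()
print(log)
```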

 

Interface

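set()                Sets the flag to True and wakes up every thread blocked in wait().

clear()              Sets the flag back to False.

isSet()              Returns the state of the flag, True or False. In Python 3 the method is spelled is_set(); isSet() is a deprecated alias.

wait(timeout=None)   If the flag is False, blocks the calling thread; if it is True, returns immediately. The flag starts out False. timeout is in seconds (None, the default, waits indefinitely); wait() returns True once the flag is set, or False if it timed out.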
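A quick sketch of the four methods and their return values, assuming Python 3, where `is_set()` is the current spelling of `isSet()`:

```python
import threading

event = threading.Event()
print(event.is_set())            # False: the flag starts out False
print(event.wait(timeout=0.1))   # False: gave up after 0.1 s, flag never set

event.set()
print(event.is_set())            # True
print(event.wait())              # True: returns at once, the flag is already True

event.clear()
print(event.is_set())            # False again
```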

 

Example code

Example: waiting for a notification

import threading
import time

event = threading.Event()


def chihuoguo(name):
  # "chihuoguo" = eating hotpot: wait for the event, blocking until notified
  print('%s has started' % threading.current_thread().name)
  print('Friend %s is seated and ready to eat!' % name)
  time.sleep(1)
  event.wait()
  # Notification received: the thread resumes running
  print('%s received the notification.' % threading.current_thread().name)
  print('Friend %s starts eating!' % name)


threads = []
thread1 = threading.Thread(target=chihuoguo, args=("a", ))
thread2 = threading.Thread(target=chihuoguo, args=("b", ))
threads.append(thread1)
threads.append(thread2)

for thread in threads:
  thread.start()

# Sleep longer than the threads' 1 s so both are actually blocked in wait()
time.sleep(2)
# Send the notification
print('Main thread tells the friends to start eating!')
event.set()

 

Example: notifying each other

import threading
import time

def producer():
    print('Waiting for someone to buy baozi....')
    event.wait()
    #event.clear()
    print(event.is_set())
    print('chef:sb is coming for baozi...')
    print('chef:making a baozi for sb...')
    time.sleep(5)

    print('chef:your baozi is ready...')
    event.set()

def consumer():
    print('chenchao:going to buy baozi....')
    event.set()

    time.sleep(2)
    print('chenchao:waiting for baozi to be ready...')
    print(event.wait())
    print('chenchao:wow, this is delicious....')

event = threading.Event()

p = threading.Thread(target=producer, args=())
c = threading.Thread(target=consumer, args=())
p.start()
c.start()

Output

Waiting for someone to buy baozi....
chenchao:going to buy baozi....
True
chef:sb is coming for baozi...
chef:making a baozi for sb...
chenchao:waiting for baozi to be ready...
True
chenchao:wow, this is delicious....
chef:your baozi is ready...

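The code above tries to implement the producer-consumer pattern, but it is clearly wrong: the baozi gets eaten before it has been made.

Tracing the logic carefully shows why. The consumer's second wait() does not block, because the flag is still True at that point: the consumer set it to place the order, and nothing cleared it afterwards.

Fixes:

1. Use a second Event, event2, for the "baozi is ready" signal, and have the consumer block on that instead.

2. In the producer, reset the flag to False as soon as its wait() returns (just uncomment event.clear() in the producer). This relies on timing: it works here because the consumer sleeps 2 seconds before its second wait(), by which time the producer has already cleared the flag.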
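Fix 1 can be sketched as follows, assuming Python 3, a cooking time shortened to 0.5 s, and an illustrative `log` list in place of printing. `event` carries the order from consumer to producer and `event2` carries "ready" back, so each event is set only once and the consumer's second wait really blocks.

```python
import threading
import time

event = threading.Event()    # consumer -> producer: "I want a baozi"
event2 = threading.Event()   # producer -> consumer: "your baozi is ready"
log = []

def producer():
    log.append('chef:waiting for a customer')
    event.wait()                 # block until an order arrives
    log.append('chef:making a baozi')
    time.sleep(0.5)              # simulated cooking time
    log.append('chef:your baozi is ready')
    event2.set()                 # notify the consumer on the second event

def consumer():
    log.append('chenchao:going to buy baozi')
    event.set()                  # place the order
    log.append('chenchao:waiting for the baozi')
    event2.wait()                # blocks: event2 stays False until the chef sets it
    log.append('chenchao:delicious')

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
print('\n'.join(log))
```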

 

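Note 1

import threading
event = threading.Event()
print(1)
# Wrapping wait() in print() still blocks: wait() runs first, and since nothing
# ever sets the flag, this line blocks forever and 2 is never printed
print(event.wait())
print(2)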
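If the main thread must not hang, here is a sketch assuming Python 3 that passes a `timeout`: `wait()` returns `False` when it gives up, so the script reaches `print(2)`.

```python
import threading

event = threading.Event()
print(1)
# Give up after 0.5 s; wait() returns False because the flag was never set
result = event.wait(timeout=0.5)
print(result)   # False
print(2)
```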

 

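Note 2

import time
import threading

def myfunc():
    # Runs forever, printing 1 every second
    while 1:
        time.sleep(1)
        print(1)

event = threading.Event()

ts = []
for i in range(12):
    if len(ts) > 2:
        # Only the calling (main) thread blocks here, forever, since nobody calls
        # set(); the 3 threads already started keep running and printing
        event.wait()
    t = threading.Thread(target=myfunc)
    t.start()
    ts.append(t)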
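A bounded variant of Note 2 that ends on its own, assuming Python 3; the `stop` event and the `ticks` list are hypothetical additions for the demo. While the main thread sits in `event.wait(timeout=0.5)`, the worker keeps appending, which shows that `wait()` only blocks the thread that calls it.

```python
import threading

ticks = []
stop = threading.Event()      # hypothetical: lets the demo shut the worker down

def myfunc():
    # Keep working until asked to stop
    while not stop.is_set():
        ticks.append(1)
        stop.wait(0.05)       # pause ~50 ms, but wake at once if stop is set

event = threading.Event()
t = threading.Thread(target=myfunc)
t.start()

before = len(ticks)
event.wait(timeout=0.5)       # only the main thread blocks here
after = len(ticks)
print(after - before)         # the worker kept running in the meantime

stop.set()
t.join()
```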

 

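Practical case

Using multiple threads to check which proxy IPs are alive

Problem: a computer cannot keep adding threads forever; every machine has its own upper limit.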

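### Find the maximum number of threads this machine can run
### (warning: this deliberately runs the machine into its limit)
import threading
import time

def myfunc():
    time.sleep(2)

count = 0
while 1:
    count += 1
    print(count)
    t = threading.Thread(target=myfunc)
    t.start()

Once the limit is exceeded, starting a thread raises an error (this is the Python 2 message; Python 3 reports RuntimeError: can't start new thread):

thread.error: can't start new thread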

 

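Approach: set a maximum thread count. When the number of started threads exceeds it, block and stop creating new threads. Keep tracking the count, and as soon as it drops below a lower threshold, unblock and resume creating threads.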
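A minimal, network-free sketch of this throttle, assuming Python 3; `THREAD_MAX`, `THREAD_MIN`, `task` and the random sleep are illustrative stand-ins for the real proxy check. The event is cleared while holding the lock, so a worker's `set()` cannot slip in between the check and the `clear()` and get lost.

```python
import random
import threading
import time

THREAD_MAX = 5    # illustrative; far smaller than the 500/400 used below
THREAD_MIN = 3

lock = threading.Lock()
can_start = threading.Event()
running = 0       # threads currently alive
peak = 0          # highest value 'running' ever reached
done = []

def task(n):
    global running
    time.sleep(random.uniform(0.01, 0.05))   # stand-in for real work
    with lock:
        done.append(n)
        running -= 1
        if running < THREAD_MIN:
            can_start.set()      # fell below the low mark: let the main thread resume

threads = []
for n in range(30):
    with lock:
        must_wait = running >= THREAD_MAX
        if must_wait:
            can_start.clear()    # cleared under the lock so no set() is lost
    if must_wait:
        can_start.wait()         # stop creating threads until enough have finished
    with lock:
        running += 1             # count the thread before it starts
        peak = max(peak, running)
    t = threading.Thread(target=task, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
print(len(done), peak)
```

Only the main thread increments `running`, and it does so only when fewer than `THREAD_MAX` threads are alive, so `peak` never exceeds `THREAD_MAX`.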

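import re
import threading
import time
import urllib.request


class MyTestProxy(object):
    def __init__(self):
        self.sFile = 'ip.txt'       # input: one proxy per line, e.g. http://1.2.3.4:8080
        self.dFile = 'alive.txt'    # output: the proxies that worked
        self.url = 'https://www.qiushibaike.com/text/'
        self.threadmax = 500    # maximum number of threads
        self.threadmin = 400    # resume creating threads once the count drops below this
        self.timeout = 3
        self.regex = re.compile(r'qiushibaike\.com')
        self.aliveList = []

        self.event = threading.Event()    # throttles thread creation
        self.event2 = threading.Event()   # signals that every thread has finished
        self.lock = threading.Lock()
        self.ts = 0                 # number of running threads
        self.dispatched = False     # True once every line has been handed to a thread

        self.run()

    def run(self):
        with open(self.sFile, 'r') as fp:
            lines = fp.readlines()

        while lines:
            with self.lock:
                must_wait = self.ts > self.threadmax
                if must_wait:
                    # Clear under the lock so a worker's set() cannot be lost
                    self.event.clear()
            if must_wait:
                self.event.wait()           # too many threads: block

            line = lines.pop()
            # ts is updated by other threads all the time, so lock it; count the
            # thread before starting it so act() never decrements first
            with self.lock:
                self.ts += 1
            t = threading.Thread(target=self.linkWithProxy, args=(line, ))
            t.start()

        with self.lock:
            self.dispatched = True
            if self.ts == 0:                # every thread may already be done
                self.event2.set()

        self.event2.wait()  # wait until all threads finish, then save everything at once
        with open(self.dFile, 'w') as fp:
            for server in self.aliveList:
                fp.write(server + '\n')

    def act(self):
        # Called once per finished thread; threads run concurrently, so lock
        with self.lock:
            self.ts -= 1
            print(self.ts)
            if self.ts < self.threadmin:
                self.event.set()        # below the lower bound: unblock run()
            if self.ts == 0 and self.dispatched:
                self.event2.set()       # the last thread is done

    def linkWithProxy(self, line):
        # Crawler: fetch the target page through one proxy
        server = line.strip()
        # The target URL is https, so register the proxy for both schemes;
        # ProxyHandler only applies a proxy to requests whose scheme matches a key
        opener = urllib.request.build_opener(
            urllib.request.ProxyHandler({'http': server, 'https': server}))

        try:
            # Use this thread's opener directly: install_opener() would replace
            # a global opener shared by all threads
            with opener.open(self.url, timeout=self.timeout) as response:
                html = response.read().decode('utf-8', errors='ignore')
                if self.regex.search(html):
                    print('%s connect success' % server)
                    print(response.url)
                    with self.lock:
                        self.aliveList.append(server)
        except Exception:
            pass    # dead or misbehaving proxy: skip it
        finally:
            self.act()


if __name__ == '__main__':
    start = time.perf_counter()    # time.clock() was removed in Python 3.8
    tp = MyTestProxy()
    print(time.perf_counter() - start)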
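For comparison, the standard library's `concurrent.futures.ThreadPoolExecutor` (a different technique from the Event-based throttle in `MyTestProxy`) caps the thread count and queues the remaining work for you. This is a sketch assuming Python 3; `check_proxy` is a hypothetical stub that fakes the liveness test so the example runs offline, and the proxy addresses are made up.

```python
from concurrent.futures import ThreadPoolExecutor

def check_proxy(line):
    # Hypothetical stand-in for linkWithProxy(): return the proxy if it "works".
    # Here a proxy on an even port counts as alive, purely for illustration.
    server = line.strip()
    port = int(server.rsplit(':', 1)[1])
    return server if port % 2 == 0 else None

lines = ['http://10.0.0.1:8080\n', 'http://10.0.0.2:8081\n', 'http://10.0.0.3:3128\n']

# max_workers bounds the number of threads; extra tasks wait in the pool's queue
with ThreadPoolExecutor(max_workers=500) as pool:
    results = list(pool.map(check_proxy, lines))   # results keep input order

alive = [r for r in results if r is not None]
print(alive)  # ['http://10.0.0.1:8080', 'http://10.0.0.3:3128']
```

Leaving the `with` block waits for every task, which also replaces the role `event2` plays in `MyTestProxy`.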

 

The efficiency is quite good.

 

 
