簡單實現併發:python concurrent模塊

可使用python 3中的concurrent模塊,若是python環境是2.7的話,須要下載https://pypi.python.org/packages/source/f/futures/futures-2.1.6.tar.gz#md5=cfab9ac3cd55d6c7ddd0546a9f22f453html

此futures包便可食用concurrent模塊。python

官方文檔:http://pythonhosted.org//futures/編程

 

對於python來講,做爲解釋型語言,Python的解釋器必須作到既安全又高效。咱們都知道多線程編程會遇到的問題,解釋器要留意的是避免在不一樣的線程操做內部共享的數據,同時它還要保證在管理用戶線程時保證老是有最大化的計算資源。而python是經過使用全局解釋器鎖來保護數據的安全性:安全

python代碼的執行由python虛擬機來控制,即Python先把代碼(.py文件)編譯成字節碼(字節碼在Python虛擬機程序裏對應的是PyCodeObject對象,.pyc文件是字節碼在磁盤上的表現形式),交給字節碼虛擬機,而後虛擬機一條一條執行字節碼指令,從而完成程序的執行。python在設計的時候在虛擬機中,同時只能有一個線程執行。一樣地,雖然python解釋器中能夠運行多個線程,但在任意時刻,只有一個線程在解釋器中運行。而對python虛擬機的訪問由全局解釋器鎖來控制,正是這個鎖能保證同一時刻只有一個線程在運行。在多線程的環境中,python虛擬機按一下方式執行:多線程

1,設置GIL(global interpreter lock).併發

2,切換到一個線程執行。app

3,運行:less

    a,指定數量的字節碼指令。異步

    b,線程主動讓出控制(能夠調用time.sleep(0))。函數

4,把線程設置爲睡眠狀態。

5,解鎖GIL.

6,再次重複以上步驟。

GIL的特性,也就致使了python不能充分利用多核cpu。而對面向I/O的(會調用內建操做系統C代碼的)程序來講,GIL會在這個I/O調用以前被釋放,以容許其餘線程在這個線程等待I/O的時候運行。若是線程併爲使用不少I/O操做,它會在本身的時間片一直佔用處理器和GIL。這也就是所說的:I/O密集型python程序比計算密集型的程序更能充分利用多線程的好處。

總之,不要使用python多線程,使用python多進程進行併發編程,就不會有GIL這種問題存在,而且也能充分利用多核cpu。

 

一,提供的功能

提供了多線程和多進程的併發功能

二,基本方法

class   concurrent.futures.Executor (注:Executor爲ThreadPoolExecutor或者ProcessPoolExecutor)

提供的方法以下:

    submit(fn, *args, **kwargs)

    fn:爲須要異步執行的函數

    args,kwargs:爲給函數傳遞的參數

    例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures

def wait_on_b():
   print 5
   time.sleep(2)

def wait_on_a():
   print 6
   time.sleep(2)


ex = futures.ThreadPoolExecutor(max_workers=2)
ex.submit(wait_on_b)
ex.submit(wait_on_a)

wait_on_a和wait_on_b函數會同時執行,由於使用了2個worker

#####################################

    map(func, *iterables, timeout=None)

    此map函數和python自帶的map函數功能相似,只不過concurrent模塊的map函數從迭代器得到參數後異步執行。而且,每個異步操做,能用timeout參數來設置超時時間,timeout的值能夠是int或float型,若是操做timeout的話,會raisesTimeoutError。若是timeout參數不指定的話,則不設置超時間。

    func:爲須要異步執行的函數

    iterables:能夠是一個能迭代的對象,例如列表等。每一次func執行,會從iterables中取參數。

    timeout:設置每次異步操做的超時時間

    例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures

data = [‘1‘,‘2‘]

def wait_on(argument):
   print argument
   time.sleep(2)
   return ‘ok‘

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
   print i

map函數異步執行完成以後,結果也是list,數據須要從list中取出

######################################

submit函數和map函數,根據須要,選一個使用便可。

    shutdown(wait=True)

    此函數用於釋放異步執行操做後的系統資源。

    If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True):

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, ‘src1.txt‘, ‘dest1.txt‘)

三,完整的concurrent例子:

#!/bin/env python
#coding:utf-8
import time,re,fcntl
import os,datetime
from concurrent import futures

count_list = list()
MinuteNum = 1
StartTime = datetime.datetime(2014, 4, 16, 19, 31, 0, 484870)
NowTime = datetime.datetime.now()
os.system(‘:>new.txt‘)

f_new = open(‘new.txt‘,‘a‘)

def test(CountTimeFormat):
   f = open(‘push_slave.stdout‘,‘r‘)
   for line in f.readlines():
       if re.search(CountTimeFormat,line):

           #得到文件專用鎖
           fcntl.flock(f_new, fcntl.LOCK_EX)
           f_new.writelines(line)
           f_new.flush()

           #釋放文件鎖
           fcntl.flock(f_new, fcntl.LOCK_UN)
           break

while 1:
   AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
   CountTime = AfterOneMinute+StartTime
   CountTimeFormat = CountTime.strftime(‘%Y-%m-%d %H:%M‘)
   MinuteNum = MinuteNum+1
   count_list.append(CountTimeFormat)
   if CountTimeFormat == "2014-04-23 16:00":
       break

def exec_cmd():
   with futures.ProcessPoolExecutor(max_workers=24) as executor:
       dict(( executor.submit(test, times), times) for times in count_list)

if __name__ == ‘__main__‘:
   exec_cmd()
   f_new.close()
相關文章
相關標籤/搜索