線程:python
線程能夠理解成輕量的進程,實際在linux二者幾乎沒有區別,惟一的區別是線程並不產生新的地址空間和資源。linux
Threading用於提供線程的相關操做,線程是應用程序中工做的最小單元。程序員
import threading import time def show(arg): time.sleep(1) print('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start()
上述代碼建立了10個前臺線程,而後控制器就交給了cpu,CPU根據內部算法進行調度,算法
更多的方法:shell
若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止編程
若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止設計模式
線程鎖:緩存
因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,CPU接着執行其餘線程。因此,可能出現以下問題:安全
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time gl_num = 0 def show(arg): global gl_num time.sleep(1) gl_num +=1 print gl_num for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
#!/usr/bin/env python #coding:utf-8 import threading import time gl_num = 0 #生成鎖 lock = threading.RLock() def Func(): lock.acquire() #給線程上鎖 global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() #釋放線程鎖 for i in range(10): #建立10個線程 t = threading.Thread(target=Func) t.start()
event網絡
python線程的事件用於主線程控制其餘線程的執行,線程主要提供了三個方法:set、wait、clear
事件處理的機制:全局定義了一個"flag",值爲False,,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def do(event): print 'start' event.wait() print 'execute' event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
使用線程隊列
當多個線程須要共享數據的或者資源的時候,可能會使線程的使用變得複雜,線程的模塊提供了許多同步原語,包括信號量、條件變量、事件和鎖。當這些選項存在時,最佳實踐是轉而關注於使用隊列,相比較而言,隊列更容易處理,而且更容易處理,而且可使線程編程更加安全,由於他們可以有效地傳遞單個線程資源的全部訪問,並支持更加清晰的、可讀性更高的設計模式。
url獲取序列:
import urllib2 import time hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] start = time.time() #grabs urls of hosts and prints first 1024 bytes of page for host in hosts: url = urllib2.urlopen(host) print url.read(1024) print("Elapsed Time: %s" % (time.time() - start))
簡單版本的線程池:
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread(self): return self.queue.get() def add_thread(self): self.queue.put(threading.Thread) pool = ThreadPool(10) def func(arg, p): print arg import time time.sleep(2) p.add_thread() for i in xrange(30): thread = pool.get_thread() t = thread(target=func, args=(i, pool)) t.start()
import queue import threading import contextlib StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue(max_num) self.max_num = max_num self.cancel = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if self.cancel: return True if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception, e: success = False result = None if callback is not None: try: callback(success, result) except Exception, e: pass with self.worker_state(self.free_list, current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) def terminal(self): """ 終止線程池中的全部線程 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread)
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止 """ full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): time.sleep(1) print(i) for i in range(30): ret = pool.run(action, (i,), callback) # pool.close() # pool.terminate()
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(30): ret = pool.run(action, (i,), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) # pool.close() # pool.terminate()
queue模塊:
queue就是對隊列,它是線程安全的
舉例來講,咱們去肯德基吃飯。廚房是給咱們作飯的地方,前臺負責把廚房作好的飯賣給顧客,顧客則去前臺領取作好的飯。這裏的前臺就至關於咱們的隊列。
這個模型也叫生產者-消費者模型
import queue q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。 q.join() # 等到隊列爲kong的時候,在執行別的操做 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列爲空的時候,返回True 不然返回False (不可靠) q.full() # 當隊列滿的時候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置, 爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後, 若是隊列沒法給出放入item的位置,則引起 queue.Full 異常q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞, 若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。q.put_nowait(item) # 等效於 put(item,block=False)q.get_nowait() # 等效於 get(item,block=False)
生產者--消費者:
#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer, args=(i,)) t.start()
multiprocessing模塊
multiprocessing是python的多進程管理包,和threading.Thread相似。直接從側面用subprocesses替換線程使用GIL的方式,因爲這一點,multiprocessing模塊可讓程序員在給定的機器上充分的利用CPU。
在multiprocessing中,經過建立Process對象生成進程,而後調用它的start()方法,
from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
進程:
進程間的數據共享
在使用併發設計的時候最好儘量的避免共享數據,尤爲是在使用多進程的時候。 若是你真有須要 要共享數據, multiprocessing提供了兩種方式。
數據能夠用Value或Array存儲在一個共享內存地圖裏,以下:
from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
輸出:
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
建立num和arr時,「d」和「i」參數由Array模塊使用的typecodes建立:「d」表示一個雙精度的浮點數,「i」表示一個有符號的整數,這些共享對象將被線程安全的處理。Array(‘i’, range(10))中的‘i’參數:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
輸出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process manager比 shared memory 更靈活,由於它能夠支持任意的對象類型。另外,一個單獨的manager能夠經過進程在網絡上不一樣的計算機之間共享,不過他比shared memory要慢。
進程池:
Pool類描述了一個工做進程池,他有幾種不一樣的方法讓任務卸載工做進程。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止咱們能夠用Pool類建立一個進程池, 展開提交的任務給進程池。 例:
from multiprocessing import Pool import time def myFun(i): time.sleep(2) return i+100 def end_call(arg): print("end_call",arg) p = Pool(5) # print(p.map(myFun,range(10))) for i in range(10): p.apply_async(func=myFun,args=(i,),callback=end_call) print("end")p.close() p.join()
from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': # 建立4個進程 with Pool(processes=4) as pool: # 打印 "[0, 1, 4,..., 81]" print(pool.map(f, range(10))) # 使用任意順序輸出相同的數字, for i in pool.imap_unordered(f, range(10)): print(i) # 異步執行"f(20)" res = pool.apply_async(f, (20,)) # 只運行一個進程 print(res.get(timeout=1)) # 輸出 "400" # 異步執行 "os.getpid()" res = pool.apply_async(os.getpid, ()) # 只運行一個進程 print(res.get(timeout=1)) # 輸出進程的 PID # 運行多個異步執行可能會使用多個進程 multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) # 是一個進程睡10秒 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("發現一個 multiprocessing.TimeoutError異常") print("目前,池中還有其餘的工做") # 退出with塊中已經中止的池 print("Now the pool is closed and no longer available")
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。
close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。
terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。
join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程。
map(func, iterable[, chunksize])¶
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
imap(func, iterable[, chunksize])¶
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])¶
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
協程
協程又叫微線程,從技術的角度來講,「協程就是你能夠暫停執行的函數」。若是你把它理解成「就像生成器同樣」,那麼你就想對了。 線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
Event Loop是一種等待程序分配時間或消息的編程架構。簡單的說就是 當事件A發生的時候,咱們就去執行事件B。 最簡單的例子就是:當咱們瀏覽網頁的時候,咱們點擊頁面的某個元素,這個點擊事件會被 JavaScript 捕捉到,而後 JavaScript 就會檢查這個事件是否綁定了onclick()回調函數來處理這個事件,只要綁定了,onclick()回調函數就會被執行。
event loop是協程執行的控制點, 若是你但願執行協程, 就須要用到它們。
event loop提供了以下的特性:
協程示例:
import asyncio async def cor1(): print("COR1 start") await cor2() print("COR1 end") async def cor2(): print("COR2") loop = asyncio.get_event_loop() loop.run_until_complete(cor1()) loop.close()
最後三行是重點:
subprocess模塊
經過使用subprocess模塊能夠建立新的進程,鏈接到他們的輸入/輸出/錯誤管道,並獲取他們的返回值。 該模塊計劃替代及一箇舊的模塊的方法:
os.system os.spawn*
在全部用例調用subprocess時推薦使用run()方法,更高級的用例,能夠直接使用subprocess.Popen接口。
在Python3.5增長的。
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False)
run()默認不會捕捉到標準輸出和標準錯誤輸出,要捕捉的話,能夠爲標準輸出和標準錯誤輸出指定subprocess.PIPE(一個特殊值,可被用於Popen的stdin, stdout或 stderr參數,表示一個標準流的管道應該被打開, Popen.communicate()用的最多)。
subprocess.run(["ls","-l"]) ;
當shell=True時,args能夠是一個字符串。subprocess.run("ls -l",shell=True)
。>>> ret = subprocess.run(["ls", "-l"]) # doesn't capture output CompletedProcess(args=['ls', '-l'], returncode=0) >>> print(ret.stdout) None >>> subprocess.run("exit 1", shell=True, check=True) Traceback (most recent call last): ... subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1 >>> ret1 = subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE) CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0, stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n') >>> print(ret.stdout) b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88 8 06:50 /dev/null\n'
call()方法等價於:run(..., check=True)
和run()方法類因此,只是不支持input參數和check參數;
注意: 不要在這個方法裏使用stdout=PIPE 或 stderr=PIPE,當到一個管道的輸出填滿系統的管道緩存時,子進程會被阻塞。
subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)
check_call()方法等價於: run(..., check=True, stdout=PIPE).stdout
3.1新增,3.3時增長了timeout參數,3.4時增長了對關鍵字參數的支持
內部調用的是run()方法,可是會捕捉到stdout
>>> ret = subprocess.check_output(["ls", "-l", "/dev/null"]) >>> print(ret) b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88 8 06:50 /dev/null\n'
上面的四個方法本質上調用的都是subprocess中的Popen類。
Popen對象都有如下方法:
poll() : 檢查子進程是否已經終止,返回一個returncode,至關於exit code。
wait() : 等待子進程終止,返回一個returncode
communicate(input=None) :和進程進行交互:發送數據到stdin;從stdout和stderr讀取數據,直到讀取完。等待進程終止。可選參數input會傳遞數據給子進程,當input=None事,沒有數據傳遞給子進程。 communicate() 返回一個元組 (stdout, stderr). 注意: 讀取的數據在內存的buffer中,因此當數據大小大於buffer或者沒有限制時,不要使用這個方法。