一個應用程序,能夠多進程、也能夠多線程.
一個python腳本,默認是單進程,單線程的。
I/O操做(音頻、視頻、顯卡操做),不佔用CPU,因此:java
python中有個全局解釋器鎖,叫GIL(全稱Global Interpreter Lock
),致使一個進程只能由一個線程讓CPU去調度,但在java c#可使用多個線程。
多線程,多進程的目的,是爲了提升併發,I/O密集型用多線程,計算密集型,用多進程。python
咱們來看看怎麼建立多線程:git
def f1(args): print(args) import threading t=threading.Thread(target=f1,args=(123,)) #建立一個線程,target表示線程執行的目標,args表示參數 t.start() #並不表明當前當即被執行,系統來決定 f1(111)
以上代碼結果print順序會隨機!程序員
更多的方法:github
import time def f1(args): time.sleep(5) print(args) import threading t1=threading.Thread(target=f1,args=(123,)) t1.setDaemon(True) #表示主線程不等待子線程 t.start() #並不表明當前被當即被執行,系統來決定 f1(111) t.join(2) #表示主程序執行到此,等待...直到子線程執行完畢 print(222222) print(333333)
下面看下run方法:c#
class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定義每一個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
線程有兩種調用方式:數組
簡單調用方式:自定義一個繼承threading.thread的子類,經過自定義類的對象調用網絡
import threading def f1(arg): print(arg) t = threading.Thread(target=f1,args=(123,)) t.run()
自定義類的調用方式多線程
import threading def f2(arg): print(arg) class MyThread(threading.Thread): def __init__(self,func,args): self.func=func self.args=args super(MyThread,self).__init__() def run(self): self.func(self.args) obj=MyThread(f2,123) obj.run()
因爲線程之間進行隨機調度,而且每一個線程可能只執行n條操做後,當多個線程同時修改同一條數據時,可能會出現髒數據:同一時刻只能容許指定的線程數執行操做.
python中的線程鎖有Lock
, RLock
兩種,其中RLock用的較多,由於支持多層嵌套的方式,Lock用的較少,不支持多層嵌套鎖.併發
def func(l): global NUM #上鎖 l.acquire() NUM-=1 time.sleep(2) print(NUM) #開鎖 l.release() lock=threading.RLock() #放行幾個線程出去執行 for i in range(30): t=threading.Thread(target=func,args=(lock,)) t.start()
若是不使用線程鎖,上面程序會有30個線程同時執行,結果爲30個-20
semaphore,同時容許指定數量的線程更改數據,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
import threading,time NUM=10 def func(l): global NUM #上鎖 l.acquire() NUM-=1 time.sleep(2) print(NUM) #開鎖 l.release() lock=threading.BoundedSemaphore(3) #放行幾個線程出去執行 for i in range(30): t=threading.Thread(target=func,args=(lock,)) t.start()
從上面兩個代碼對比,咱們會發現,semaphore若是設置爲1時,也可實現信號鎖的功能.
python線程中的event主要用於讓主線程控制其子線程的執行方式(有點相似交警控制紅綠燈),event主要提供三個方法:set wait clear
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
import threading def func(i,e): print(i) e.wait() #檢測是什麼燈 print(i+100) event=threading.Event() for i in range(10): t = threading.Thread(target=func,args=(i,event)) t.start() event.clear() #默認是紅燈 inp = input('>>>') if inp == '1': event.set() #設置成綠燈
使線程等待,當條件成立時,釋放線程執行.
import threading def func(i,con): print(i) con.acquire() #配合,固定格式,線程hold住 con.wait() print(i+100) con.release() c=threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c,)) t.start() while True: inp = input('>>>|') if inp == 'q': break c.acquire() #如下都是固定格式 c.notify(int(inp)) c.release()
例子1中,寫到函數func中的:c.acquire(),c.notify(args),c.release()
是固定格式.
例子2:wait_for
import threading def condition(): ret=False r = input('>>|') if r == 'true': ret= True else: ret=False return ret def func(i,con): print(i) con.acquire() #配合,固定格式,線程hold住 con.wait_for(condition) print(i+100) con.release() c=threading.Condition() for i in range(10): t = threading.Thread(target=func,args=(i,c,)) t.start()
其中例子2中,con.acquire(),con.wait_for(condition)
是固定格式配合使用,攔截線程,con.release()釋放線程.
定時器,延遲多長時間(單位:秒)執行
import threading def hello(): print('hello,world!!') t=threading.Timer(1,hello) t.start()
python的線程池有兩種實現方式,咱們先來看一個比較簡單的實現方式.
實現思路:
線程類
線程類
import threading,time,queue class ThreadPool: def __init__(self,maxsize): self.maxsize=maxsize self._q=queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) pool=ThreadPool(5) def task(arg,p): print(arg) time.sleep(1) p.add_thread() for i in range(100): t = pool.get_thread() #線程池中沒有線程爲阻塞狀態 obj=t(target=task,args=(i,pool)) obj.start()
此方式的缺點:沒有將線程重複利用,要直到建立一個線程的耗時多是一個線程執行的好幾倍,因此有了第二種方式.
第二種方式是也是使用隊列,但隊列中的元素爲爲一個個(函數名,函數參數,)
的元組,建立一個線程組成的列表,線程輪流去隊列中取到元組,分解後執行函數,而後取下一個函數.
import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result): # status, execute action status # result, execute action return value pass def action(i): print(i) for i in range(30): ret = pool.run(action, (i,), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) pool.close() pool.terminate()
進程與線程的使用方式基本雷同.好比start,daemon(用法略不一樣,意義相同),join,各類鎖等等.
默認進程之間是沒法進行共享的,看例子:
from multiprocessing import Process li = [] def foo(i): li.append(i) print('say hi',li) for i in range(10): p = Process(target=foo,args=(i,)) p.start() print('ending',li)
out:
say hi [0] say hi [1] say hi [2] say hi [3] say hi [4] say hi [5] say hi [6] say hi [7] ending [] say hi [8] say hi [9]
那麼如何讓進程之間可以共享呢?
基本可分爲三種方式:
queues方式:
from multiprocessing import queues from multiprocessing import Process import multiprocessing def foo(i,arg): arg.put(i) print('say hi',i,arg.qsize()) if __name__ == '__main__': li=queues.Queue(20,ctx=multiprocessing) for i in range(10): p=Process(target=foo,args=(i,li,)) p.start()
Array方式,數組有個特性,必須初始化的時候指定數組的長度和元素類型:
from multiprocessing import Process from multiprocessing import Array def foo(i,arg): arg[i]=i+100 for item in arg: print(item) print('======') if __name__ == '__main__': li=Array('i',10) for i in range(10): p=Process(target=foo,args=(i,li,)) p.start()
Array的類型對應表:
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
manager.dict 可實現數據共享
進程和進程之間若是想通信,須要鏈接p.join()
from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i]=i+100 print(arg.values()) if __name__ == '__main__': obj=Manager() li=obj.dict() for i in range(10): p=Process(target=foo,args=(i,li,)) p.start() p.join()
當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值。
進程鎖例子:
from multiprocessing import Process, Array, RLock def Foo(lock,temp,i): """ 將第0個數加100 """ lock.acquire() temp[0] = 100+i for item in temp: print(i,'----->',item) lock.release() lock = RLock() temp = Array('i', [11, 22, 33, 44]) for i in range(20): p = Process(target=Foo,args=(lock,temp,i,)) p.start()
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。
進程池有兩種方式:
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == '__main__': pool=Pool(5) for i in range(30): pool.apply(func=f1,args=(i,))
異步操做:
from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == '__main__': pool=Pool(5) for i in range(30): pool.apply_async(func=f1,args=(i,)) pool.close() #全部任務執行完畢 #1 time.sleep(1) #pool.terminate() #當即終止,不論是否有任務正在執行或者待執行 #2 pool.join()
其中 #1 #2 二選一操做
pool.terminate 當即終止,不論是否有任務正在執行或者等待執行
pool.close 全部任務執行完畢後關閉
原理:利用一個線程,分解一個線程成爲多個微線程==>程序級別作的,與操做系統沒有關係.
與線程進程的區別:線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。
協程的適用場景:涉及到http的I/O請求,協程是高性能的代名詞.因此,網絡爬蟲不少是使用協程方式.
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的執行方式:打個比方,1個很牛逼的足球隊員,前面一排並列的足球,從第一個足球踢出去,而後提出第二個第三個,等足球彈回起始位置時,足球員對此足球接住後再次剃出或者停住球,這個足球員就是協程
使用前須要安裝gevent第三方模塊
pip3 install gevent
看下代碼吧,自動切換,關鍵詞gevent.spawn()
:
from gevent import monkey;monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])