Python多線程

1、python併發編程之多線程

1.threading模塊

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹php

1.1 開啓線程的兩種方式(同Process)html

方式一

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')

方式二

from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')

 

1.2 在一個進程下開啓多個線程與在一個進程下開啓多個子進程的區別python

  1. 誰的開啓速度更快?
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 ''' #在主進程下開啓子進程 t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''
  1. 瞅一瞅pid?
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())

1.3 練習mysql

練習一:git

多線程併發的socket服務端

import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()

客戶端

import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)

練習二:三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件github

from threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open('db.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start()

1.4 線程的join與setdaemonweb

與進程的方法都是相似的,實際上是multiprocessing模仿threading的接口redis

join與setdaemonsql

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) t.start() t.join() print('主線程') print(t.is_alive())

1.5 線程相關的其餘方法補充編程

Thread實例對象的方法

  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。

threading模塊提供的一些方法:

  • threading.currentThread(): 返回當前的線程變量。
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程 print(threading.enumerate()) #連同主線程在內有兩個運行的線程 print(threading.active_count()) print('主線程/主進程') ''' 打印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 2 主線程/主進程 Thread-1 '''

2.2 Python GIL(Global Interpreter Lock)

'''

定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)

'''
結論:在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

關於GIL與Lock的比較請看2.3小節,此處只需知道:有了GIL的存在,同一時刻統一進程中只有一個線程被執行

聽到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點,也就是說python沒用了,php纔是最牛逼的語言?

彆着急啊,老孃還沒講完呢。

要解決這個問題,咱們須要在幾個點上達成一致:

  1. cpu究竟是用來作計算的,仍是用來作I/O的?

  2. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能

  3. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處

一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。

若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,

反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高

結論:

  對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用

  固然對於一個程序來講,不會是純計算或者純I/O,咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程有無用武之地

分析:

咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是:

方案一:開啓四個進程

方案二:一個進程下,開啓四個線程

單核狀況下,分析結果:

  若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝

  若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝

多核狀況下,分析結果:

  若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

  若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

計算密集型

from threading import Thread from multiprocessing import Process import os import time def work(): res=0 for i in range(1000000): res+=i if __name__ == '__main__': t_l=[] start_time=time.time() # for i in range(300): #串行 # work() for i in range(300): t=Thread(target=work) #在個人機器上,4核cpu,多線程大概15秒 # t=Process(target=work) #在個人機器上,4核cpu,多進程大概10秒 t_l.append(t) t.start() for i in t_l: i.join() stop_time=time.time() print('run time is %s' %(stop_time-start_time)) print('主線程')

I/O密集型

from threading import Thread from multiprocessing import Process import time import os def work(): time.sleep(2) #模擬I/O操做,能夠打開一個文件來測試I/O,與sleep是一個效果 print(os.getpid()) if __name__ == '__main__': t_l=[] start_time=time.time() for i in range(1000): t=Thread(target=work) #耗時大概爲2秒 # t=Process(target=work) #耗時大概爲25秒,建立進程的開銷遠高於線程,並且對於I/O密集型,多cpu根本無論用 t_l.append(t) t.start() for t in t_l: t.join() stop_time=time.time() print('run time is %s' %(stop_time-start_time))

應用:

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

2.3 同步鎖

同進程同樣

import time import threading def addNum(): global num #在每一個線程中都獲取這個全局變量 #num-=1 temp=num time.sleep(0.1) num =temp-1 # 對此公共變量進行-1操做 num = 100 #設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢 t.join() print('Result: ', num)

鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()

GIL VS Lock

機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock?

首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。

最後,問題就很明朗了,GIL與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

詳細的:

由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序裏的線程和py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。 

2.4 死鎖與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() ''' Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 而後就卡住,死鎖了 '''

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() 一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

2.5 信號量Semahpore

同進程的同樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start()

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

2.6 Event
同進程的同樣

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。對象包含一個可由線程設置的信號標誌,它容許線程等待某事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象,而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。

 

能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

不瞭解redis能夠參考mysql的例子(同樣的道理)

import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress readis_ready.set() if __name__=="__main__": main()

mysql...

from threading import Thread,Event import threading import time,random def conn_mysql(): print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName()) event.wait() print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName()) def check_mysql(): print('\033[41m正在檢查mysql。。。\033[0m') time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == '__main__': event=Event() t1=Thread(target=conn_mysql) #等待鏈接mysql t2=Thread(target=conn_mysql) #等待鏈接myqsl t3=Thread(target=check_mysql) #檢查mysql t1.start() t2.start() t3.start()

threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

def conn_mysql(): count=0 while not e.is_set(): print('%s 第 <%s> 次嘗試' %(threading.current_thread().getName(),count)) count+=1 e.wait(0.5) print('%s ready to conn mysql' %threading.current_thread().getName()) time.sleep(1)

修訂上述mysql版本

from threading import Thread,Event import threading import time,random def conn_mysql(): while not event.is_set(): print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName()) event.wait(0.1) print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName()) def check_mysql(): print('\033[41m正在檢查mysql。。。\033[0m') time.sleep(random.randint(1,3)) event.set() time.sleep(random.randint(1,3)) if __name__ == '__main__': event=Event() t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) t3=Thread(target=check_mysql) t1.start() t2.start() t3.start()

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

應用:鏈接池

2.7 條件Condition(瞭解)

使得線程等待,只有知足某條件時,才釋放n個線程

import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()

2.8 定時器

定時器,指定n秒後執行某操做

from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed

2.9 線程queue

queue隊列 :使用import queue,用法與進程Queue同樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出

import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''

class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消費完畢

2.10 Python標準模塊--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

三 補充:paramiko模塊

  1. 介紹:

paramiko是一個用於作遠程控制的模塊,使用該模塊能夠對遠程服務器進行命令或文件操做,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。

  1. 下載安裝
    pycrypto,因爲 paramiko 模塊內部依賴pycrypto,因此先下載安裝pycrypto
    pip3 install pycrypto
    pip3 install paramiko
    注:若是在安裝pycrypto2.0.1時發生以下錯誤
    command 'gcc' failed with exit status 1...
    多是缺乏python-dev安裝包致使
    若是gcc沒有安裝,請事先安裝gcc

  2. 使用

SSHClient
用於鏈接遠程服務器並執行基本命令

基於用戶名密碼鏈接:

import paramiko
  
# 建立SSH對象 ssh = paramiko.SSHClient() # 容許鏈接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 鏈接服務器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123') # 執行命令 stdin, stdout, stderr = ssh.exec_command('df') # 獲取命令結果 result = stdout.read() # 關閉鏈接 ssh.close()

四 協程

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

須要強調的是:

  1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)

  2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:

  1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級

  2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行

(詳細的:協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)

爲此,咱們以前已經學習過一種在單線程下能夠保存程序運行狀態的方法,即yield,咱們來簡單複習一下:

1.yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
2.send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換

#不用yield:每次函數調用,都須要重複開闢內存空間,即重複建立名稱空間,於是開銷很大 import time def consumer(item): # print('拿到包子%s' %item) x=11111111111 x1=12111111111 x3=13111111111 x4=14111111111 y=22222222222 z=33333333333 pass def producer(target,seq): for item in seq: target(item) #每次調用函數,會臨時產生名稱空間,調用結束則釋放,循環100000000次,則重複這麼屢次的建立和釋放,開銷很是大 start_time=time.time() producer(consumer,range(100000000)) stop_time=time.time() print('run time is:%s' %(stop_time-start_time)) #30.132838010787964 #使用yield:無需重複開闢內存空間,即重複建立名稱空間,於是開銷小 import time def init(func): def wrapper(*args,**kwargs): g=func(*args,**kwargs) next(g) return g return wrapper @init def consumer(): x=11111111111 x1=12111111111 x3=13111111111 x4=14111111111 y=22222222222 z=33333333333 while True: item=yield # print('拿到包子%s' %item) pass def producer(target,seq): for item in seq: target.send(item) #無需從新建立名稱空間,從上一次暫停的位置繼續,相比上例,開銷小 start_time=time.time() producer(consumer(),range(100000000)) stop_time=time.time() print('run time is:%s' %(stop_time-start_time)) #21.882073879241943

缺點:

協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

協程的定義(知足1,2,3就可稱爲協程):

1.必須在只有一個單線程裏實現併發
2.修改共享數據不需加鎖
3.用戶程序裏本身保存多個控制流的上下文棧
4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))
yield切換在沒有io的狀況下或者沒有重複開闢內存空間的操做,對效率沒有什麼提高,甚至更慢,爲此,能夠用greenlet來爲你們演示這種切換

五 Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet def test1(): print('test1,first') gr2.switch() print('test1,sencod') gr2.switch() def test2(): print('test2,first') gr1.switch() print('test2,sencod') gr1=greenlet(test1) gr2=greenlet(test2) gr1.switch()

能夠在第一次switch時傳入參數

import time from greenlet import greenlet def eat(name): print('%s eat food 1' %name) gr2.switch('alex飛飛飛') print('%s eat food 2' %name) gr2.switch() def play_phone(name): print('%s play 1' %name) gr1.switch() print('%s play 2' %name) gr1=greenlet(eat) gr2=greenlet(play_phone) gr1.switch(name='egon啦啦啦')#能夠在第一次switch時傳入參數,之後都不須要

單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度

#順序執行 import time def f1(): res=0 for i in range(10000000): res+=i def f2(): res=0 for i in range(10000000): res*=i start_time=time.time() f1() f2() stop_time=time.time() print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664 #切換 from greenlet import greenlet import time def f1(): res=0 for i in range(10000000): res+=i gr2.switch() def f2(): res=0 for i in range(10000000): res*=i gr1.switch() gr1=greenlet(f1) gr2=greenlet(f2) start_time=time.time() gr1.switch() stop_time=time.time() print('run time is: %s' %(stop_time-start_time)) #7.789067983627319

greenlet只是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題

六 Gevent

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

g1=gevent.spawn()建立一個協程對象g1,

spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

遇到IO阻塞時會自動切換任務

import gevent import time def eat(): print('eat food 1') gevent.sleep(2) #等飯來 print('eat food 2') def play_phone(): print('play phone 1') gevent.sleep(1) #網卡了 print('play phone 2') # gevent.spawn(eat) # gevent.spawn(play_phone) # print('主') # 直接結束 #於是也須要join方法,進程或現場的jion方法只能join一個,而gevent的join方法能夠join多個 g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')

上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,

而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前

或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭

from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play_phone(): print('play phone 1') time.sleep(1) print('play phone 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')

同步與異步

import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

 

gevent線程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value #拿到func1的返回值

 

協程應用:爬蟲

from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time))

經過gevent實現單線程下的socket併發(from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞)

服務端

from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080) 服務端from gevent import monkey;monkey.patch_all() from socket import * import gevent #若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)

客戶端

from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

多線程併發多個客戶端

from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()
相關文章
相關標籤/搜索