python基礎-第九篇-9.1初瞭解Python線程、進程、協程

  瞭解相關概念以前,咱們先來看一張圖python

 

進程:git

  • 優勢:同時利用多個cpu,可以同時進行多個操做
  • 缺點:耗費資源(從新開闢內存空間)

線程:程序員

  • 優勢:共享內存,IO操做時候,創造併發操做
  • 缺點:搶佔資源

經過對比,咱們能夠得出:github

  • 因爲計算多用到cpu,因此多進程適合計算密集型
  • 因爲IO操做不用到cpu,因此多線程適合IO密集型
  • 進程不是越多越好,cpu個數=進程個數
  • 線程也不是越多越好,具體案例具體分析,請求上下文切換耗時
  • 計算機中執行任務的最小單位:線程
  • 進程和線程的目的都是提升效率
  • 另外,GIL全局解釋器鎖,這個是Python獨有的,做用是一個進程裏一次只能執行一個線程,固然這個鎖只適用於須要調用cpu的狀況

 

Python線程

  Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元算法

import threading
import time

def show(arg):
    time.sleep(1)
    print('thread'+str(arg))

for i in range(10):
    t = threading.Thread(target=show,args=(i,))
    t.start()

print('main thread stop')

   上述代碼建立了10個前臺線程,而後控制器就交給了cpu,cpu根據指定算法進行調度,分片執行指令編程

更多方法:數組

  • start            線程準備就緒,等待CPU調度
  • setName      爲線程設置名稱
  • getName      獲取線程名稱
  • setDaemon   設置爲後臺線程或前臺線程(默認)

                   若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止安全

                   若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止數據結構

  • join              逐個執行每一個線程,執行完畢後繼續往下執行
  • run              線程被cpu調度後自動執行線程對象的run方法

 

自定義線程類:多線程

import threading
import time

class MyThread(threading.Thread):

    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self): #定義每一個線程要運行的函數

        print('running on number:%s'%self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

   不過咱們有個疑問啦,在定義的這個類中,根本沒涉及調用run函數,這是怎麼實現的呢??

  那咱們去看下源碼就明白了,實際上是start方法再起做用

  

  

  

  因此start方法在底層是調用了run方法

 

 線程鎖(Lock、RLock)

  因爲線程之間是進行隨機調度,因此不可避免的存在多個線程同時修改同一條數據,從而可能會出現髒數據,因此出現了線程鎖,同一時刻只容許一個線程執行操做。

 

未上鎖的:

import threading
import time

num = 0

def show(arg):
    global num
    time.sleep(1)
    num += 1
    print(num)

for i in range(10):
    t = threading.Thread(target=show,args=(i,))
    t.start()

print('main thread stop')

 

上鎖的:

import threading
import time

num = 0
lock = threading.RLock()

def func():
    lock.acquire()
    global num
    num += 1
    time.sleep(1)
    print(num)
    lock.release()

for i in range(10):
    t = threading.Thread(target=func)
    t.start()

   咱們會發現,這兩段代碼輸出的結果都同樣,並無產生髒數據,可是細心你會發現:打印結果的過程是不一樣的,未加鎖的--能夠說是幾乎同時打印結果,而加了鎖的,則是一個一個打印,這就是鎖在起做用,對同一資源,在一個點上只能執行一個線程。

 

事件(event)

  Python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear

  事件處理的機制:全局定義了一個‘flag’,若是‘flag’值爲False,那麼當程序執行event.wait方法時就會阻塞,若是‘Flag’值爲True,那麼event.wait方法時便再也不阻塞

  • clear  將‘Flag’設置爲False
  • set  將‘Flag’設置爲True
import threading
import time

def do(event):
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do,args=(event_obj,))
    t.start()

event_obj.clear()
# time.sleep(2)
inp = input('input:')
if inp == 'true':
    event_obj.set()

 

import threading
import time

event = threading.Event()

def func():
    print('%s wait for event...'%threading.currentThread().getName())
    #等待--阻塞
    event.wait()

    #收到事件後進入運行狀態
    print('%s recv event.'%threading.currentThread().getName())

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()

time.sleep(2)

#發出事件通知
print('MainThread set event.')
event.set()

 

信號量(Semaphore)

  Semaphore是同時容許必定數量的線程更改數據,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading,time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print('run the thread:%s'%n)
    semaphore.release()

if __name__ == '__main__':
    num = 0
    #最多容許5個線程同時運行
    semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

 

條件(Condition)

  使用線程等待,只有知足某條件時,才釋放n個線程

  • acquire  給線程上鎖
  • wait       wait方法釋放當前線程佔用的鎖,同時掛起線程,直至被喚醒或超時(需timeout參數)。當線程被喚醒並從新佔有鎖的時候,程序纔會繼續執行下去。
  • notify     喚醒一個掛起的線程(若是存在掛起的線程),不會釋放所佔用的鎖
  • notifyall  調用這個方法將通知等待池中全部線程,這些線程都將進入鎖定池嘗試得到鎖定,此方法不會釋放鎖定,使用前線程必須已得到鎖定。不然將拋出異常
import threading
import time
def consumer(cond):
    with cond:
        print("consumer before wait")
        cond.wait()
        print("consumer after wait")
  
def producer(cond):
    with cond:
        print("producer before notifyAll")
        cond.notifyAll()
        print("producer after notifyAll")
  
condition = threading.Condition()
c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
  
p = threading.Thread(name="p", target=producer, args=(condition,))
  
c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()
 
# consumer()線程要等待producer()設置了Condition以後才能繼續。

 

import threading

def run(n):
    con.acquire()
    con.wait()
    print('run the thread:%s'%n)
    con.release()


if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run,args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break

        con.acquire()
        con.notify(int(inp))
        con.release()

 

Python進程

from multiprocessing import Process
import time

def foo(i):
    print('say hi',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

   咱們能夠看到,進程和線程代碼實現幾乎是相同的,對於進程而言,模塊是multiprocessing,另外,在建立進程前,加了一個__name__的驗證,這是因爲操做系統的緣由,反正你只要加上了就能夠了。

  另外,咱們已經提到過,建立進程就等同搭建了一個進程環境,消耗內存是不小的(相對線程)。

 

進程數據共享

  因爲進程建立時,數據是各持有一份的,默認狀況下進程間是沒法共享數據的。

from multiprocessing import Process
import time

li = []

def foo(i):
    li.append(i)
    print('say hi',li)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

    print('ending',li)

結果爲:
    say hi [1]
    say hi [0]
    say hi [2]
    say hi [3]
    say hi [4]
    say hi [5]
    say hi [6]
    say hi [7]
    ending []
    say hi [8]
    say hi [9]

   從結果裏,咱們也知道,進程間數據是不共享的,列表元素沒有實現累加。

 

  不過,若是你硬要實現共享的話,辦法仍是有的,請往下看:

方法一:引用數組Array

from multiprocessing import Process,Array

def Foo(temp,i):
    temp[i] = 100+i
    for item in temp:
        print(i,'----->',item)

if __name__ == '__main__':
    temp = Array('i', [11, 22, 33, 44])

    for i in range(2):
        p = Process(target=Foo,args=(temp,i,))
        p.start()

 

方法二:manage.dict()

from multiprocessing import Process,Manager

def Foo(dic,i):
    dic[i] = 100 + i
    print(dic.values())

if __name__ == '__main__':
    manage = Manager()
    dic = manage.dict()

    for i in range(2):
        p = Process(target=Foo,args=(dic,i,))
        p.start()
        p.join()

 

方法三:multiprocessing.Queue

from multiprocessing import Process, Queue

def f(i,q):
    print(i,q.get())

if __name__ == '__main__':
    q = Queue()

    q.put("h1")
    q.put("h2")
    q.put("h3")

    for i in range(10):
        p = Process(target=f, args=(i,q,))
        p.start()

   當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值,另外涉及數據共享就一定存在同一份數據被多個進程同時修改,因此在multiprocessing模塊裏也也提供了RLock類。

 

進程池

  進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可以使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。

  • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時 會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。

  • close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。

  • terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。

  • join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程

 

  • apply        每個任務是排隊進行
  • apply_async  每個任務都併發進行,能夠設置回調函數
from multiprocessing import Process,Pool
import time

def Foo(i):
    time.sleep(2)
    return i+100

def Bar(arg):
    print(arg)

if __name__ == '__main__':
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo,args=(i,),callback=Bar)

    print('end')
    pool.close()
    pool.join() #進程池中進程執行完畢後再關閉
    print('really end')

 

隊列(queue)

  適用於多線程編程的先進先出數據結構,能夠用來安全的傳遞多線程信息。

  • q = queue.Queue(maxsize=0)   構造一個先進顯出隊列,maxsize指定隊列長度,參數不填默認表示隊列長度無限制。
  • q.join()    等到隊列爲kong的時候,在執行別的操做
  • q.put(item, block=True, timeout=None)   將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置,爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後,若是隊列沒法給出放入item的位置,則引起 queue.Full 異常
  • q.get(block=True, timeout=None)   移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。
import threading
import queue

que = queue.Queue(10)

def s(i):
    que.put(i)

def x(i):
    g = que.get(i)
    print('get',g)

for i in range(1,13):
    t = threading.Thread(target=s,args=(i,))
    t.start()

for i in range(1,11):
    t = threading.Thread(target=x,args=(i,))
    t.start()

print('size',que.qsize())

結果爲:
    get 1
    get 2
    get 3
    get 4
    get 5
    get 6
    get 7
    get 8
    get 9
    get 10
    size 

 

Python協程

  線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;而協程的操做則是程序員

  協程存在的意義:對於多線程應用,cpu經過切片來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼執行順序。

  協程的適用場景:當程序中存在大量不須要cpu的操做時(IO),適用於協程。例如:爬蟲

greenlet

from greenlet import greenlet
 
 
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
 
 
def test2():
    print 56
    gr1.switch()
    print 78
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 import urllib2
 4 
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urllib2.urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10 
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])
遇到IO自動切換

 

            歡迎你們對個人博客內容提出質疑和提問!謝謝

 

                                                                             筆者:拍省先生

相關文章
相關標籤/搜索