Python全棧開發之十一、進程和線程

1、線程

  多任務能夠由多進程完成,也能夠由一個進程內的多線程完成,一個進程內的全部線程,共享同一塊內存python中建立線程比較簡單,導入threading模塊,下面來看一下代碼中如何建立多線程。html

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
    print('start')          # 主線程等待子線程完成,子線程併發執行


>>start
>>2
>>1
>>3
>>0
>>4

  主線程從上到下執行,建立5個子線程,打印出'start',而後等待子線程執行完結束,若是想讓線程要一個個依次執行完,而不是併發操做,那麼就要使用join方法。下面來看一下代碼前端

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.start()
        t.join()
    print('start')      # 線程從上到下依次執行,最後打印出start

>>0
>>1
>>2
>>3
>>4
>>start

  上面的代碼不適用join的話,主線程會默認等待子線程結束,纔會結束,若是不想讓主線程等待子線程的話,能夠子線程啓動以前設置將其設置爲後臺線程,若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止,前臺線程則相反,若果不加指定的話,默認爲前臺線程,下面從代碼來看一下,如何設置爲後臺線程。例以下面的例子,主線程直接打印start,執行完後就結束,而不會去等待子線程,子線程中的數據也就不會打印出來python

import threading
import time

def f1(i):
    time.sleep(1)
    print(i)

if __name__ == '__main__':
    for i in range(5):
        t = threading.Thread(target=f1, args=(i,))
        t.setDaemon(True)
        t.start()

    print('start')      # 主線程不等待子線程

>> start 

  除此以外,本身還能夠爲線程自定義名字,經過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此以外,Thread還有一下一些方法git

  • t.getName() : 獲取線程的名稱
  • t.setName() : 設置線程的名稱 
  • t.name : 獲取或設置線程的名稱
  • t.is_alive() : 判斷線程是否爲激活狀態
  • t.isAlive() :判斷線程是否爲激活狀態
  • t.isDaemon() : 判斷是否爲守護線程

2、線程鎖

  因爲線程是共享同一分內存的,因此若是操做同一份數據,很容易形成衝突,這時候就能夠爲線程加上一個鎖了,這裏咱們使用Rlock,而不使用Lock,由於Lock若是屢次獲取鎖的時候會出錯,而RLock容許在同一線程中被屢次acquire,可是須要用n次的release才能真正釋放所佔用的瑣,一個線程獲取了鎖在釋放以前,其餘線程只有等待。 github

import threading
G = 1
lock = threading.RLock()
def fun():
    lock.acquire()    # 獲取鎖
    global G
    G += 2
    print(G, threading.current_thread().name)
    lock.release()   # 釋放鎖
    return


for i in range(10):
    t = threading.Thread(target=fun, name='t-{}'.format(i))
    t.start()

3 t-0
5 t-1
7 t-2
9 t-3
11 t-4
13 t-5
15 t-6
17 t-7
19 t-8
21 t-9    

3、線程間通訊Event

Event是線程間通訊最間的機制之一,主要用於主線程控制其餘線程的執行,主要用過wait,clear,set,這三個方法來實現的的,下面來看一個簡單的例子,windows

import threading
import time

def f1(event):
    print('start:')
    event.wait()            # 阻塞在,等待 set
    print('end:')

if __name__ == '__main__':
    event_obj  = threading.Event()
    for i in range(5):
        t = threading.Thread(target=f1, args=(event_obj,))
        t.start()

    event_obj.clear()      # 清除標誌位 
    inp = input('>>>>:')
    if inp == 'true':
        event_obj.set()   # 設置標誌位

4、隊列  

  能夠簡單的理解爲一種先進先出的數據結構,好比用於生產者消費者模型,或者用於寫線程池,以及前面寫select的時候,讀寫分離時候可用隊列存儲數據等等,之後用到隊列的地方不少,所以對於隊列的用法要熟練掌握。下面首先來看一下隊列提供了哪些用法數組

q = queue.Queue(maxsize=0)  # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0時,表示隊列長度無限制。

q.join()        # 等到隊列爲kong的時候,在執行別的操做
q.qsize()       # 返回隊列的大小 (不可靠)
q.empty()       # 當隊列爲空的時候,返回True 不然返回False (不可靠)
q.full()        # 當隊列滿的時候,返回True,不然返回False (不可靠)
q.put(item, block=True, timeout=None)   # 將item放入Queue尾部,item必須存在,參數block默認爲True,表示當隊列滿時,會等待
                        # 爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示會阻塞設置的時間,
                        # 若是在阻塞時間裏 隊列仍是沒法放入,則引起 queue.Full 異常

q.get(block=True, timeout=None)     #  移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞
                       #  阻塞的話若此時隊列爲空,則引起queue.Empty異常。 可選參數timeout,表示會阻塞設置的時間,
q.get_nowait()               #  等效於 get(item,block=False) 

下面用代碼來簡單的演示下,消費者生成者模型,只是簡單的演示下。數據結構

message = queue.Queue(10)

def product(num):
    for i in range(num):
        message.put(i)
        print('將{}添加到隊列中'.format(i))
        time.sleep(random.randrange(0, 1))


def consume(num):
    count = 0
    while count<num:
        i = message.get()
        print('將{}從隊列取出'.format(i))
        time.sleep(random.randrange(1, 2))
        count += 1


t1 = threading.Thread(target=product, args=(10, ))
t1.start()

t2 = threading.Thread(target=consume, args=(10, ))
t2.start()

5、進程 

  線程的上一級就是進程,進程可包含不少線程,進程和線程的區別是進程間的數據不共享,多進程也能夠用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心說保持一致。多線程

在windows中不能用fork來建立多進程,所以只能導入multiprocessing,來模擬多進程,下面首先來看一下怎麼建立進程,你們能夠先猜一下下面的結果是什麼併發

l = []

def f(i):
    l.append(i)
    print('hi', l)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=f, args=(i,))        # 數據不共享,建立10份 l列表
        p.start()

6、進程間數據共享  

進程間的數據是不共享的,可是我若是非要數據共享了,那麼就須要用其餘方式了 

一、Value,Array

def f(a, b):
    a.value = 3.111
    for i in range(len(b)):
        b[i] += 100

if __name__ == '__main__':
    num = Value('f', 3.333)        # 相似C語言中的 浮點型數
    l = Array('i', range(10))       # 相似C語言中的整形數組,長度爲10
    print(num.value)
    print(l[:])

    p = Process(target=f, args=(num, l))
    p.start()
    p.join()
    print(num.value)              # 你們本身運行一下,看下兩次打印結果是否同樣
    print(l[:])

二、manage  

方式一,使用的都是C語言中的數據結構,若是你們對c不熟悉的話,用起來比較麻煩,方式2就能夠支持python自帶的數據,下面來看一下

from multiprocessing import Process,Manager

def Foo(dic, i):
    dic[i] = 100 + i
    print(dic.values())

if __name__ == '__main__':
    manage = Manager()
    dic = manage.dict()

    for i in range(2):
        p = Process(target=Foo, args=(dic, i))
        p.start()
        p.join()

7、進程池  

  實際應用中,並非每次執行任務的時候,都去建立多進程,而是維護了一個進程池,每次執行的時候,都去進程池取一個,若是進程池裏面的進程取光了,就會阻塞在那裏,直到進程池中有可用進程爲止。首先來看一下進程池提供了哪些方法

  • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。

  • close() : 等待任務完成後在中止工做進程,阻止更多的任務提交到pool,待任務完成後,工做進程會退出。

  • terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。

  • join() : 等待工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait,不然進程會成爲殭屍進程。

下面來簡單的看一下代碼怎麼用的

from multiprocessing import Pool
import time

def f1(i):
    time.sleep(1)
    # print(i)
    return i

def cb(i):
    print(i)

if __name__ == '__main__':
    poo = Pool(5)
    for i in range(20):
        # poo.apply(func=f1, args=(i,))   # 串行執行,排隊執行 有join
        poo.apply_async(func=f1, args=(i,), callback=cb)  # 併發執行 主進程不等子進程,無join
    print('**********')

    poo.close()
    poo.join()

8、線程池 

  對於前面的進程池,python自帶了一個模塊Pool供咱們使用,可是對於線程池,則沒有提供,所以須要咱們本身寫,本身寫的話,就須要用到隊列,下面咱們來看一下本身怎麼實現一個線程池,首先寫一個最簡單的版本。 

import threading
import time
import queue

class ThreadPool:
    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.add()

    def add(self):
        self.queue.put(threading.Thread)

    def get(self):
        return self.queue.get()

def f(tp, i):
    time.sleep(1)
    print(i)
    tp.add()

p = ThreadPool(10)
for i in range(20):
    thread = p.get()
    t = thread(target=f, args=(p, i))
    t.start()

上述代碼寫了一個線程池類,基本實現了線程池的功能,可是有不少缺點,沒有實現回掉函數,每次執行任務的時候,任務處理函數每次執行完都須要自動執行對象的add方法,將線程對象添加到隊列中去,並且類初始化的時候,一次性將全部的線程類都添加到隊列中去了,總之上面的線程池雖然實現簡單,可是實際上卻有不少問題,下面來看一個真正意義上的線程池。

  在寫代碼以前,咱們先來看一下該怎麼設計這樣一個線程池,上面的線程池,咱們的隊列中,存的是線程類,咱們每處理一個任務都實例化一個線程,而後執行完了以後,該線程就被丟棄了,這樣有點不合適。咱們此次設計的時候,

  1. 隊列中存的不是線程類,而是任務,咱們從隊列中拿取的都是任務
  2. 每次執行任務的時候,不是都要生成一個線程,而是若是之前生成的線程有空閒的話,就用之前的線程
  3. 支持回掉機制,支持close,terminate

下面來一下代碼是怎麼實現的

import threading
import queue
import time
import contextlib

class ThreadingPool:
    def __init__(self, num):
        self.max = num
        self.terminal = False
        self.q = queue.Queue()
        self.generate_list = []         # 保存已經生成的線程
        self.free_list = []             # 保存那些已經完成任務的線程

    def run(self, func, args=None, callbk=None):
        self.q.put((func, args, callbk))            # 將任務信息做爲一個元祖放到隊列中去
        if len(self.free_list) == 0 and len(self.generate_list) < self.max:
           self.threadstart()

    def threadstart(self):
        t = threading.Thread(target=self.handel)
        t.start()

    def handel(self):
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)
        event = self.q.get()
        while event != 'stop':
            func, args, callbk = event
            flag = True
            try:
                ret = func(*args)
            except Exception as e:
                flag = False
                ret = e

            if callbk is not None:
                try:
                    callbk(ret)
                except Exception as e:
                    pass

            if not self.terminal:
                with self.auto_append_remove(current_thread):
                    event = self.q.get()
            else:
                event = 'stop'
        else:
            self.generate_list.remove(current_thread)

    def terminate(self):
        self.terminal = True

        while self.generate_list:
            self.q.put('stop')
        self.q.empty()

    def close(self):
        num = len(self.generate_list)
        while num:
            self.q.put('stop')
            num -= 1

    @contextlib.contextmanager
    def auto_append_remove(self, thread):
        self.free_list.append(thread)
        try:
            yield
        finally:
            self.free_list.remove(thread)

def f(i):
    # time.sleep(1)
    return i

def f1(i):
    print(i)

p = ThreadingPool(5)
for i in range(20):
    p.run(func=f, args=(i,), callbk=f1)

p.close()

9、協程 

協程,又稱微線程,協程執行看起來有點像多線程,可是事實上協程就是隻有一個線程,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯,此外由於只有一個線程,不須要多線程的鎖機制,也不存在同時寫變量衝突。協程的適用場景:當程序中存在大量不須要CPU的操做時(IO)下面來看一個利用協程例子

from gevent import monkey
import gevent
import requests

# 把標準庫中的thread/socket等給替換掉
# 這樣咱們在後面使用socket的時候能夠跟日常同樣使用,無需修改任何代碼,可是它變成非阻塞的了.
monkey.patch_all()      # 猴子補丁

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

上面的例子,利用協程,一個線程完成全部的請求,發出請求的時候,不會等待回覆,而是一次性將全部的請求都發出求,收到一個回覆就處理一個回覆,這樣一個線程就解決了全部的事情,效率極高。

10、小結 

這篇博文是pyton基礎知識的最後一篇,後面會講的博文會講開始講前端的知識,這裏附上目錄http://www.cnblogs.com/Wxtrkbc/p/5606048.html,之後會繼續更新的,

相關文章
相關標籤/搜索