8_15 併發編程4,線程池與協程,io模型

一。線程池併發

  線程池是一個處理線程任務的集合,他是能夠接受必定量的線程任務,並建立線程,處理該任務,處理結束後不會馬上關閉池子,會繼續等待提交的任務,也就是他們的進程/線程號不會改變。app

  當線程池中的任務沒有結束時是不會接受下一個任務的。異步

  它的操做有:socket

  pool = ThreadPoolExecutor()tcp

  建立一個線程池,其中括號中表明的是一次能夠接納的線程任務,能夠不加參數,不加參數其數量就是當前cpu的個數*5。函數

  res = pool.submit(func,args)spa

  提交一個任務,args表明的是函數的參數。res接受的是該submit的返回值,相似於以下的類:線程

<Future at 0x2057e656940 state=running>

  state表明的是當前該線程的狀態。3d

  res.result()code

  而使用result能夠將提交的任務函數的返回值獲取。

  這裏的result還有等待任務的返回值的做用。若是任務沒結束,就會一直等待,能夠將並行操做改爲串行操做。

  pool.shutdown()

  能夠將池子關閉,並等待池子終端 任務所有結束再執行下面代碼。  

例子:

import time
from concurrent.futures import ThreadPoolExecutor
import os
from gevent import os

pool = ThreadPoolExecutor(5)

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**2


list_1 = []
for i in range(20):
    res = pool.submit(task,i) #提交任務
    print(res.result())#等待任務的返回值
    list_1.append(res)

pool.shutdown() #關閉池子,等待池子中的任務運行完畢
for j in list_1:
    print('>>>',j.result())
print('')

  進程池:

  進程池的使用和線程池差很少,區別僅只有包名不一樣,在進程池中咱們能夠驗證如下池中的進程/線程是不是用的一樣的進程/線程,使用os。getpid()方法便可。

  進程值不傳值,裏面的數值默認時cpu的個數。

import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os
from gevent import os

# pool = ThreadPoolExecutor(5)
pool = ProcessPoolExecutor(5)


def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**2

def callback(n):
    print(n.result())

if __name__ == '__main__':
    list_1 = []
    for i in range(20):
        res = pool.submit(task,i).add_done_callback(callback) #提交任務
        # print(res.result())#等待任務的返回值
        list_1.append(res)

  異步回調:

  除了上面使用的將返回的future對象添加到列表,再調用result()方法返回其返回值之外,還能夠對指派任務的返回值用

add_done_callback(callback)

  方法,將該對象回調到callback(能夠自定義)函數,由函數接納處理該值,函數的參數就是任務的返回值,多個返回值要設置多個參數。

  回調是在生產返回值時就運行的。

二。協程

  協程就是在單線程的狀況下實現併發。

  通常程序的多道技術都是用 切換+保存狀態 實現

  在通常的cpu運算時,都是在五種狀態中來回切換的,程序運行的5種狀態:

  1.新建。2.就緒。3.運行。4.阻塞。5.結束。

  通常的,程序都是在2,3,4的狀態來回切換,有2種狀況。

  1,程序遇到了io操做,由運行態進入到了阻塞態,直到io操做結束後再到阻塞態等待時間片。

  2.程序的時間片用完,由運行態到就緒態。

  協程的做用就是使得線程遇到io操做本身切換,運行的方式從1.變成2.線程持續不斷的就緒,能夠得到大量的cpu運算時間。

  要實現這個功能須要考慮線程的保存狀態問題。

  這裏就要用到迭代器的知識,yield,

  yield能夠保存上一次操做的狀態,因此使用yield能夠驗證協程對計算密集型的線程操做後是否能加快效率。

#串行執行 0.8540799617767334
# import time
#
# def func1():
#     for i in range(10000000):
#         i+1
#
# def func2():
#     for i in range(10000000):
#         i+1
#
# start = time.time()
# func1()
# func2()
# stop = time.time()
# print(stop - start)
#基於yield併發執行  1.3952205181121826
# import time
# def func1():
#     while True:
#         10000000+1
#         yield
#
# def func2():
#     g=func1()
#     for i in range(10000000):
#         time.sleep(100)  # 模擬IO,yield並不會捕捉到並自動切換
#         i+1
#         next(g)
#
# start=time.time()
# func2()
# stop=time.time()
# print(stop-start)

  能夠看到,在計算密集的線程中,不斷切換線程是不利於程序的運行的。

  而yield不能識別io操做,而進行線程之間的切換的,因此須要引入一個模塊gevent。

  gevent是一個能夠識別io的魔塊,但不能識別time.sleep,因此還要調用另一個模塊識別time.sleep。

from gevent import monkey;monkey.patch_all() 
# 因爲該模塊常常被使用 因此建議寫成一行 from gevent import spawn import time

  spawn()能夠檢測()中的全部任務

def heng():
    print("")
    time.sleep(2)
    print('')

def ha():
    print('')
    time.sleep(3)
    print('')

def heiheihei():
    print('嘿嘿嘿')
    time.sleep(5)
    print('嘿嘿嘿')

start = time.time()
g1 = spawn(heng)
g2 = spawn(ha)  # spawn會檢測全部的任務
g3 = spawn(heiheihei)
g1.join()
g2.join()
g3.join()
# heng()
# ha()
print(time.time() - start)
哼
哈
嘿嘿嘿
哼
哈
嘿嘿嘿
5.033252716064453

  本來10秒鐘的程序,如今須要5秒鐘就能夠運行結束了。

  spawn能夠將全部線程添加至一個列表,輪流運行其沒有io操做的部分。

  spawn有一個返回值g

  注意,須要在程序最後等待全部程序都運行結束才結束程序,使用g.join方法。

三。使用gevent實現tcp的併發

from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn


server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

def server1():
    while True:
        conn, addr = server.accept()
        spawn(talk,conn)

if __name__ == '__main__':
    g1 = spawn(server1)
    g1.join()

四。IO模型。

  1.阻塞型IO

  阻塞型io是在進行io操做時,先跳入阻塞態,而後等待數據。

  數據得到後拷貝數據,

  最後再進入就緒態,

  其中等待數據和拷貝數據都是再阻塞狀態:

  2.非阻塞io

  非阻塞io是在遇到io操做時,先發送接受數據請求,若是沒有數據就返回一個沒有的信號,以後會反覆發送數據請求,直到有數據爲止,這種模型很佔cpu操做。

  3.IO多路複用

  這個模型中有一個select,是一個監測機制,相似於列表,管理io操做。

  當須要進行io操做時,調用select尋找數據,若是找到數據就返回數據,

  等待的操做所有交給select。

  4.異步IO(asyn。。。。)

   在遇到io操做時,有一個回調機制,當須要io操做時,回調機制(內存中)會去尋找數據,當尋找到數據後會返回數據

相關文章
相關標籤/搜索