進程池與線程池、協程、協程實現TCP服務端併發、IO模型

進程池與線程池、協程、協程實現TCP服務端併發、IO模型

1、進程池與線程池

一、線程池

'''
開進程開線程都須要消耗資源,只不過二者比較的狀況下線程消耗的資源比較少
在計算機可以承受範圍內最大限度的利用計算機
什麼是池?
	在保證計算機硬件安全的狀況下最大限度的利用計算機
	池實際上是下降了程序的運行效率,可是保證了計算機硬件的安全
	(硬件的發展跟不上軟件的速度)
'''
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5)  # 括號中能夠傳參數,指定線程池內的線程個數,也能夠不傳,不傳默認是當前所在計算機的CPU個數乘5


def task(n):
    print(n)
    time.sleep(2)


# pool.submit(task, 1)  # 朝線程池中提交任務,異步提交

'''
任務的提交方式:
    同步:原地等待任務的返回結果
    異步:不等待任務的返回結果,直接執行下一行代碼
異步的結果怎麼拿?
'''
for i in range(20):
    res = pool.submit(task, i)  # 任務(task)的返回結果,是Future類的一個對象
    print(res)  # <Future at 0x31aeeb0 state=pending>,這個對象是及時生成的,因此能夠立馬返回,不改變異步執行
    # 可是res的值是在任務執行完之後纔會有
    # print(res.result())  # 經過result取值,而且是原地等待結果的返回,這一行代碼直接將異步執行改成了同步執行
# 若是仍是想要程序異步執行,同時還能拿到任務的返回結果,就要用一個列表將res所有放進去,待任務所有提交完之後,再for循環拿出res的值

# 異步提交任務,待任務所有執行完畢後,拿到任務的返回值
from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
    print(n)
    time.sleep(1)
    return n ** 2


pool = ThreadPoolExecutor(5)
t_list = []
for i in range(20):
    res = pool.submit(task, i)
    t_list.append(res)
pool.shutdown()  # 關閉池子,等待池子中全部的任務執行完畢後,纔會往下運行代碼
for t in t_list:
    print('>>>>:', t.result())

二、進程池+異步回調機制

from concurrent.futures import ProcessPoolExecutor
import time
import os

pool = ProcessPoolExecutor(5)


def task(n):
    print(n,os.getpid())  #獲取當前進程號
    time.sleep(1)
    return n ** 2


def call_back(n):
    print('拿到了異步提交任務的返回結果', n.result())


if __name__ == '__main__':
    t_list = []
    for i in range(20):
        res = pool.submit(task, i).add_done_callback(call_back)  # 提交任務的時候,綁定一個回調函數,一旦該任務有結果,馬上執行對應的回調函數
        t_list.append(res)

pool.shutdown()
for t in t_list:
    print('>>>:', t.result())
'''
異步回調機制:當異步提交的任務有返回結果以後,會自動觸發回調函數的執行
根據打印出的進程號,能夠發現:
	池子中建立的進程建立一次就不會再建立了
	至始至終用的都是最初的那幾個
	這樣的話節省開闢進程的資源
上述結論對線程一樣適用
'''

2、協程

進程:資源單位
線程:執行單位
協程:單線程下實現併發
併發:切換+保存狀態
	ps:看起來像是同時運行的,就能夠稱之爲併發
協程:徹底是程序員本身意淫出來的名詞
	單線程下實現併發
併發的條件:多道技術
	空間上的複用:共用同一套操做系統
    時間上的複用:切換+保存狀態
程序員本身經過代碼本身檢測程序中的IO
一旦遇到IO本身經過代碼切換
給操做系統的感受就是你這個線程沒有任何的IO
ps:欺騙操做系統,讓他誤覺得你這個程序一直沒有IO
	從而保證程序在運行態和就緒態來回切換
    提高代碼的運行效率
切換+保存狀態就必定可以提高效率嗎?
	當你的任務是IO密集型的狀況下	提高效率
    若是你的任務是計算密集型的	下降效率
極限提高CPU工做效率的方式:
	多進程下開多線程
    多線程下再開協程
# 串行執行  1.5458002090454102
import time


def func1():
    for i in range(10000000):
        i + 1


def func2():
    for i in range(10000000):
        i + 1


start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)

# 基於yield併發執行  2.3516733646392822
# yield能夠保存上一次的結果
import time


def func1():
    while True:
        10000000 + 1
        yield


def func2():
    g = func1()
    for i in range(10000000):
        # time.sleep(100)  # 模擬IO,yield並不會捕捉到並自動切換
        i + 1
        next(g)


start = time.time()
func2()
stop = time.time()
print(stop - start)
'''
須要找到一個可以識別IO的一個工具————gevent模塊,這是一個第三方模塊,須要咱們手動下載
'''
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import time
'''
注意gevent模塊沒辦法自動識別time.sleep()等IO狀況
須要你手動再配置一個參數
from gevent import monkey;monkey.patch_all(),使spawn可以監測time.sleep()等IO
因爲該模塊常用,因此建議寫成一行
'''


def heng():
    print('哼')
    time.sleep(2)
    print('哼')


def ha():
    print('哈')
    time.sleep(3)
    print('哈')


def heiheihei():
    print('嘿嘿嘿')
    time.sleep(4)
    print('嘿嘿嘿')


start = time.time()
g1 = spawn(heng)  # 對傳入的函數名,加括號自動調用,而且監測其狀態
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()  # 等待任務運行完畢
g2.join()
g3.join()
print(time.time() - start)  # 4.0027806758880615

3、經過協程實現TCP服務端併發

# 服務端
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server1():
    while True:
        conn, addr = server.accept()
        spawn(talk, conn)


if __name__ == '__main__':
    g1 = spawn(server1)
    g1.join()
    
    
# 客戶端
import socket
from threading import Thread, current_thread


def client1():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        data = f'{current_thread().name} {n}'
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        print(res.decode('utf-8'))
        n += 1


for i in range(400):
    t = Thread(target=client1)
    t.start()
相關文章
相關標籤/搜索