溫故而知新--day4

溫故而知新--day4

進程與線程

進程本質上就是一段程序的運行過程,由程序、數據集、進程控制塊組成。每一個進程都有本身的地址空間、數據棧以及其餘用於跟蹤進程執行的輔助數據。操做系統管理全部的進程,併爲他們合理分配資源。
線程是進程中的執行單元,能夠共享進程中的資源。
進程之間是相互獨立的,因此進程是最小的資源單位。html

關於並行和併發python

並行:系統能同時處理多個任務
併發:系統能夠處理多個任務git

線程

簡單使用

import threading
import os


def work(num1, num2, name, **kwargs):
    print(num1, num2)   # 12 123
    print(name)         # lczmx
    print(kwargs)       # {'age': 20}
    print("pid:", os.getpid())  # pid: 12932


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(12, 123),
                          kwargs={"name": "lczmx", "age": 20})
    t2 = threading.Thread(target=work, args=(1, 3),
                          kwargs={"name": "xxx", "age": 20})

    t1.start()	# 開始線程活動
    t1.join()	# 等待,直到線程終結

    t2.start()
    t2.join()
    print("pid:", os.getpid())  # pid: 12932

定義一個類,繼承threading.Thread、重寫run方法也能夠
重寫__init__方法的話要super.__init__()github

import threading


class MyThread(threading.Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        pass

    def func2(self):
        pass


if __name__ == '__main__':
    t = MyThread()
    t.start()
    t.join()

Threading對象的方法web

is_alive(): 返回線程是否活動的。
getName(): 返回線程名,也能夠在建立時經過name參數指定。
setName(): 設置線程名。算法

守護線程

默認狀況下,主線程會等到全部子線程執行完以後纔會退出,但守護線程並不會。
線程
守護線程就是跟隨主線程一塊兒結束的線程,守護線程經過setDaemon方法實現,其內部時設置daemon屬性,能夠被繼承,因此daemon默認爲False。編程

import threading
import time


def work(sleep_time=0.5):
    time.sleep(sleep_time)
    print("sleep time: ", sleep_time)


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(1,))
    t1.setDaemon(True)      # setDaemon要在start以前
    t1.start()
    print("exit")

注意如下例子segmentfault

import threading
import time


def work(sleep_time=0.5):
    time.sleep(sleep_time)
    print("sleep time: ", sleep_time)


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(1,))       # 1秒
    t2 = threading.Thread(target=work, args=(3,))       # 3秒
    t1.setDaemon(True)
    t2.setDaemon(True)

    t1.start()
    t2.start()
    time.sleep(2)                                       # 2秒

    print(t1.is_alive())    # False
    print(t2.is_alive())    # True

鎖主要時用來解決在cpu切換時造程序取得的數據不一樣步的問題。
好比這個例子:windows

from threading import Thread
import os
import time


def work():
    global n
    temp = n
    time.sleep(0.01)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 結果可能爲99或98,但幾乎不爲零

爲了解決這個問題,咱們可使用串行的方式讓全部的代碼按照順序執行,可是這就失去了多線程的意義。那麼只要串行部分代碼就既能享受多線程的優點,又能夠保證數據的安全了。也就是說,鎖作的工做就是使操做數據的那部分代碼串行。安全

互斥鎖

使用threading.Lock獲取一把鎖,它由一個acquire()release()方法控制鎖定和釋放。

from threading import Thread, Lock
import os
import time

lock = Lock()


def work():
    global n
    lock.acquire()
    temp = n
    time.sleep(0.01)
    n = temp - 1
    lock.release()


if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 如今結果爲0

死鎖

上面說過,鎖就是把部分代碼變爲串行,只有當鎖被釋放後才能執行後面的代碼。死鎖的一個緣由是互斥,還有多是粗枝大葉,忘記release()了。

from threading import Thread, Lock
import time

lockA = Lock()
lockB = Lock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        lockA.acquire()
        print("%s得到鎖A" % self.name)

        lockB.acquire()
        print("%s得到鎖B" % self.name)
        lockB.release()
        print("%s釋放鎖B" % self.name)
        lockA.release()
        print("%s釋放鎖A" % self.name)

    def func2(self):
        lockB.acquire()
        print("%s得到鎖B" % self.name)
        time.sleep(2)

        lockA.acquire()
        print("%s得到鎖A" % self.name)
        lockA.release()
        print("%s釋放鎖A" % self.name)

        lockB.release()
        print("%s釋放鎖B" % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread(name="線程%d" % i)
        t.start()
        """ 線程0得到鎖A 線程0得到鎖B 線程0釋放鎖B 線程0釋放鎖A 線程0得到鎖B 線程1得到鎖A 卡死了 """

解決死鎖的好方式就是用遞歸鎖,而使用通常的鎖的話能夠用with關鍵詞,以防忘記釋放鎖了。

遞歸鎖

遞歸鎖也是鎖,其內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

from threading import Thread, RLock
import time

lockA = lockB = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        lockA.acquire()
        print("%s得到鎖A" % self.name)

        lockB.acquire()
        print("%s得到鎖B" % self.name)
        lockB.release()
        print("%s釋放鎖B" % self.name)
        lockA.release()
        print("%s釋放鎖A" % self.name)

    def func2(self):
        lockB.acquire()
        print("%s得到鎖B" % self.name)
        time.sleep(2)

        lockA.acquire()
        print("%s得到鎖A" % self.name)
        lockA.release()
        print("%s釋放鎖A" % self.name)

        lockB.release()
        print("%s釋放鎖B" % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread(name="線程%d" % i)
        t.start()

semaphore 信號量

信號量就是一把鎖,以前咱們說的threading.Lock是互斥鎖(Mutual exclusion,縮寫 Mutex)實質上就是信號量爲一的情景。信號量能夠用來限定某些資源能夠同時由幾個線程訪問,訪問時一樣要acquire,出來時一樣也release。

import threading
import time

sm = threading.Semaphore(5)


def foo():
    sm.acquire()
    # 打印當前線程的名字
    print("%s ..." % threading.current_thread().getName())
    time.sleep(1)
    sm.release()


if __name__ == "__main__":
    for i in range(9):
        t = threading.Thread(target=foo)
        t.start()

GIL

關於GIL(global interpreter lock),點擊這裏

線程間通訊

  1. event 同步條件
    因爲線程之間是相互獨立的,彼此不能直接確認狀態,爲此python提供了threading.Event對象,能夠在不一樣線程間傳遞狀態,其由一下方法:

    • .wait(timeout=None),event變爲True,timeout爲None時,爲阻塞;反之則爲等待秒數(非阻塞)

    • .set(),設置event的值爲True

    • .clear(),恢復event的狀態值爲False。

    • .is_set,返回event狀態值

      根據上述,能夠把Event的狀況分爲如下幾種:
      event

    import threading
    import time
    e = threading.Event()
    
    
    def foo():
    	print("event狀態:", e.is_set())
    	print("等待。。。。")
    	if e.wait():    # 默認阻塞
    		print("event狀態:", e.is_set())
    		print("收到同步條件,ok")
    
    
    def bar():
    	time.sleep(2)
    	e.set()
    
    
    if __name__ == '__main__':
    	f = threading.Thread(target=foo)
    	b = threading.Thread(target=bar)
    
    	f.start()
    	b.start()
  2. queue 線程隊列
    線程隊列特別適用於消息必須安全地在多線程間交換的線程編程,線程隊列有三種類型,在實例化的時候根據需求指定:

    • 先進先出(FILO):queue.Queue(maxsize=0)
    • 後進先出(LIFO):queue.LifoQueue(maxsize=0)
    • 按優先級,使用heapq(堆隊列算法),肯定優先級:queue.PriorityQueue(maxsize=0)

    注:maxsize參數指定隊列的大小,當maxsize <= 0 時,隊列的元素個數沒有限制。

    這三者都返回queue.Queue對象的方法,由於LifoQueuePriorityQueue都繼承queue.Queue,Queue對象擁有如下方法:

    方法 說明
    .put(item, block=True, timeout=None) 將 item 放入隊列,block默認爲True,表示阻塞。優先級隊列的item要包含優先級如:q.put([2, "abc"])
    .get(block=True, timeout=None) 從隊列中移除並返回一個項目。block默認爲True,表示阻塞
    .qsize() 返回隊列的大體大小
    .empty() 若是隊列爲空,返回 True ,不然返回 False
    .full() 若是隊列是滿的返回 True ,不然返回 False 。
    .task_done() 完成一個任務後,向隊列發信號(join()用到)。
    .join() 阻塞到 隊列中全部的元素 都 被 接 收 和 處 理 完畢(根據收到的task_done信號肯定)。

    關於task_done與join:
    Queue內部有一個unfinished_tasks屬性(默認爲0),put時自增1,task_done調用時自減1
    join的邏輯是while self.unfinished_tasks: self.all_tasks_done.wait(),當unfinished_tasks爲0的時候就跳出循環,中止阻塞狀態。

    import threading
    import queue
    import time
    
    q = queue.Queue(5)      # 只存5個元素
    
    
    def worker():
    	while True:
    		print("qsize: ", q.qsize())
    		item = q.get()          # 隊列爲空時會阻塞
    
    		print(f'Working on {item}')
    		time.sleep(0.5)         # 模擬處理數據的時間
    		print(f'Finished {item}')
    		q.task_done()           # 已經執行
    
    
    # 開啓爲worker線程,處理隊列,設置爲守護線程
    threading.Thread(target=worker, daemon=True).start()
    
    
    print("隊列是否爲空:", q.empty())
    
    # 往隊列中添加元素
    for item in range(10):
    	if q.full():
    		print("已經滿了,阻塞。。。。")
    	q.put(item)     # 隊列滿的時候會阻塞
    
    print('所有元素已經放入隊列中')
    
    # 會一直阻塞,知道unfinished_tasks爲0
    q.join()
    print('所有任務已完成')

    PriorityQueue有點特殊,單獨舉例:

    import queue
    
    
    q = queue.PriorityQueue()
    
    q.put([3, "c"])
    q.put([1, "a"])
    q.put([2, "b"])
    
    print(q.get())  # [1, 'a']
    print(q.get())  # [2, 'b']
    print(q.get())  # [3, 'c']

進程

python中使用multiprocessing模塊來實現多進程。

簡單使用

import multiprocessing


def work(num, name, age):
    print(f"num: {num}, name: {name}, age: {age}")


if __name__ == '__main__':      # 不要省略了這個,不然報錯
    p = multiprocessing.Process(target=work, args=(
        1, ), kwargs={"name": "lczmx", "age": 22})

    p.start()
    p.join()

方法二

要重寫__init__方法的話要super.__init__()

import multiprocessing


class MyProcess(multiprocessing.Process):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        print("func1")

    def func2(self):
        print("func2")


if __name__ == "__main__":
    p = MyProcess()
    p.start()

一些經常使用方法

  • multiprocessing.set_start_method('spawn') 設置啓動方法,關於啓動方法類型及介紹見文檔
  • Process對象.terminate() 當即終止進程
  • Process對象.pid 返回進程ID。在生成該進程以前,這將是 None
  • Process對象.daemon 設置守護進程,和守護線程同樣,能夠在建立進程的時候經過daemon形參來設置。
  • Process對象.name 與threading相似
  • Process對象.is_alive 與threading相似
  • Process對象.join 與threading相似

主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

進程間通訊

使用多進程時,通常使用消息機制實現進程間通訊,儘量避免使用鎖等同步原語。

  1. 進程隊列 multiprocessing.Queue(maxsize=0)
    進程隊列的方法與queue.Queue的方法很像,經常使用的方法中就沒有task_donejoin(multiprocessing.JoinableQueue(maxsize=0)有這兩個方法,但要必需要手動調用task_done,不然用於統計未完成任務的信號量最終會溢出並拋出異常)
    from multiprocessing import Process, Queue
    
    
    def f(q):
    	q.put([42, None, 'hello'])
    
    
    if __name__ == '__main__':
    	q = Queue()
    	p = Process(target=f, args=(q,))
    	p.start()
    	print(q.get())    # [42, None, 'hello']
    	p.join()
  2. 管道 multiprocessing.Pipe
    conn1, conn2 = multiprocessing.Pipe([duplex])conn1和conn2是一對 Connection 對象, 分別表示管道的兩端。
    若是 duplex 被置爲 True (默認值),那麼該管道是雙向的。若是 duplex 被置爲 False ,那麼該管道是單向的,即 conn1 只能用於接收消息,而 conn2 僅能用於發送消息。
    Connection對象的經常使用方法:
    • send(obj)
      將一個對象發送到鏈接的另外一端,能夠用 recv() 讀取。
      發送的對象必須是能夠序列化的,過大的對象 ( 接近 32MiB+ ,這個值取決於操做系統 ) 有可能引起 ValueError 異常。
    • recv()
      返回一個由另外一端使用 send() 發送的對象。該方法會一直阻塞直到接收到對象。 若是對端關閉了鏈接或者沒有東西可接收,將拋出 EOFError 異常。
    • fileno()
      返回由鏈接對象使用的描述符或者句柄。
    • close()
      關閉鏈接對象。當鏈接對象被垃圾回收時會自動調用。
      更多方法詳見文檔
    from multiprocessing import Process, Pipe
    
    
    def f(conn):
    	print(conn.recv())  # [1, '12', True]
    
    
    if __name__ == '__main__':
    	conn1, conn2 = Pipe()   # 默認爲雙向
    
    	p = Process(target=f, args=(conn2,))
    	p.start()
    	conn1.send([1, "12", True])
    	p.join()

線程池和進程池

線程和進程的建立、切換、關閉都須要必定的成本,對於某些重複次數多且聲明週期短的任務可使用線/進程池,線/進程池的數量並非越多越好,太多可能得不償失,甚至致使python解釋器崩潰。
使用線程池要用到concurrent.futures.ThreadPoolExecutor
使用進程池要用到concurrent.futures.ProcessPoolExecutor
線程池和進程池都提供瞭如下經常使用方法:

  • submit(fn, *args, **kwargs):將 fn 函數提交給線/進程池。
    *args 表明傳給 fn 函數的參數,*kwargs 表明以關鍵字參數的形式爲 fn 函數傳入參數。

  • map(func, *iterables, timeout=None, chunksize=1):該函數相似於全局函數 map(func, *iterables)
    該函數將會啓動多個線程,以異步方式當即對 iterables 執行 map 處理。

  • shutdown(wait=True, *, cancel_futures=False)
    當待執行的 future 對象完成執行後向執行者發送信號,它就會釋放正在使用的任何資源。
    python3.9纔開始增長cancel_futures參數

    以線程池爲例(與with搭配使用更好

    from concurrent.futures import ThreadPoolExecutor
    
    
    pool = ThreadPoolExecutor(8)
    
    
    def work(num, name="unknown"):
    	print(name, num)
    
    
    for i in range(10):
    	pool.submit(work, i, name="work-%s" % i)
    
    
    pool.map(work, [1, 2, 3, 45])
    
    # pool.shutdown(wait=True)

Future對象是submit方法的返回值,其自己也有一些實用的方法:

  • result(timeout=None)
    返回 函數的返回值。若是調用還沒完成那麼這個方法將等待 timeout 秒。超時則觸發`concurrent.futures.TimeoutError

  • exception(timeout=None)
    返回由調用函數引起的異常。若是調用還沒完成那麼這個方法將等待 timeout 秒。超時則觸發concurrent.futures.TimeoutError

  • add_done_callback(fn)
    回調函數,將 fn 附加到future對象。當 future 對象被取消或完成運行時,將會調用 fn,而這個future 對象將做爲它惟一的參數

    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def callback(future):
    	""" 回調函數,future是concurrent.futures._base.Future對象 """
    	print("result", future.result())    # result 123
    	print("exception", future.exception())  # exception None
    
    
    def work(num):
    	if not isinstance(num, int):
    		raise TypeError
    	return num
    
    
    with ThreadPoolExecutor(8) as executor:
    	res = executor.submit(work, 123)
    	res.add_done_callback(callback)

    進程池

    # ...略
    def main():
        with ProcessPoolExecutor() as executor:
            res = executor.submit(work, 123)
            res.add_done_callback(callback)
    
    
    if __name__ == '__main__': # 不要省略了這個,不然報錯
        main()

生產者、消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題,經過一個容器來解決生產者和消費者的強耦合問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。在通常的高併發程序中一般就會有這樣的場景出現:生產多快,處理不過來;生產太慢,等半天沒得處理。因此要引入生產者消費者模型:

  • 生產者向阻塞隊列中添加數據,隊列爲滿的時候就等
  • 消費者從阻塞隊列中拿數據,隊列爲空的時候也要等
  • 消費者通常是死循環處理數據,當不消費時,能夠發送信號,讓消費者退出

生產者與消費者模型
生產者與消費者模型

多線程實現:

import threading
import queue
import time
import random


class Consumer(threading.Thread):

    """ 消費者 """

    def __init__(self, q, lock, name):
        super().__init__()
        self.q = q          # 阻塞消息隊列
        self.lock = lock    # 互斥鎖
        self.name = "消費者-" + str(name)
        self.daemon = True  # 設置爲守護線程

    def run(self):

        while True:
            item = self.q.get()  # 有則取,無則阻塞

            with self.lock:         # 使用上下文管理協議使用鎖
                print(f"{self.name}: 處理{item}....")
                time.sleep(random.uniform(1, 2))     # 模擬處理時間1~2的浮點數
            self.q.task_done()      # 調用task_done()


class Producer(threading.Thread):
    """ 生產者 """

    def __init__(self, q, count, name):
        super().__init__()
        self.q = q      # 阻塞隊列
        self.count = count  # 生產幾個數據
        self.name = "生產者-" + str(name)

    def run(self):
        for num in range(self.count):
            data = "data-%d" % num
            print(f"{self.name}: 生成數據 {data}")
            time.sleep(random.random())     # 模擬處理時間0~1的浮點數
            self.q.put(data)     # 添加數據,滿則阻塞

        # 由於消費者是守護線程,其是否能夠退出要看生產者
        self.q.join()       # 等待全部的數據都處理完了,才退出


if __name__ == '__main__':
    q = queue.Queue()
    lock = threading.Lock()

    # 使用map生成,並啓動消費者
    list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙", "丙"]))

    # 生成者列表
    producer_list = map(lambda name: Producer(q, 20, name), ["大廚", "小廚"])

    for p in producer_list:
        p.start()
    for p in producer_list:
        p.join()
        # 等結束

多進程實現:

import multiprocessing
import time
import random


class Consumer(multiprocessing.Process):

    """ 消費者 """

    def __init__(self, q, lock, name):
        super().__init__()
        self.q = q          # 阻塞消息隊列
        self.lock = lock    # 互斥鎖
        self.name = "消費者-" + str(name)
        self.daemon = True  # 設置爲守護進程

    def run(self):

        while True:
            item = self.q.get()  # 有則取,無則阻塞

            with self.lock:         # 使用上下文管理協議使用鎖
                print(f"{self.name}: 處理{item}....")
                time.sleep(random.uniform(1, 2))     # 模擬處理時間1~2的浮點數
            self.q.task_done()      # 調用task_done()


class Producer(multiprocessing.Process):
    """ 生產者 """

    def __init__(self, q, count, name):
        super().__init__()
        self.q = q      # 阻塞隊列
        self.count = count  # 生產幾個數據
        self.name = "生產者-" + str(name)

    def run(self):
        for num in range(self.count):
            data = "data-%d" % num
            print(f"{self.name}: 生成數據 {data}")
            time.sleep(random.random())     # 模擬處理時間0~1的浮點數
            self.q.put(data)     # 添加數據,滿則阻塞

        # 由於消費者是守護進程,其是否能夠退出要看生產者
        self.q.join()       # 等待全部的數據都處理完了,才退出


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    lock = multiprocessing.Lock()

    # 啓動兩個消費者進程

    list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙"]))

    # 生成者只開一個進程
    p = Producer(q, 20, "大廚")
    p.start()

    p.join()
    # 等結束

協程

點擊查看如何使用協程

aiohttp

aiohttp是一個基於asyncio實現對http協議支持的第三方庫,點擊查看如何使用aiohttp

IO

進程的執行是要靠操做系統調度的,爲了保證不影響後面程序的運行,因此在執行過程當中遇到阻塞或超過期間輪詢時cpu會切換不一樣的進程執行

當咱們寫的程序須要數據即有IO的時候可使用如下的四種模式來解決問題。

IO模式
IO模式

IO多路複用實現

不一樣平臺有不一樣的實現IO多路複用的模塊,windows下支持select僅適用於套接字;Linux下至此selectpollepoll 函數的訪問,這些函數在大多數操做系統中是可用的;在 Solaris下爲devpoll; BSD 上可用kqueue;在這些操做系統上,適用於套接字和其餘文件類型。

水平觸發和邊緣觸發

水平觸發
對於讀:只要緩衝內容不爲空返回讀就緒
對於寫:只要緩衝區還不滿返回寫就緒
邊緣觸發
對於讀:緩衝區由空變爲不空 或 數據變多 等時候返回讀就緒
對於寫:緩衝區由滿變爲空 或 數據變少 等時候返回寫就緒
select和poll都是使用的水平觸發方式。epoll既支持水平觸發也支持邊緣觸發,默認是水平觸發。

在python中要實現IO多路複用,可使用selectselectorsselectors是對select的進一步封裝,使用selectors.DefaultSelector()能夠自動選擇當前平臺最高效的接口。因此推薦使用selectors模塊。

使用selectors模塊主要要用到如下幾個方法:

  • selectors.DefaultSelector() 以kqueue > epoll > devpoll > poll > select等優先級返回選擇器類
  • 選擇器類.register(fileobj, event, data=None) 註冊一個用於選擇的文件對象,在其上監視 I/O 事件。
    • fileobj 是要監視的文件對象。 它能夠是整數形式的文件描述符或者具備 fileno方法(返回文件描述符)的對
    • events 是要監視的事件的位掩碼。指明哪些 I/O 事件要在給定的文件對象上執行等待(selectors.EVENT_READ:可讀, selectors.EVENT_WRITE:可寫)
    • data 是一個不透明對象。
  • 選擇器類.unregister(fileobj)
    註銷對一個文件對象的選擇,移除對它的監視。 在文件對象被關閉以前應當先將其註銷。
  • 選擇器類.select(timeout=None)
    等待直到有已註冊的文件對象就緒,或是超過期限。
    這將返回由 (key, events) 元組構成的列表,每項各表示一個就緒的文件對象。
    key是namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']),fd是文件描述符,其餘的都是register函數的參數
  • 選擇器類.close()
    關閉選擇器。確保釋放下層資源。

來自python官方文檔的例子

import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # 等鏈接是讀
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)     # 設置非阻塞
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1000)  # 接收消息是讀
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)
    else:                   # 斷開鏈接
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


sock = socket.socket()
sock.bind(('localhost', 1234))

sock.listen(100)
sock.setblocking(False)      # 設置非阻塞
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:    # 一直阻塞,直到有數據來
        print("event 循環")
        # mask是位掩碼EVENT_READ或EVENT_WRITE

        # 獲得回調函數,這裏是read或accept
        callback = key.data
        callback(key.fileobj, mask)

運行這段代碼,並用其它終端鏈接:

>>> import socket
>>> sock = socket.socket()
>>> sock.connect(("localhost", 1234))
>>> sock.send(b"hello world")
11

異步非阻塞實現

自定義異步非阻塞web框架

個人github
個人博客
個人筆記

相關文章
相關標籤/搜索