python--Selectors模塊/隊列

Selectors模塊/隊列

一 Selectors模塊

IO多路複用實現機制python

Win: selectlinux

Linux:select(效率低)  poll  epoll(最好)默認選擇epoll安全

select缺點:dom

1 每次調用select都要講全部的fd(文件描述符)拷貝到內核空間致使效率降低;socket

2 遍歷全部fd,是否有數據訪問(最重要的問題);函數

3 最大鏈接數(1024)。spa

Poll缺點:線程

1 最大鏈接數沒有限制。code

epoll:對象

經過三個函數實現:

1 第一個函數:建立epoll句柄:將全部的fd(文件描述符)拷貝到內核空間,可是隻需拷貝一次;

2 回調函數:某一個函數或者某一個動做成功完成以後,會觸發的函數;

  爲全部的fd綁定一個回調函數,一旦有數據訪問,觸發該回調函數,回調函數將fd放到列表中。

3 第三個函數 判斷列表是否爲空

最大鏈接數沒有上限

selectors:

#服務端
import selectors
import socket
sock=socket.socket()

sock.bind(("127.0.0.1",8000))
sock.listen(5)
sock.setblocking(False)

sel=selectors.DefaultSelector()  ##根據具體平臺選擇最佳IO多路機制,好比在linux,選擇epoll

def read(conn,mask):
    try:
        data=conn.recv(1024)
        print(data.decode("utf8"))
        msg=input(">>")
        conn.send(msg.encode("utf8"))
    except Exception:
        sel.unregister(conn)
def accept(sock,mask):
    conn,addr=sock.accept()
    sel.register(conn,selectors.EVENT_READ,read)
sel.register(sock,selectors.EVENT_READ,accept) #註冊事件
while True:
    print("waiting......")
    events=sel.select() #監聽  [(key,mask)]
    for key,mask in events:
        print(key.data)
        print(key.fileobj)
        func=key.data  #拿到的是函數
        obj=key.fileobj  #拿到的是sock對象文件描述符(fd)
        func(obj,mask)


#客戶端
import socket
sock=socket.socket()
sock.connect(("127.0.0.1",8000))
while True:
    msg=input('>>')
    sock.send(msg.encode('utf8'))
    data=sock.recv(1024)
    print(data.decode('utf8'))

 

二 隊列

queue特色(線程是安全),也是數據類型

'''

建立一個「隊列」對象

import Queue
q = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數
maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。

將一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;
第二個block爲可選參數,默認爲
1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,
put方法將引起Full異常。

將一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且
block爲True,get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。

'''

默認(先進先出)(FIFO)  

import queue
q=queue.PriorityQueue()
q.put(111)
q.put(222)
q.put(333)

print(q.get())
print(q.get())
print(q.get())

執行結果:
111
222
333

先進後出(lifo last in first out)  

import queue
q=queue.LifoQueue()
q.put(111)
q.put(222)
q.put(333)
print(q.get())

執行結果:
333

優先級

import queue
q=queue.PriorityQueue()
q.put([4,"hello3"])
q.put([1,"hello"])
q.put([3,"hello1"])

while True:
    data=q.get()
    print(data)


執行結果:
[1, 'hello']
[3, 'hello1']
[4, 'hello3']

join與task_done方法

join()阻塞進程,直到全部任務完成,須要配合另外一個task_done。

task_done()表示某個任務完成。每一條get語句後須要一條task_done。

import queue
q=queue.Queue(5)
q.put(111)
q.put(222)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()
print("ending")
執行結果:
111
222
ending

  

三 生產者模型

生產者消費者模式是經過一個容器來解決生產者和消費者的強藕和問題,生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()
相關文章
相關標籤/搜索