python_12(併發編程)

第1章 進程
1.1 隊列Queue
1.2 Queue方法
1.2.1 q.get([block [,timeout]])
1.2.2 q.get_nowait()
1.2.3 q.put(item [, block [timeout]])
1.2.4 q.size()
1.2.5 q.empty()
1.2.6 q.full()
1.2.7 q.close()
1.2.8 q.cancel_join_thread()
1.2.9 q.join_thread()
1.2.10 例:相關參數應用
1.2.11 例2)
1.2.12 生產者消費者模型
1.3 進程小結
第2章 線程
2.1 理論
2.2 使用方法
2.3 建立線程
2.4 進程和線程
2.5 效率測試
2.6 缺點
2.7 get_ident
2.8 current_thread
2.9 enumerate
2.10 terminate
2.11 守護線程
2.12 使用面向對象方法開啓線
2.13 線程鎖
2.14 鎖的方法及種類lock
2.14.1 同步鎖的引用
2.14.2 互斥鎖與join的區別
2.14.3 解釋說明
2.14.4 死鎖與遞歸鎖
2.14.5 小結
2.15 線程隊列queue()
2.15.1 調用方法
2.15.2 規則
第3章 池
3.1 進程池
3.1.1 multiprocess.Pool模塊
3.1.2 例:進程池開啓socket聊天
3.2 線程池
3.2.1 線程池模塊
3.2.2 線程調用方法
3.2.3 查看cpu個數的方法:
3.2.4 規定線程數量用法
3.2.5 submit
3.2.6 map
3.2.7 shutdown
3.2.8 result
3.2.9 add_done_callback
3.2.10 小結
3.2.11 總例
第4章 協程
4.1 介紹
4.2 特色
4.3 Greenlet模塊
4.3.1 greenlet實現狀態切換
4.4 安裝gevent模塊
4.4.1 安裝
4.4.2 介紹
4.4.3 用法
4.5 join
4.6 gevent.spawn
4.7 Gevent之同步與異步
4.8 Gevent之應用舉例一(爬蟲)
第5章 IO模型
5.1 介紹
5.2 阻塞IO模型
5.3 非阻塞IO
5.4 多路複用
5.5 selectors模塊
5.6 selectors模塊實現聊天python

 

第1章 進程

1.1 隊列Queue

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。git

Queue([maxsize])github

建立共享的進程隊列web

參數:maxsize是隊列中容許的最大項數,若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。編程

1.2 Queue方法

1.2.1 q.get([block [,timeout]])

解釋:返回q中的一個項目,q爲空將阻塞,直到隊列中有項目,json

      block用於控制阻塞行爲,默認爲true,如爲false將引起queue.empty異常windows

         timeout是可選超時時間,阻塞中若是沒有西南股變爲可用,引起Queue.Empty異常數組

1.2.2 q.get_nowait()

       等同於q.get(False)緩存

1.2.3 q.put(item [, block [timeout]])

 解釋:將item放入隊列,若是隊列已滿,此方法將阻塞至有空間可用爲止,block控制阻塞行爲,                        默認爲True.如block爲False,將引起Queue.Empty異常安全

              timeout指定在阻塞模式中等待可用空間的時間長短,超時引起Queue.Full異常

1.2.4 q.size()

解釋:返回隊列中目前項目的正確數量,結果不可靠,因在返回結果過程當中可能隊列又增長了項目,在某些系統上可能引起NOT ImplementedError異常

1.2.5 q.empty()

若是調用方法時 q爲空,返回True. 若是其餘進程或者線程正在往隊列中添加項目,結果是不可靠的。

1.2.6 q.full()

若是q已滿,返回爲True,因爲線程的存在,結果也多是不可靠的

1.2.7 q.close()

關閉隊列,防止隊列加入更多數據,調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉,若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

1.2.8 q.cancel_join_thread()

不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

 

1.2.9 q.join_thread()

鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。

 

1.2.10 例:相關參數應用

import time

from multiprocessing import Process ,Queue

 

def f(q):

    q.put([time.asctime(),'from Eva','hello'])

    #調用主函數中p進程傳遞過來的參數 put函數爲向隊列中添加的一條數據

 

if __name__ == '__main__':

    q = Queue()#建立一個隊列對象

    p = Process(target=f,args=(q,))#建立一個進程

    p.start()

    print(q.get())

    p.join()

輸出

C:\python3\python3.exe D:/python/untitled2/Course_selection_system/conf/lession.py

['Mon Aug  6 17:03:02 2018', 'from Eva', 'hello']

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。 接下來看一個稍微複雜一些的例子:

1.2.11 例2)

 批量生產數據放入隊列再批量獲取結果

import os

import time

import multiprocessing

 

# 向queue中輸入數據的函數

def inputQ(queue):

    info = str(os.getpid()) + '(put):' + str(time.asctime())

    queue.put(info)

 

# 向queue中輸出數據的函數

def outputQ(queue):

    info = queue.get()

    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

 

# Main

if __name__ == '__main__':

    multiprocessing.freeze_support()

    record1 = []   # store input processes

    record2 = []   # store output processes

    queue = multiprocessing.Queue(3)

 

    # 輸入進程

    for i in range(10):

        process = multiprocessing.Process(target=inputQ,args=(queue,))

        process.start()

        record1.append(process)

 

    # 輸出進程

    for i in range(10):

        process = multiprocessing.Process(target=outputQ,args=(queue,))

        process.start()

        record2.append(process)

 

    for p in record1:

        p.join()

 

    for p in record2:

        p.join()

 

輸出:

C:\python3\python3.exe D:/python/untitled2/Course_selection_system/conf/lession.py

5996(get):5740(put):Mon Aug  6 18:05:07 2018

4516(get):8144(put):Mon Aug  6 18:05:07 2018

6112(get):5064(put):Mon Aug  6 18:05:07 2018

5408(get):4340(put):Mon Aug  6 18:05:07 2018

5240(get):2768(put):Mon Aug  6 18:05:07 2018

2904(get):7720(put):Mon Aug  6 18:05:08 2018

7032(get):7316(put):Mon Aug  6 18:05:08 2018

6900(get):8032(put):Mon Aug  6 18:05:08 2018

4360(get):8036(put):Mon Aug  6 18:05:08 2018

2320(get):6360(put):Mon Aug  6 18:05:08 2018

 

Process finished with exit code 0

1.2.12 生產者消費者模型

生產者數據與消費者數據存在一種供需關係,當供大於需或需大於供,都會致使以防阻塞等待,解決這樣的狀況,應用了阻塞隊列,就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

 

 

實例:

from multiprocessing import Process,Queue

import time,random,os

def consumer(q):

    while True:

        res=q.get()

        time.sleep(random.randint(1,3))

        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):

    for i in range(10):

        time.sleep(random.randint(1,3))

        res='包子%s' %i

        q.put(res)

        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

 

if __name__ == '__main__':

    q=Queue()

    #生產者

    p1=Process(target=producer,args=(q,))

    #消費

    c1=Process(target=consumer,args=(q,))

    p1.start()

    c1.start()

    print('主')

輸出

C:\python3\python3.exe D:/python/untitled2/Course_selection_system/conf/lession.py

8080 生產了 包子0

8080 生產了 包子1

8080 生產了 包子2

8592 吃 包子0

8592 吃 包子1

8592 吃 包子2

8080 生產了 包子3

8080 生產了 包子4

8592 吃 包子3

8592 吃 包子4

8080 生產了 包子5

8592 吃 包子5

8080 生產了 包子6

8592 吃 包子6

8080 生產了 包子7

8080 生產了 包子8

8592 吃 包子7

8080 生產了 包子9

8592 吃 包子8

8592 吃 包子9

 

爬蟲

 

 

1.3 進程小結

注意事項:

n  多進程不適合作讀寫,多io型,由於多進程是用來解決計算用的

n  進程的開銷是比較大的,多進程可以充分利用多核

        

特色:

l  進程開啓的數量是有限的

      密切的和CPU的個數相關,進程數 應該在cpu的1-2倍之間

l  進程的開啓和銷燬都須要比較大的時間開銷

l  進程越多操做系統調度起來就消耗的資源多

l  實際上多進程主要就是 利用多個cpu,且同一時間最多隻能執行和CPU相等的進程

       CPU只能用來作計算,

       高計算性的程序適用多進程

       但高IO型的層序不適合多進程

第2章 線程

2.1 理論

l  進程源於多道程序出現

        數據隔離

        資源分配

       進程是計算機中資源分配的最小單位

l  線程屬於進程

    是用來執行程序的

       線程是計算機中cpu調度最小的單位

l  線程做用:

       爲了節省操做系統的資源

       在實現併發的時候能減小時間開銷

l  線程圖示

 

2.2 使用方法

建立方式選擇

l  thread (底層)

l  threading(推薦更高級)

2.3 建立線程

from threading import Thread

import time

def kk(name):

    time.sleep(2)

    print('%s say hello' %name)

 

if __name__ == '__main__':

    t=Thread(target=kk,args=('huhu',))

    t.start()

    # t.join()

    print('主線程')

輸出:

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

huhu say hello

主線程

 

開啓多線程也能夠支持併發

import  os

import time

from threading  import Thread

 

def func(i):

    time.sleep(1)

    print(i,os.getpid())

 

print('main',os.getpid())

for i in range(10):

    t = Thread(target=func,args=(i,))

    t.start()

輸出

main 6220

0 6220

1 6220

5 6220

2 6220

6 6220

9 6220

3 6220

4 6220

8 6220

7 6220

Process finished with exit code 0

2.4 進程和線程

l  進程和線程都實現了併發

l  python階段 進程和線程之間的區別

l  進程pid,多進程的時候每一個子進程有本身的pid

l  多個線程共享一個進程id

l  數據隔離和共享,多進程之間數據隔離

l  線程之間全局變量都是共享的

l  main:進程必須寫if __name == '__main__'

l  線程因爲共享進程的代碼,不須要再執行文件中的代碼

l  效率差:

 線程的開啓和銷燬耗時遠小於進程

 

import os

import  time

from  threading  import Thread

 

n = 10

def func(i):

    global n

    n -= 1

    time.sleep(1)

    print(i,os.getpid())

 

print('main',os.getpid())

t_lst = []

for i in  range(10):

    t = Thread(target=func,args=(i,))

    t.start()

    print(t)

    t_lst.append(t)

    for t in t_lst:t.join()

print(n)

 

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

main 6344

<Thread(Thread-1, started 7796)>

0 6344

<Thread(Thread-2, started 6704)>

1 6344

<Thread(Thread-3, started 7000)>

2 6344

<Thread(Thread-4, started 1508)>

3 6344

<Thread(Thread-5, started 7880)>

4 6344

<Thread(Thread-6, started 8068)>

5 6344

<Thread(Thread-7, started 4240)>

6 6344

<Thread(Thread-8, started 6292)>

7 6344

<Thread(Thread-9, started 3920)>

8 6344

<Thread(Thread-10, started 6664)>

9 6344

0

例:每一個線程加join

import os

import  time

from  threading  import Thread

 

n = 10

def func(i):

    global n

    n -= 1

    time.sleep(1)

    print(i,os.getpid())

 

print('main',os.getpid())

t_lst = []

for i in  range(10):

    t = Thread(target=func,args=(i,))

    t.start()

    print(t)

    t_lst.append(t)

    for t in t_lst:t.join()

print(n)

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

main 6564

0 6564

1 6564

2 6564

3 6564

4 6564

5 6564

6 6564

7 6564

8 6564

2.5 效率測試

例:

from  threading import Thread

from  multiprocessing import  Process

import time

import os

 

n = 10

def func(i):

    # time.sleep(1)

    global n

    n -= 1

 

if __name__ == '__main__':

    start = time.time()

 

    t_lst = []

    for  i  in  range(100):

        t = Thread(target=func,args=(i,))

        t.start()

        t_lst.append(t)

    for t in t_lst:t.join()

    print('線程',time.time()- start)

 

    start = time.time()

    p_lst = []

    for i in  range(100):

        p = Process(target=func,args=(i,))

        p.start()

        p_lst.append(p)

    for p in p_lst:p.join()

    print('進程: ' ,time.time() - start)

輸出:

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

線程 0.01700115203857422

進程:  5.134293556213379

2.6 缺點

GIL鎖的是線程

cpython解釋器的CIL:致使python不能有效利用多核

jpython pypy解釋可以充分利用多核

即實現併發又實現多核的方法

多進程 + 多線程

2.7  get_ident

解釋:查看線程id

from threading import Thread

from threading import get_ident

 

def func(arg1,arg2):

    print(arg1,arg2,get_ident())

print(get_ident())

t = Thread(target=func,args=(1,2))

t.start()

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

主進程ID號 6564

線程ID號:  1 2 7344

2.8 current_thread

解釋:查看線程pid

from threading import Thread

from threading import get_ident,current_thread

def func(arg1,arg2):

    print('線程ID號: ',arg1,arg2,get_ident(),current_thread().name)

print('主進程ID號',get_ident(),current_thread().ident)

t = Thread(target=func,args=(1,2))

t.start()

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

主進程ID號 7184 7184

線程ID號:  1 2 7724 Thread-1

2.9 enumerate

解釋:返回一個包含正在運行的線程list,正在運行指線程後臺運行,結束前,不包含啓動前和終止後

import time

from threading import Thread

from threading import get_ident,current_thread,enumerate

 

def func(arg1,arg2):

    print('線程ID號: ',arg1,arg2,get_ident(),current_thread().name)

    time.sleep(2)

for i in range(10):

    t = Thread(target=func,args=(1,2))

    t.start()

print(len(enumerate()))

輸出:

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

線程ID號:  1 2 6416 Thread-1

線程ID號:  1 2 8052 Thread-2

線程ID號:  1 2 1688 Thread-3

線程ID號:  1 2 6920 Thread-4

線程ID號:  1 2 3548 Thread-5

線程ID號:  1 2 3560 Thread-6

線程ID號:  1 2 7720 Thread-7

線程ID號:  1 2 2020 Thread-8

線程ID號:  1 2 7680 Thread-9

線程ID號:  1 2 4048 Thread-10

2.10 terminate???

解釋:不能被強制終止

 

 

2.11 守護線程

l  守護進程會隨着主進程的代碼結束而結束

l  守護線程會隨着主線程的結束而結束

      而主線程也會等待子線程結束才結束,因此守護線程會等待包括子線程以內的全部線程都結束以後才結束

import time

from threading  import Thread

 

def func1():

    print('start func1')

    time.sleep(0.5)

    print('in func1' )

 

def func2():

    print('start func2')

    time.sleep(5)

    print('end func2')

t = Thread(target=func1)

t.start()

t2 = Thread(target=func2)

t2.start()

輸出:

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

start func1

start func2

in func1

end func2

 

守護線程後

import time

from threading  import Thread

 

def func1():

    print('start func1')

    time.sleep(0.5)

    print('in func1' )

 

def func2():

    print('start func2')

    time.sleep(5)

    print('end func2')

t = Thread(target=func1)

 

t.start()

t2 = Thread(target=func2)

t2.daemon = True

t2.start()

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

start func1

start func2

in func1

例2)

from threading import Thread

import time

def sayhi(name):

    time.sleep(2)

    print('%s say hello' %name)

 

if __name__ == '__main__':

    t=Thread(target=sayhi,args=('egon',))

    t.setDaemon(True) #必須在t.start()以前設置

    t.start()

 

    print('主線程')

    print(t.is_alive())

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/10.py

主線程

True

 

2.12 使用面向對象方法開啓線程

import time

from threading import  Thread

 

class MyThread(Thread):

    def __init__(self,arg,arg2):

        super().__init__()

        self.arg = arg

        self.arg2 = arg2

    def run(self):

        print('start func2',self.arg)

        time.sleep(5)

        print('end func2',self.arg2)

mt = MyThread('a','b')

mt.start()

2.13 線程鎖

n  數據不安全

       多個線程、進程同時操做一個數據

       保證數據安全-------基於文件形式

       線程中調用緩存的數據沒有被及時釋放或者數據被其餘線程更改後調用時數據發生變化

       GIL鎖機制鎖的是線程,沒有鎖住內存中的數據因此數據不安全和GIL不要緊

n  避免數據不安全

       就要對全局變量的修改必需要枷鎖

       並不能影響效率

       不是必須共享的數據不要設置爲全局變量

2.14 鎖的方法及種類lock

調用方法:(互斥鎖)

from threading  import Lock

2.14.1 同步鎖的引用

from threading import Thread,Lock

import os,time

def work():

    global n

    lock.acquire()

    temp=n

    time.sleep(0.1)

    n=temp-1

    lock.release()

if __name__ == '__main__':

    lock=Lock()

    n=100

    l=[]

    for i in range(100):

        p=Thread(target=work)

        l.append(p)

        p.start()

    for p in l:

        p.join()

    print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全

2.14.2 互斥鎖與join的區別

例1:不加鎖:併發執行,速度快,數據不安全

from threading import current_thread,Thread,Lock

import os,time

def task():

    global n

    print('%s is running' %current_thread().getName())

    temp=n

    time.sleep(0.5)

    n=temp-1

# n=5

# task()

if __name__ == '__main__':

    n=100

    lock=Lock()

    threads=[]

    start_time=time.time()

    for i in range(100):

        t=Thread(target=task)

        threads.append(t)

        t.start()

    for t in threads:

        t.join()

    stop_time=time.time()

    print('主:%s n:%s' %(stop_time - start_time,n))

輸出

Thread-95 is running

Thread-96 is running

Thread-97 is running

Thread-98 is running

Thread-99 is running

Thread-100 is running

主:0.5180294513702393 n:99

例2:不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全

from threading import current_thread,Thread,Lock

import os,time

def task():

    #未加鎖的代碼併發運行

    time.sleep(3)

    print('%s start to run' %current_thread().getName())

    global n

    #加鎖的代碼串行運行

    lock.acquire()

    temp=n

    time.sleep(0.5)

    n=temp-1

    lock.release()

if __name__ == '__main__':

    n=100

    lock=Lock()

    threads=[]

    start_time=time.time()

    for i in range(100):

        t=Thread(target=task)

        threads.append(t)

        t.start()

    for t in threads:

        t.join()

    stop_time=time.time()

    print('主:%s n:%s' %(stop_time-start_time,n))

輸出

'''

Thread-1 is running

Thread-2 is running

......

Thread-100 is running

主:53.294203758239746 n:0

 
例3:用jion不加鎖

from threading import current_thread,Thread,Lock

import os,time

def task():

    time.sleep(3)

    print('%s start to run' %current_thread().getName())

    global n

    temp=n

    time.sleep(0.5)

    n=temp-1

 

 

if __name__ == '__main__':

    n=100

    lock=Lock()

    start_time=time.time()

    for i in range(100):

        t=Thread(target=task)

        t.start()

        t.join()

    stop_time=time.time()

    print('主:%s n:%s' %(stop_time-start_time,n))

 

'''

Thread-1 start to run

Thread-2 start to run

......

Thread-100 start to run

主:350.6937336921692 n:0 #耗時是多麼的恐怖

'''

2.14.3 解釋說明

既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊

#沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是

#start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的

#單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高

2.14.4 死鎖與遞歸鎖

解釋說明:

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖

 

遞歸鎖特色:

       若是能在第一個acquire的地方經過,那麼在一個線程中後面全部acquire都能經過

       可是其餘全部的線程都會在第一個acquire處阻塞

       在這個線程中acquire了多少次,就必須release多少次

       若是acquire的次數和release的次數不相等,那麼其餘線程也不能繼續向下執行

 

解決方法:

遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。下面的例子若是使用RLock代替Lock,則不會發生死鎖:

 

例:lock

from threading import Lock as Lock

import time

mutexA=Lock()

mutexA.acquire()

mutexA.acquire()

print(123)

mutexA.release()

mutexA.release()

 

例:Rlock

from threading import RLock as Lock

import time

mutexA=Lock()

mutexA.acquire()

mutexA.acquire()

print(123)

mutexA.release()

mutexA.release()

 

例1)科學家吃麪

#科學家吃麪

import time

from threading import  Thread,Lock

noodle_lock = Lock()

fork_lock = Lock()

def eat1(name):

    noodle_lock.acquire()

    print('%s 搶到了麪條' %name)

    fork_lock.acquire()

    print('%s 搶到了叉子' %name)

    print('%s 吃麪' %name )

    fork_lock.release()

    noodle_lock.release()

 

def eat2(name):

    fork_lock.acquire()

    print('%s 搶到了叉子' %name)

    time.sleep(1)

    noodle_lock.acquire()

    print('%s 搶到了麪條' %name)

    print('%s 吃麪' %name)

    noodle_lock.release()

    fork_lock.release()

 

for  name in ['a','b','c']:

    t1 = Thread(target=eat1,args=(name,))

    t2 = Thread(target=eat2,args=(name,))

    t1.start()

    t2.start()

輸出:

C:\python3\python3.exe D:/python/untitled2/lianxi/11.py

a 搶到了麪條

a 搶到了叉子

a 吃麪

a 搶到了叉子

b 搶到了麪條

例2):解決僵死狀況辦法

import time

from threading import  Thread,RLock

 

# noodle_lock = Lock()

fork_lock = noodle_lock =  RLock()

def eat1(name):

    noodle_lock.acquire()

    print('%s 搶到了麪條' %name)

    fork_lock.acquire()

    print('%s 搶到了叉子' %name)

    print('%s 吃麪' %name )

    fork_lock.release()

    noodle_lock.release()

 

def eat2(name):

    fork_lock.acquire()

    print('%s 搶到了叉子' %name)

    time.sleep(1)

    noodle_lock.acquire()

    print('%s 搶到了麪條' %name)

    print('%s 吃麪' %name)

    noodle_lock.release()

    fork_lock.release()

 

for  name in ['a','b','c']:

    t1 = Thread(target=eat1,args=(name,))

    t2 = Thread(target=eat2,args=(name,))

    t1.start()

    t2.start()

輸出

C:\python3\python3.exe D:/python/untitled2/lianxi/11.py

a 搶到了麪條

a 搶到了叉子

a 吃麪

a 搶到了叉子

a 搶到了麪條

a 吃麪

b 搶到了麪條

b 搶到了叉子

b 吃麪

b 搶到了叉子

b 搶到了麪條

b 吃麪

c 搶到了麪條

c 搶到了叉子

c 吃麪

c 搶到了叉子

c 搶到了麪條

c 吃麪

2.14.5 小結

遞歸鎖能夠解決資源佔用狀況,但仍沒法根本解決能夠暫時解決問題後續再改進,作好的解決方法是在開發設計過程當中就避免資源佔用的狀況發生

2.15 線程隊列queue()

在數據安全中,線程隊列是自帶線程鎖的容器

2.15.1 調用方法

import queue

2.15.2 規則

class queue.Queue(maxsize=0)

先進先出

import queue

 

q=queue.Queue()

q.put('first')

q.put('second')

q.put('third')

 

print(q.get())

print(q.get())

print(q.get())

'''

結果(先進先出):

first

second

third

'''

class queue.LifoQueue(maxsize=0) #last in fisrt out

後進先出

import queue

 

q=queue.LifoQueue()

q.put('first')

q.put('second')

q.put('third')

 

print(q.get())

print(q.get())

print(q.get())

'''

結果(後進先出):

third

second

first

存儲數據時可設置優先級的隊列

class queue.PriorityQueue(maxsize=0)

做用:可應用於識別網站會員

 

1PriorityQueue

import queue

 

q=queue.PriorityQueue()

#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高

q.put((20,'a'))

q.put((10,'b'))

q.put((30,'c'))

 

print(q.get())

print(q.get())

print(q.get())

結果(數字越小優先級越高,優先級高的優先出隊):

(10, 'b')

(20, 'a')

(30, 'c')

 

2

 

import queue

q= queue.PriorityQueue()

q.put(('a'))

q.put(('z'))

q.put(('h'))

print(q.get())

print(q.get())

print(q.get())

結果(字母ACSII數值越靠前,優先級高的優先出隊):

C:\python3\python3.exe D:/python/untitled2/lianxi/lianxi.py

a

h

z

 

字符串和數字混在一塊兒報錯

 

import queue

 

q= queue.PriorityQueue()

q.put(('a'))

q.put((2))

q.put((3))

print(q.get())

print(q.get())

print(q.get())

print(q.get())

報錯信息

  File "D:/python/untitled2/lianxi/lianxi.py", line 877, in <module>

    q.put((2))

  File "C:\python3\lib\queue.py", line 143, in put

    self._put(item)

  File "C:\python3\lib\queue.py", line 227, in _put

    heappush(self.queue, item)

TypeError: '<' not supported between instances of 'int' and 'str'

 

第3章 池

3.1 進程池

任務多的狀況下,無限開啓進程/線程,浪費不少的時間開啓和銷燬,還要佔用系統的調度資源

爲了開啓有限的線程及進程,來完成無線的任務,這樣可以最大化的保證併發維護操做系統資源協調

3.1.1 multiprocess.Pool模塊

建立進程池

Pool([numprocess [,initializer [, initargs]]])

 

numprocess:要建立的進程數,若是省略將默認使用cpu_count()的值

inittializer:每一個工做進程啓動時要執行的可調用對象,默認爲None

initargs:是要傳給initializer的參數組

主要方法

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。

'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

 

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。

'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''

  

p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

 

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

其餘方法(瞭解)

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法

obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。

obj.ready():若是調用完成,返回True

obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常

obj.wait([timeout]):等待結果變爲可用。

obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

例:異步進程池

import os

import time

import random

from multiprocessing import Pool

 

def work(n):

    print('%s run' %os.getpid())

    time.sleep(random.random())

    return n**2

 

if __name__ == '__main__':

    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務

    res_l=[]

    for i in range(10):

        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行

                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務

                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束

                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 

        res_l.append(res)

 

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果

    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了

    p.close()

    p.join()

    for res in res_l:

        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

 

進程池的異步調用

 

3.1.2 例:進程池開啓socket聊天

from multiprocessing import Process,Queue

import time,random,os

def consumer(q):

    while True:

        res=q.get()

        time.sleep(random.randint(1,3))

        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):

    for i in range(10):

        time.sleep(random.randint(1,3))

        res='包子%s' %i

        q.put(res)

        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

 

if __name__ == '__main__':

    q=Queue()

    #生產者

    p1=Process(target=producer,args=(q,))

    #消費

    c1=Process(target=consumer,args=(q,))

    p1.start()

    c1.start()

    print('主')

 

客戶端

from socket import *

 

client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8081))

 

while True:

    msg=input('>>: ').strip()

    if not msg:continue

 

    client.send(msg.encode('utf-8'))

    msg=client.recv(1024)

    print(msg.decode('utf-8'))

3.2 線程池

n  使用進程池的條件:

       任務數超過了CPU個數的兩倍

       進程的個數就不該和任務數相等

n  使用線程池的條件:

       任務數超過了CPU個數的5倍

       線程的個數就不該該和任務數相等

3.2.1 線程池模塊

concurrent.futures模塊提供了高度封裝的異步調用接口

ThreadPoolExecutor:線程池,提供異步調用

ProcessPoolExecutor: 進程池,提供異步調用

3.2.2 線程調用方法

from  concurrent.futures import  ThreadPoolExecutor

進程池調用方法

from concurrent.futures import ProcessPoolExecutor

3.2.3 查看cpu個數的方法:

import os

print(os.cpu_count())

3.2.4 規定線程數量用法

import  os

from  concurrent.futures import  ThreadPoolExecutor

# print(os.cpu_count())

ThreadPoolExecutor(os.cpu_count()*5)

3.2.5 submit

解釋:異步提交任務

方法:submit(fn, *args, **kwargs)

3.2.6 map

方法:map(func, *iterables, timeout=None, chunksize=1)

解釋:取代for循環submit的操做

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

 

import os,time,random

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(random.randint(1,3))

    return n**2

 

if __name__ == '__main__':

 

    executor=ThreadPoolExecutor(max_workers=3)

 

    # for i in range(11):

    #     future=executor.submit(task,i)

 

    executor.map(task,range(1,12)) #map取代了for+submit

 

map的用法

 

3.2.7 shutdown

方法:shutdown(wait=True)

解釋:至關於進程池的pool.close()+pool.join()操做

l  wait=True,等待池內全部任務執行完畢回收完資源後才繼續

l  wait=False,當即返回,並不會等待池內的任務執行完畢

l  但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢

注:

submit和map必須在shutdown以前

 

例:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(random.randint(1,3))

    return n**2

if __name__=='__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]

    for i in range(11):

        future=executor.submit(task,i)

        futures.append(future)

    executor.shutdown(True)

    print('+++>')

    for future in futures:

        print(future.result())

輸出

:\python3\python3.exe D:/python/untitled2/爬蟲.py

2520 is runing

7600 is runing

8560 is runing

7600 is runing

8560 is runing

2520 is runing

7600 is runing

8560 is runing

2520 is runing

7600 is runing

8560 is runing

+++>

0

1

4

9

16

25

36

49

64

81

100

3.2.8 result

方法:result(timeout=None)

解釋:取得結果,返回值

from threading import get_ident

from concurrent.futures import ThreadPoolExecutor

import os

import time

import random

def func(i):

    time.sleep(random.randint(1,2))

    print(get_ident(),i)

    return '*'*i*i

 

def call_bak(ret):

    print(get_ident(),len(ret.result()))

 

t_pool = ThreadPoolExecutor(os.cpu_count()*1)

for i in range(20):

    t_pool.submit(func,i)

t_pool.shutdown() # 阻塞

獲取併發返回值

ret_l = []

for i in range(20):

    ret = t_pool.submit(func,i)

    ret_l.append(ret)

for ret in ret_l:print(ret.result())    # 阻塞

3.2.9 add_done_callback

方法:add_done_callback(fn)

解釋:回調函數,沒有返回值

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

from multiprocessing import Pool

import requests

import json

import os

 

def get_page(url):

    print('<進程%s> get %s' %(os.getpid(),url))

    respone=requests.get(url)

    if respone.status_code == 200:

        return {'url':url,'text':respone.text}

 

def parse_page(res):

    res=res.result()

    print('<進程%s> parse %s' %(os.getpid(),res['url']))

    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))

    with open('db.txt','a') as f:

        f.write(parse_res)

 

 

if __name__ == '__main__':

    urls=[

        'https://www.baidu.com',

        'https://www.python.org',

        'https://www.openstack.org',

        'https://help.github.com/',

        'http://www.sina.com.cn/'

    ]

 

    # p=Pool(3)

    # for url in urls:

    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)

    # p.close()

    # p.join()

 

    p=ProcessPoolExecutor(3)

    for url in urls:

        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果

3.2.10 小結

1)cpython解釋器下
  進程:利用多核一併行,數據不共享;開啓和切換和銷燬的開銷大,數據不安全

  線程:不能利用多核-併發,數據共享;開啓和切換和銷燬的開銷小,數據不安全

進程的數量很是有限:cpu的個數 +1

線程的數量也要限制:cpu的個數*5

以上操做都由池來完成

2)4覈計算機

5個進程 * 每一個進程20個線程 =100 個併發

多進程可以利用多核:搞計算性應該開多進程

多線程可以實現併發:高IO型應該開多線

最後的選擇仍是要看測試環境的測試速度

3.2.11 總例

import os

import time

from concurrent.futures import ProcessPoolExecutor

 

def func(i):

    time.sleep(1)

    print(i,os.getpid())

    return '*'*i

 

def wahaha(ret):

    print(os.getpid(),ret.result())

 

if __name__ == '__main__':

    #有兩個任務須要同步執行須要回調函數

    p = ProcessPoolExecutor(5)

    # for  i in  range(20):

        #異步執行任務按每五個任務爲一組執行

        # p.submit(func,i)

    #join整個任務列表:等待全部工做進程退出

    # p.shutdown()

    #至關於submit+for

    # p.map(func,range(10))

    # print('main process')

    #獲取結果

    # ret_l = []

    # for i in  range(1,20):

    #     #異步執行任務

    #     ret = p.submit(func,i)

    #     ret_l.append(ret)

    #

    # for i in ret_l:

    #     print(i.result)

 

    #回調函數-是由主進程執行的

   for i in range(1,20):

    # 兩個任務要同步執行 須要用到回調函數

    ret = p.submit(func,i).add_done_callback(wahaha)

第4章 協程

4.1 介紹

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

l  一條線程分紅幾個任務執行

l  每一個任務執行一會

l  再切到下一個任務

l  單純的切換會浪費時間

l  切換任務是由程序來完成而不是有操做系統控制的

4.2 特色

總結協程特色:

l  必須在只有一個單線程裏實現併發

l  修改共享數據不需加鎖

l  用戶程序裏本身保存多個控制流的上下文棧

l  附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

優勢以下:

#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級

#2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

缺點以下:

#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

#2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

 

 一:其中第二種狀況並不能提高效率,只是爲了讓cpu可以雨露均沾,實現看起來全部任務都被「同時」執行的效果,若是多個任務都是純計算的,這種切換反而會下降效率。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

import time

def consumer(res):

    '''

    任務1:接收數據,處理數據

    :param res:

    :return:

    '''

    pass

def producer():

    '''

    任務2:生產數據

    :return:

    '''

    res = []

    for i in range(10000000):

        res.append(i)

    return res

 

start=time.time()

# res=producer()

#串行執行

res=producer()

consumer(res)

# consumer(producer())會下降執行效率

stop=time.time()

print(stop-start)

二:第一種狀況的切換。在任務一遇到io狀況下,切到任務二去執行,這樣就能夠利用任務一阻塞的時間完成任務二的計算,效率的提高就在於此。

import time

def consumer():

    '''任務1:接收數據,處理數據'''

    while True:

        x=yield

 

def producer():

    '''任務2:生產數據'''

    g=consumer()

    next(g)

    for i in range(10000000):

        g.send(i)

        time.sleep(2)

 

start=time.time()

producer() #併發執行,可是任務producer遇到io就會阻塞住,並不會切到該線程內的其餘任務去執行

 

stop=time.time()

print(stop-start)

 

yield沒法作到遇到io阻塞

 

協程的本質就是在單線程下,由用戶本身控制一個任務遇到io阻塞了就切換另一個任務去執行,以此來提高效率。爲了實現它,咱們須要找尋一種能夠同時知足如下條件的解決方案:

 

#1. 能夠控制多個任務之間的切換,切換以前將任務的狀態保存下來,以便從新運行時,能夠基於暫停的位置繼續執行。

#2. 做爲1的補充:能夠檢測io操做,在遇到io操做的狀況下才發生切換

4.3 Greenlet模塊

pip3 install greenlet

4.3.1 greenlet實現狀態切換

from greenlet import greenlet

 

def eat(name):

    print('%s eat 1' %name)

    g2.switch('egon')

    print('%s eat 2' %name)

    g2.switch()

def play(name):

    print('%s play 1' %name)

    g1.switch()

    print('%s play 2' %name)

 

g1=greenlet(eat)

g2=greenlet(play)

 

g1.switch('egon')#能夠在第一次switch時傳入參數,之後都不須要

單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度

 

#順序執行

import time

def f1():

    res=1

    for i in range(100000000):

        res+=i

 

def f2():

    res=1

    for i in range(100000000):

        res*=i

 

start=time.time()

f1()

f2()

stop=time.time()

print('run time is %s' %(stop-start)) #10.985628366470337

 

#切換

from greenlet import greenlet

import time

def f1():

    res=1

    for i in range(10000000):

        res+=i

        g2.switch()

 

def f2():

    res=1

    for i in range(10000000):

        res*=i

        g1.switch()

 

start=time.time()

g1=greenlet(f1)

g2=greenlet(f2)

g1.switch()

stop=time.time()

print('run time is %s' %(stop-start))

對比結果

C:\python3\python3.exe D:/python/untitled/123.py

run time is 13.106749773025513

run time is 7.793445825576782

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題

 

4.4 安裝gevent模塊

4.4.1 安裝

pip3 install  gevent     注意python3.7還不支持gevent

4.4.2 介紹

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調

4.4.3 用法

g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

 

g2=gevent.spawn(func2)

 

g1.join() #等待g1結束

 

g2.join() #等待g2結束

 

#或者上述兩步合做一步:gevent.joinall([g1,g2])

 

g1.value#拿到func1的返回值

 

 

4.5 join

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合做一步:

gevent.joinall([g1,g2])

4.6 gevent.spawn

例:遇到io主動切換

import gevent

def eat(name):

    print('%s eat 1' %name)

    gevent.sleep(2)

    print('%s eat 2' %name)

 

def play(name):

    print('%s play 1' %name)

    gevent.sleep(1)

    print('%s play 2' %name)

 

 

g1=gevent.spawn(eat,'egon')

g2=gevent.spawn(play,name='egon')

g1.join()

g2.join()

#或者gevent.joinall([g1,g2])

print('主')

輸出

C:\python3\python3.exe D:/python/untitled/123.py

egon eat 1

egon play 1

egon play 2

egon eat 2

 

上例gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前

或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()到文件的開頭

 

from gevent import monkey;monkey.patch_all()

 

import gevent

import time

def eat():

    print('eat food 1')

    time.sleep(2)

    print('eat food 2')

 

def play():

    print('play 1')

    time.sleep(1)

    print('play 2')

 

g1=gevent.spawn(eat)

g2=gevent.spawn(play)

gevent.joinall([g1,g2])

print('主')

4.7 Gevent之同步與異步

from gevent import spawn,joinall,monkey;monkey.patch_all()

 

import time

def task(pid):

    """

    Some non-deterministic task

    """

    time.sleep(0.5)

    print('Task %s done' % pid)

 

 

def synchronous():  # 同步

    for i in range(10):

        task(i)

 

def asynchronous(): # 異步

    g_l=[spawn(task,i) for i in range(10)]

    joinall(g_l)

    print('DONE')

   

if __name__ == '__main__':

    print('Synchronous:')

    synchronous()

    print('Asynchronous:')

    asynchronous()

#  上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。

#  初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,

#  後者阻塞當前流程,並執行全部給定的greenlet任務。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

 

4.8 Gevent之應用舉例一(爬蟲)

from gevent import monkey;monkey.patch_all()

import gevent

import requests

import time

 

def get_page(url):

    print('GET: %s' %url)

    response=requests.get(url)

    if response.status_code == 200:

        print('%d bytes received from %s' %(len(response.text),url))

 

 

start_time=time.time()

gevent.joinall([

    gevent.spawn(get_page,'https://www.python.org/'),

    gevent.spawn(get_page,'https://www.yahoo.com/'),

    gevent.spawn(get_page,'https://github.com/'),

])

stop_time=time.time()

print('run time is %s' %(stop_time-start_time))

 

第5章 IO模型

5.1 介紹

l  阻塞IO(blocking IO)

blocking IO的特色就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了

l  非阻塞IO(non-blocking IO)

在非阻塞式IO中,用戶進程實際上是須要不斷的主動詢問kernel數據準備好了沒有。

l  多路複用IO(IO multiplexing)

 

l  異步IO(Asynchronous I/O)

對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,該操做會經歷兩個階段

1)等待數據準備 (Waiting for the data to be ready)

2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

 

5.2 阻塞IO模型

 

 

 

5.3 非阻塞IO

 

 

提升了cpu利用,但也增長了CPU的負載

非阻塞IO實例-服務端

from socket import *

import time

s=socket(AF_INET,SOCK_STREAM)

s.bind(('127.0.0.1',8080))

s.listen(5)

s.setblocking(False) #設置socket的接口爲非阻塞

conn_l=[]

del_l=[]

while True:

    try:

        conn,addr=s.accept()

        conn_l.append(conn)

    except BlockingIOError:

        print(conn_l)

        for conn in conn_l:

            try:

                data=conn.recv(1024)

                if not data:

                    del_l.append(conn)

                    continue

                conn.send(data.upper())

            except BlockingIOError:

                pass

            except ConnectionResetError:

                del_l.append(conn)

 

        for conn in del_l:

            conn_l.remove(conn)

            conn.close()

        del_l=[]

 

#客戶端

from socket import *

c=socket(AF_INET,SOCK_STREAM)

c.connect(('127.0.0.1',8080))

 

while True:

    msg=input('>>: ')

    if not msg:continue

    c.send(msg.encode('utf-8'))

    data=c.recv(1024)

    print(data.decode('utf-8'))

5.4 多路複用

操做系統提供的

 

 

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

    這個圖和blocking IO的圖其實並無太大的不一樣,事實上還更差一些。由於這裏須要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

    強調:

    1. 若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。

    2. 在多路複用模型中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

    結論: select的優點在於能夠處理多個鏈接,不適用於單個鏈接

 

select網絡IO模型-服務端

from socket import *

import select

 

s=socket(AF_INET,SOCK_STREAM)

s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

s.bind(('127.0.0.1',8081))

s.listen(5)

s.setblocking(False) #設置socket的接口爲非阻塞

read_l=[s,]

while True:

    r_l,w_l,x_l=select.select(read_l,[],[])

    print(r_l)

    for ready_obj in r_l:

        if ready_obj == s:

            conn,addr=ready_obj.accept() #此時的ready_obj等於s

            read_l.append(conn)

        else:

            try:

                data=ready_obj.recv(1024) #此時的ready_obj等於conn

                if not data:

                    ready_obj.close()

                    read_l.remove(ready_obj)

                    continue

                ready_obj.send(data.upper())

            except ConnectionResetError:

                ready_obj.close()

                read_l.remove(ready_obj)

 

#客戶端

from socket import *

c=socket(AF_INET,SOCK_STREAM)

c.connect(('127.0.0.1',8081))

 

while True:

    msg=input('>>: ')

    if not msg:continue

    c.send(msg.encode('utf-8'))

    data=c.recv(1024)

    print(data.decode('utf-8'))

                                                    

5.5 selectors模塊

IO複用:爲了解釋這個名詞,首先來理解下複用這個概念,複用也就是共用的意思,這樣理解仍是有些抽象,爲此,我們來理解下複用在通訊領域的使用,在通訊領域中爲了充分利用網絡鏈接的物理介質,每每在同一條網絡鏈路上採用時分複用或頻分複用的技術使其在同一鏈路上傳輸多路信號,到這裏咱們就基本上理解了複用的含義,即公用某個「介質」來儘量多的作同一類(性質)的事,那IO複用的「介質」是什麼呢?爲此咱們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,所以爲了解決大量客戶端訪問的問題,引入了IO複用技術,即:一個進程能夠同時對多個客戶請求進行服務。也就是說IO複用的「介質」是進程(準確的說複用的是select和poll,由於進程也是靠調用select和poll來實現的),複用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是併發的可是IO所需的讀寫數據多數狀況下是沒有準備好的,所以就能夠利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據能夠進行讀寫了,進程就來對這樣的IO進行服務。

 

理解完IO複用後,咱們在來看下實現IO複用中的三個API(select、poll和epoll)的區別和聯繫

select,poll,epoll都是IO多路複用的機制,I/O多路複用就是經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知應用程序進行相應的讀寫操做。但select,poll,epoll本質上都是同步I/O,由於他們都須要在讀寫事件就緒後本身負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需本身負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型以下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

 

 1.select的第一個參數nfds爲fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制爲__FD_SETSIZE(1024),位數組的每一位表明其對應的描述符是否須要被檢查。第二三四參數表示須要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關注的事件,因此每次調用select前都須要從新初始化fdset。timeout參數爲超時時間,該結構會被內核修改,其值爲超時剩餘的時間。

 

 select的調用步驟以下:

(1)使用copy_from_user從用戶空間拷貝fdset到內核空間

(2)註冊回調函數__pollwait

(3)遍歷全部fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據狀況會調用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll爲例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工做就是把current(當前進程)掛到設備的等待隊列中,不一樣的設備有不一樣的等待隊列,對於tcp_poll 來講,其等待隊列是sk->sk_sleep(注意把進程掛到等待隊列中並不表明進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操做是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)若是遍歷完全部的fd,尚未返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。若是超過必定的超時時間(schedule_timeout 指定),仍是沒人喚醒,則調用select的進程會從新被喚醒得到CPU,進而從新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

 

總結下select的幾大缺點:

(1)每次調用select,都須要把fd集合從用戶態拷貝到內核態,這個開銷在fd不少時會很大

 

(2)同時每次調用select都須要在內核遍歷傳遞進來的全部fd,這個開銷在fd不少時也很大

 

(3)select支持的文件描述符數量過小了,默認是1024

 

2.  poll與select不一樣,經過一個pollfd數組向內核傳遞須要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只須要被初始化一次。

 

 poll的實現機制與select相似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,而後對pollfd中的每一個描述符進行poll,相比處理fdset來講,poll效率更高。poll返回後,須要對pollfd中的每一個元素檢查其revents值,來得指事件是否發生。

 

3.直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。

 

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麼解決的呢?在此以前,咱們先看一下epoll 和select和poll的調用接口上的不一樣,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是建立一個epoll句柄;epoll_ctl是注 冊要監聽的事件類型;epoll_wait則是等待事件的產生。

 

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把全部的fd拷貝進內核,而不是在epoll_wait的時候重複拷貝。epoll保證了每一個fd在整個過程當中只會拷貝 一次。

 

  對於第二個缺點,epoll的解決方案不像select或poll同樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)併爲每一個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工做實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是相似的)。

 

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大能夠打開文件的數目,這個數字通常遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目能夠cat /proc/sys/fs/file-max察看,通常來講這個數目和系統內存關係很大。

 

總結:

 

(1)select,poll實現須要本身不斷輪詢全部fd集合,直到設備就緒,期間可能要睡眠和喚醒屢次交替。而epoll其實也須要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能屢次睡眠和喚醒交替,可是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,可是select和poll在「醒着」的時候要遍歷整個fd集合,而epoll在「醒着」的 時候只要判斷一下就緒鏈表是否爲空就好了,這節省了大量的CPU時間,這就是回調機制帶來的性能提高。

 

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,而且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,並且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,注意這裏的等待隊列並非設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省很多的開銷。

 

select,poll,epoll

這三種IO多路複用模型在不一樣的平臺有着不一樣的支持,而epoll在windows下就不支持,好在咱們有selectors模塊,幫咱們默認選擇當前平臺下最合適的

5.6 selectors模塊實現聊天

基於selectors模塊實現聊天-服務端

from socket import *

import selectors

 

sel=selectors.DefaultSelector()

def accept(server_fileobj,mask):

    conn,addr=server_fileobj.accept()

    sel.register(conn,selectors.EVENT_READ,read)

 

def read(conn,mask):

    try:

        data=conn.recv(1024)

        if not data:

            print('closing',conn)

            sel.unregister(conn)

            conn.close()

            return

        conn.send(data.upper()+b'_SB')

    except Exception:

        print('closing', conn)

        sel.unregister(conn)

        conn.close()

 

 

 

server_fileobj=socket(AF_INET,SOCK_STREAM)

server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

server_fileobj.bind(('127.0.0.1',8088))

server_fileobj.listen(5)

server_fileobj.setblocking(False) #設置socket的接口爲非阻塞

sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept

 

while True:

    events=sel.select() #檢測全部的fileobj,是否有完成wait data的

    for sel_obj,mask in events:

        callback=sel_obj.data #callback=accpet

        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

 

#客戶端

from socket import *

c=socket(AF_INET,SOCK_STREAM)

c.connect(('127.0.0.1',8088))

 

while True:

    msg=input('>>: ')

    if not msg:continue

    c.send(msg.encode('utf-8'))

    data=c.recv(1024)

    print(data.decode('utf-8'))

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息