Python併發編程之協程

協程

1、協程的本質:

單線程實現併發,在應用程序裏控制多個任務的切換+保存狀態python

2、協程的目的:

  • 想要在單線程下實現併發
  • 併發指的是多個任務看起來是同時運行的
  • 併發=切換+保存狀態

3、補充:

  • yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
  • send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換
  • 如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制)

4、優勢

  • 應用程序級別速度要遠遠高於操做系統的切換

5、缺點

  • 多個任務一旦有一個阻塞沒有切,整個線程都阻塞在原地,該線程內的其餘的任務都不能執行了
  • 一旦引入協程,就須要檢測單線程下全部的IO行爲,實現遇到IO就切換,少一個都不行,由於若是一個任務阻塞了,整個線程就阻塞了,其餘的任務即使是能夠計算,可是也沒法運行了

注意:單純地切換反而會下降運行效率

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#併發執行
import time
 
def producer():
     g = consumer()
     next (g)
     for i in range ( 100 ):
         g.send(i)
 
def consumer():
     while True :
         res = yield
 
start_time = time.time()
producer()
stop_time = time.time()
print (stop_time - start_time)
 
#串行
import time
 
def producer():
     res = []
     for i in range ( 10000000 ):
         res.append(i)
     return res
 
 
def consumer(res):
     pass
 
start_time = time.time()
res = producer()
consumer(res)
stop_time = time.time()
print (stop_time - start_time)

greenlet

greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。web

注意:單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#pip3 install greenlet
from greenlet import greenlet
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 2 )
     g2.switch( 'tom' )
     print ( '%s eat 2' % name)
     g2.switch()
 
def play(name):
     print ( '%s play 1' % name )
     g1.switch()
     print ( '%s play 2' % name )
 
g1 = greenlet(eat)
g2 = greenlet(play)
 
g1.switch( 'tom' )
 
"""
tom eat 1
tom play 1
tom eat 2
tom play 2
"""

gevent

遇到IO阻塞時會自動切換任務redis

1、用法:

  • g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的
  • g2=gevent.spawn(func2)
  • g1.join() #等待g1結束
  • g2.join() #等待g2結束
  • 或者上述兩步合做一步:gevent.joinall([g1,g2])
  • g1.value#拿到func1的返回值

2、補充:

  • gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,
  • 而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的須要用下面一行代碼,打補丁,就能夠識別了
  • from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#pip3 install gevent
from gevent import monkey;monkey.patch_all()
import gevent
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 3 )
     print ( '%s eat 2' % name)
 
def play(name):
     print ( '%s play 1' % name)
     time.sleep( 2 )
     print ( '%s play 2' % name)
 
start_time = time.time()
g1 = gevent.spawn(eat, 'tom' )
g2 = gevent.spawn(play, 'rose' )
 
g1.join()
g2.join()
stop_time = time.time()
print (stop_time - start_time)
"""
tom eat 1
rose play 1
rose play 2
tom eat 2
3.003171920776367
"""
 
 
 
from gevent import monkey;monkey.patch_all()
import gevent
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 3 )
     print ( '%s eat 2' % name)
 
def play(name):
     print ( '%s play 1' % name)
     time.sleep( 2 )
     print ( '%s play 2' % name)
 
g1 = gevent.spawn(eat, 'tom' )
g2 = gevent.spawn(play, 'rose' )
 
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

3、經過gevent實現單線程下的socket併發

from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞算法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
服務端
#基於gevent實現
"""
from gevent import monkey,spawn;monkey.patch_all()
from socket import *
 
def communicate(conn):
     while True :
         try :
             data = conn.recv( 1024 )
             if not data: break
             conn.send(data.upper())
         except ConnectionResetError:
             break
 
     conn.close()
 
def server(ip,port):
     server = socket(AF_INET, SOCK_STREAM)
     server.bind((ip,port))
     server.listen( 5 )
 
     while True :
         conn, addr = server.accept()
         spawn(communicate,conn)
 
     server.close()
 
if __name__ = = '__main__' :
     g = spawn(server, '127.0.0.1' , 8090 )
     g.join()
     
     
"""
客戶端
"""
from socket import *
from threading import Thread,currentThread
 
def client():
     client = socket(AF_INET,SOCK_STREAM)
     client.connect(( '127.0.0.1' , 8090 ))
 
     while True :
         client.send(( '%s hello' % currentThread().getName()).encode( 'utf-8' ))
         data = client.recv( 1024 )
         print (data.decode( 'utf-8' ))
 
     client.close()
 
if __name__ = = '__main__' :
     for i in range ( 500 ):
         t = Thread(target = client)
         t.start()
"""
網絡編程
B/S和C/S架構
osi五層協議
應用層
傳輸層  端口/服務相關  四次路由器 四層交換機
網絡層 ip 路由器 三層交換機
數據鏈路層 arp mac相關 網卡 交換機
物理層
socket/socketserver
tcp/udp
    tcp協議:面向鏈接的,可靠的,全雙工的,流失傳輸協議
    三次握手
    四次揮手
    粘包現象
        在接受度粘 發送信息無邊界,在接受端沒有及時接受,在緩存端粘在一塊兒
        在發送端粘,時間短,因爲優化機制粘在一塊兒的
    udp協議:面向數據報的,不可靠 無鏈接的協議
ip
arp

I/O操做 輸入會讓輸出
    input輸入到內存 input  read  recv  recvfrom   accept   connect  close
    output 從內存輸出 print write  send   sendto   accept   connect  close

阻塞和非阻塞
阻塞: cpu不工做 accept recv connect等待
非阻塞:cpu工做


##############################

人機矛盾
    cpu利用率低
磁帶存儲+批處理
    下降數據的讀取時間
    提升cpu的利用率
多道操做系統------在一個任務遇到io的時候主動讓出cpu
    數據隔離
    時空複用
    可以在一個任務遇到io操做的時候主動把cpu讓出來,給其餘的任務使用
        切換:佔時間,
        切換:操做系統作的
分時操做系統------給時間分片,讓多個任務輪流使用cpu
    cpu的輪轉
    每個程序分配一個時間片
    切換:佔時間
    反而下降來cpu的使用率
    提升了用戶體驗
分時操做系統+多道操做系統+實時操做系統
    多個程序一塊兒在計算機中執行
    一個程序若是遇到io操做,切出去讓出cpu
    一個程序沒有遇到io,可是時間片到時了,切出去讓出cpu
網絡操做系統
分佈式操做系統



##############################
進程:運行中的程序
程序和進程之間的區別
    程序只是一個文件
    進程是這個文件被cpu運行起來了
進程是計算機中最小的資源分配單位
每個進程都有一個在操做系統中的惟一標識符:pid

操做系統調度進程的算法
    短做業優先
    先來先服務
    時間片輪轉
    多級反饋算法(少在程序的一開始就使用io)

操做系統負責什麼?
    調度進程前後執行的順序,控制執行的時間等等
    資源的分配

並行與併發
    並行:兩個程序兩個cpu,每一個程序分別佔用一個cpu本身執行本身的
         看起來是同時執行,實際在每個時間點上都在各自執行着
    併發:兩個程序一個cpu每一個程序交替在一個cpu上執行,看起來在同時執行,可是實際上仍然是串行
同步:
    調用這個函數,並等待這個函數結束
異步:
    調用一個函數,不等待這個方法結束,也不關心這個方法作了什麼
阻塞:cpu不工做
非阻塞:cpu工做
同步阻塞:
    conn.recv
    調用函數的這個過程,有io操做
同步非阻塞:
    func() 沒有io操做
    socket 非阻塞的tcp協議的時候
    調用函數(這個函數內部不存在io操做)
異步非阻塞:
    把func仍到其餘任務裏去執行裏,沒有io操做
    自己的任務和func任務各自執行各自的,沒有io操做
異步阻塞:


雙擊圖標發生的事情:
程序在開始運行以後,並非當即開始執行代碼,而是會進入就緒狀態,等待操做系統調度開始運行
操做系統爲其建立進程,分配一塊空間,pid---》就緒(ready)---》運行(runing)---》阻塞(blocking)
運行(沒有遇到io操做時間片到了)---》就緒
運行(遇到io)--》阻塞
運行--》運行完了--》結束進程
阻塞io結束---》就緒
阻塞影響了程序運行的效率

##############################
進程是計算機中最小的資源分配單位(進程負責圈資源)
    數據隔離的
    建立進程,時間開銷大
    銷燬進程,時間開銷大
    進程之間切換,時間開銷大
線程:是計算機中能被cpu調度的最小單位(線程是負責執行具體代碼的)
    線程的建立,也須要一些開銷(一個存儲局部變量的結構,記錄狀態)
    建立,銷燬,切換開銷遠遠小於進程
    每一個進程中至少有一個線程
    一個進程中的多個線程是能夠共享這個進程的數據的

是進程中的一部分
每個進程中至少有一個線程,線程是負責執行具體代碼的
只負責執行代碼,不負責存儲共享的數據,也不負責資源分配


python中的線程比較特殊,因此進程也有可能被用到。

pid子進程
ppid父進程
在pycharm中啓動的全部py程序都是pycharm的子進程


tcp: 可靠
https: 安全
單核 併發
多核 多個程序跑在多個cpu上,在同一時刻並行

##############################

操做系統
    1。計算機中全部的資源都是由操做系統分配的
    2。操做系統調度任務:時間分片,多道機制
    3。cpu的利用率是咱們努力的指標

進程:開銷大 數據隔離 資源分配單位 cpython下能夠利用多核  由操做系統調度的
    進程的三狀態:就緒 運行 阻塞
    multiprocessing模塊
    Process-開啓進程
    Lock-互斥鎖
        爲何要在進程中加鎖
        由於進程操做文件也會發生數據不安全
    Queue -隊列 IPC機制(Pipe,redis,memcache,rabbitmq,kafka)
    Manager-提供數據共享機制

線程:開銷小 數據共享 cpu調度單位 cpython下不能利用多核  由操做系統調度的
    GIL鎖:全局解釋器鎖,Cpython解釋器提供的,致了一個進程中多個線程同一時刻只有一個線程能訪問CPU--多線程不能利用多核
    Thread類--能開啓線程start,等待線程結束join
    Lock-互斥鎖  互斥鎖能在一個線程中連續acquire,效率相對高
    Rlock--遞歸鎖 能夠在一個線程中連續acquire,效率相對低
    死鎖現象如何發生,如何避免?
    線程隊列queue模塊
        Queue
        LifoQueue
        PriorityQueue
池
    實例化一個池
    提交任務到池中,返回一個對象
        使用這個對象獲取返回值
        回調函數
    阻塞等待池中的任務都結束

數據安全問題
數據隔離和通訊


協程:多個任務在一條線程上來回切換,用戶級別的,由咱們本身寫的python代碼來控制切換的,是操做系統不可見的
在Cpython解釋器下----協程和線程都不能利用多核,都是在一個CPU上輪流切換
    因爲多線程自己就不能利用多核
    因此即使是開啓來多個線程也只能輪流在一個cpu上執行
    協程若是把全部任務的IO操做都規避掉,只剩下須要使用CPU的操做
    就意味着協程就能夠作到提升CPU利用率的效果

多線程和協程
    線程 切換須要操做系統,開銷大 給操做系統的壓力大
        操做系統對IO操做的感知更加靈敏
    協程 切換須要 python代碼 開銷小,用戶操做可控  徹底不會增長操做系統的壓力
        用戶級別可以對IO操做的感知比較低

協程:可以在一個線程下的多個任務之間來回切換,那麼每個任務都是一個協程
兩種切換方式
    原生python完成  yield asyncio
    c語言完成的python模塊 greenlet  gevent

1。一個線程中的阻塞都被其餘的各類任務佔滿了
2。讓操做系統以爲這個線程很忙
儘可能的減小這個線程進入阻塞的狀態
提升了單線程對CPU的利用率
3。多個任務在同一個線程中執行
也達到了一個併發的效果
規避了每個任務的io操做
減小了線程的個數,減輕了操做系統的負擔



咱們寫協程:在一條線程上最大限度的提升CPU的使用率
           在一個任務中遇到IO的時候就切換到其餘任務
特色:
    開銷很小,是用戶級的,只能從用戶級別感知的IO操做
    不能利用多核,數據共享,數據安全
模塊和用法:
    gevent (擴展模塊)基於greenlet(內置模塊)切換   aiohttp爬蟲模塊基於asyncio    sanic異步框架
        先導入模塊
        導入monkey,執行patch all
        寫一個函數看成協程要執行的任務
        協程對象 = gevent.spawn(函數名,參數,)
        協程對象.join(),gevent.joinall([g1,g2...])

    分辨gevent是否識別了咱們寫的代碼中的io操做的方法
        在patchall以前打印一下涉及到io操做的函數地址
        在patchall以後打印一下涉及到io操做的函數地址
        若是兩個地址一致,說明gevent沒有識別這個io,若是不一致說明識別了

    asyncio 基於yield機制切換的
    async 標識一個協程函數
    await 後面跟着一個asyncio模塊提供的io操做的函數
    loop 事件循環,負責在多個任務之間進行切換的

    最簡單的協程函數是如何完成的


"""






import os
import time
from multiprocessing import Process
from multiprocessing import Queue
def func(exp,q):
    print('start',os.getpid())
    ret = eval(exp)
    q.put(ret)
    time.sleep(1)
    print('end',os.getpid())

if __name__ == '__main__': #window下須要加上這一句
    q=Queue() #先進先出
    p=Process(target=func,args=('1+2+3',q)) #建立一個即將要執行func函數的進程對象
    # p.daemon=True #守護進程是隨着主進程的代碼的結束而結束的
    p.start() #異步非阻塞    調用開啓進程的方法,可是並不等待這個進程真的開啓
    # p.join() #同步阻塞直到p對應的進程結束以後才結束阻塞
    # print(p.is_alive())
    # p.terminate() # 異步非阻塞  強制結束一個子進程
    # print(p.is_alive()) #子進程還活着,由於操做系統還沒來得及關閉進程
    # time.sleep(0.01)
    # print(p.is_alive())#操做系統已經響應了咱們要關閉進程的需求,再去檢測的時候,獲得的結束是進程已經結束了
    print('main',os.getpid()) #主進程代碼執行完畢,可是主進程沒有結束,等待子進程結束
    print(q.get())

"""
操做系統建立進程的方式不一樣
Windows操做系統執行開啓進程的代碼
    實際上新的子進程須要經過import父進程的代碼來完成數據的導入工做
    因此有一些內容咱們只但願在父進程中完成,就寫在if __name__ == '__main__': 下面

iOS和Linux是直接從內存級別去拷貝代碼(fork),從父進程拷貝數據到子進程

主進程沒有結束,等待子進程結束
主進程負責回收子進程的資源
若是子進程執行結束,父進程沒有回收資源,那麼這個子進程會變成一個殭屍進程    

主進程的結束邏輯:
    主進程的代碼結束
    全部的子進程結束
    給子進程回收資源
    主進程結束
主進程怎麼知道子進程結束的呢? 監控文件
    基於網絡,文件    
join方法
    把一個進程的結束事件封裝成一個join方法
    執行join方法的效果就是阻塞直到這個子進程執行結束就結束阻塞
    
    在多個子進程中執行join

p=Process(target=函數名,args=(參數,))
進程對象和進程並無直接的關係,只是存儲來一些和進程相關的內容,此時此刻,操做系統尚未接到建立進程的指令 
p.start()開啓來一個進程----這個方法至關於給來操做系統一條指令
satrt方法的非阻塞和異步的特色
    在執行開啓進程這個方法的時候
    咱們既不等待這個進程開啓,也不等待操做系統給咱們的相應
    這裏只是負責通知操做系統去開啓一個進程
    開啓了一個子進程以後,主進程的代碼和子進程的代碼徹底異步
    
p.daemon 守護進程是隨着主進程代碼的結束而結束的
全部的子進程都必須在主進程結束以前結束,由主進程來負責回收資源 

p.is_alive()
p.terminate() 強制結束一個子進程的


什麼是異步非阻塞?
    terminate,start  
什麼是同步阻塞
    join      
    
1.若是在一個併發的場景下,涉及到某部份內容是須要修改一些全部進程共享的數據資源,須要加鎖來維護數據的安全
2.在數據安全的基礎上,才考慮效率問題
3.同步存在的意義就是數據安全   

在主進程中實例化 lock=Lock()
在子進程中,對須要加鎖的代碼進行with lock:
    with lock: 至關於lock.acquire()和lock.release()
在進程中須要加鎖的場景
    1.操做共享的數據資源(文件,數據庫)
    2.對資源進行修改,刪除操做    


進程之間的通信--IPC(inter process communication) 
隊列和管道都是IPC機制,隊列進程之間數據安全,管道進程之間數據不安全
第三方工具(軟件)提供給咱們的IPC機制,集羣的概念(高可用)
redis
memcache
kafka
rabbitmq
併發需求,高可用,斷電保存數據,解耦


from multiprocessing import Queue
Queue(天生就是數據安全的) 基於 文件家族的socket,pickle和lock實現的
隊列就是管道加鎖
get():阻塞的
get_nowait():非阻塞的

下面三個方法不推薦使用,多進程中不許確()
q.empty() 判斷隊列是否爲空
q.qsize() 返回隊列中的數據個數
q.full() 判斷隊列是否爲滿



pipe 管道(不安全的)基於 文件家族的socket,pickle實現的
from multiprocessing import Pipe
pip=Pipe()
pip.send()
pip.recv()

解耦:修改  複用  可讀性
把寫在一塊兒的大的功能分開成多個小的功能處理

joinablequeue
q.join() 阻塞  直到這個隊列中全部的內容都被取走且task_done

multiprocessing中有一個manager類
封裝了全部和進程相關的,數據共享,數據傳遞相關的數據類型
可是對於字典列表這一類的數據操做的時候會產生數據不安全
須要加鎖解決問題,而且須要儘可能少的使用這種方式

線程自己是能夠利用多核的
cpython解釋器中不能實現多線程利用多核 
垃圾回收機制 gc  引用計數+分代回收
專門有一條線程來完成垃圾回收
對每個在程序中的變量統計引用計數

鎖:GIL全局解釋器鎖
cpython解釋器中的機制,致使了在同一個進程中多個線程不能同時利用多核。
python的多線程只能是併發不能是並行

線程即使有GIL,也會出現數據不安全的問題
1.操做的是全局變量
2.作如下操做:
    +=,-=,*=,/=先計算再賦值才容易出現數據不安全的問題
    包括list[0]+=1,dic['key']-=1



a=0
def func():
    global a
    a+=1

import dis
dis.dis(func)  #查看func的cpu指令

互斥鎖是鎖中的一種:在同一個線程中,不能連續acquire屢次,(在一個線程中連續屢次acquire會死鎖)
死鎖現象:有多把鎖+多把鎖交替使用,互斥鎖在一個線程中連續acquire
遞歸鎖 (在一個線程中連續屢次acquire不會死鎖)
好:在同一個進程中屢次acquire也不會發生阻塞
很差:佔用了更多資源

遞歸鎖---將多把互斥鎖變成了一把遞歸鎖,遞歸鎖也會發生死鎖現象(多把遞歸鎖交替使用的時候),之後的代碼儘可能用一把鎖。
mutexB=mutexA=RLock()
注意不是:mutexB=RLock()  mutexA=RLock() 否則仍是同樣會發生死鎖現象



保證了整個python程序中,只能有一個線程被CPU執行
緣由;cpython解釋器中特殊的垃圾回收機制
GIL鎖致使來線程不能並行,可是能夠併發
因此使用多線程並不影響高io型的操做
只會對高計算型的程序有效率上的影響
遇到高計算的:多進程+多線程  或者 分佈式


遇到IO操做的時候
5億條cpu指令/s
5-6cpu指令==一句python代碼
幾千萬條python代碼
web框架,幾乎都是多線程


主線程何時結束?等待全部子線程結束以後才結束
主線程若是結束了,主進程也就結束了
t.ident  線程id
from threading import Thread,current_thread,active_count,enumerate
active_count()==len(enumerate())
current_thread.ident  線程id   在哪個線程裏,current_thread獲得的就是這個當前線程的信息
在線程中不能從主線程結束一個子線程 沒有terminate
不要在子線程裏隨便修改全局變量
守護線程一直等到全部的非守護線程都結束以後才結束
除了守護了主線程的代碼以外也會守護子線程

p.daemon 守護進程是隨着主進程代碼的結束而結束的
非守護線程不結束,主線程也不結束,主線程結束了,主進程也結束
結束順序:
非守護線程結束--》主線程結束--》主進程結束——》守護線程也結束



什麼是生產者消費者模型
把一個產生數據而且處理數據的過程解耦
讓生產數據的過程和處理數據的過程達到一個工做效率上的平衡
中間的容器,在多進程中使用隊列或者可被join的隊列,作到控制數據的量
當數據過剩的時候,隊列的大小會控制消費者的行爲
當數據嚴重不足的時候,隊列會控制消費者的行爲
而且還能夠經過按期檢測隊列中元素的個數來調節生產者消費者的個數
爬蟲:
    請求網頁的平均時間是0.3s
    處理網頁代碼的時候是0.003s
    100倍,每啓動100個線程生產數據,就啓動一個線程來處理數據
web程序的server端
    每秒鐘有6w條請求
    一個服務每s中只能處理2000條
    先寫一個web程序,只負責一件事情,就是接收請求,而後把請求放到隊列中(消息中間件)
    再寫不少個server端,從隊列中獲取請求,而後處理,而後返回結果

# from queue import Queue #先進先出隊列
import queue  #線程之間的通訊,線程安全

先進先出:寫一個server全部的用戶的請求放在隊列裏,先來先服務的思想  q=queue.Queue(3)
後進先出:算法中    q=queue.LifoQueue(3) #後進先出->堆棧
優先級隊列:自動的排序(vip號碼段)告警級別    q=queue.PriorityQueue(3) #優先級隊列   q.put((10,'one'))

池:預先的開啓固定個數的進程數,當任務來臨的時候,直接提交給已經開好的進程/線程,讓其去執行就能夠了
節省了進程,線程的開啓 關閉 切換鎖花費的時間
而且減輕了操做系統調度的負擔

shutdown 關閉池以後就不能繼續提交任務,而且會阻塞,直到已經提交的任務完成
result() 同步阻塞
submit() 異步非阻塞

5*20*500

是單獨開啓線程進程仍是池?
若是隻是開啓一個子進程作一件事情,就能夠單獨開線程
有大量的任務等待程序去作,要達到必定的併發數,開啓線程池
根據你程序的io操做也能夠斷定是用池仍是不用池
socket大量的阻塞io ,socket server就沒有用池,用的是線程
爬蟲的時候,池

回調函數:執行完子線程任務以後直接調用對應的回調函數
        爬取網頁 須要等待數據傳輸和網絡上的響應高IO的--子線程
        分析網頁,沒有什麼IO操做--這個操做不必在子線程完成,交給回調函數
兩件事情:存在IO操做的事情,基本不存在IO操做的事情
obj=tp.submit(io操做對應的函數)
obj.result() 是一個阻塞方法 獲取返回值
obj.add_done_callback(計算型的事情)

obj=tp.submit(須要在子線程執行的函數名,參數)
obj.add_done_callback(子線程執行完畢以後要執行的代碼對應的函數)

進程和線程都有鎖
全部在線程中能工做的基本都不能在進程中工做
在進程中可以使用的基本在線程中也可使用

沒一個進程下面都有一個獨立的垃圾回收機制的線程,不須要擔憂開了多進程之後,垃圾回收機制怎麼處理


"""

from multiprocessing import process
from threading import Thread
import os
def tfunc():
    print(os.getpid())

def pfunc():
    print('pfunc-->',os.getpid())
    Thread(target=tfunc).start()

if __name__ == '__main__':
    Process(target=pfunc).start()




from multiprocessing import Queue
import queue
q=Queue(3)
q.put(1)
q.put(2)
q.put(3)
print('aaa')
# q.put(44444)  #當隊列爲滿的時候再向隊列中放數據,隊列會阻塞
try:
    q.put_nowait(6) #當隊列爲滿的時候再向隊列中放數據,會報錯而且會丟失數據
except queue.Full:
    pass
print('bbb')

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) #在隊列爲空的時候會發生阻塞
try:
    q.get_nowait() #當隊列爲空的時候直接報錯
except queue.Empty:
    pass





"""
##############################
協程:多個任務在一條線程上來回切換
咱們寫協程:在一條線程上最大限度的提升CPU的使用率
           在一個任務中遇到IO的時候就切換到其餘任務
特色:
    開銷很小,是用戶級的,只能從用戶級別感知的IO操做
    不能利用多核,數據共享,數據安全
模塊和用法:
    gevent 基於greenlet切換
        先導入模塊
        導入monkey,執行patch all
        寫一個函數看成協程要執行的任務
        協程對象 = gevent.spawn(函數名,參數,)
        協程對象.join(),gevent.joinall([g1,g2...])

    分辨gevent是否識別了咱們寫的代碼中的io操做的方法
        在patchall以前打印一下涉及到io操做的函數地址
        在patchall以後打印一下涉及到io操做的函數地址
        若是兩個地址一致,說明gevent沒有識別這個io,若是不一致說明識別了

    asyncio 基於yield機制切換的
    async 標識一個協程函數
    await 後面跟着一個asyncio模塊提供的io操做的函數
    loop 事件循環,負責在多個任務之間進行切換的

    最簡單的協程函數是如何完成的
"""



from gevent import monkey
monkey.patch_all()
import gevent
import time


def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    time.sleep(2)
    print('%s play 2' % name)
    return '返回值'

g_l=[]
for i in range(10):
    g = gevent.spawn(eat, 'tom')  #注意要遇到阻塞纔會切換
    g_l.append(g)
gevent.joinall(g_l) #這就是阻塞的

g2=gevent.spawn(play,'rose')
g2.join()
g2.value #獲取返回值,須要注意的是這個是一個屬性,須要在join後才能獲取到,否則就是None  此處打印 返回值


import asyncio
async def func(): #協程方法
    print("start")
    await asyncio.sleep(1)  #阻塞  須要一個關鍵字await告知是阻塞須要執行一個協程了,若是用await須要在函數以前加上async表示是一個協程函數
    print("end")
    return 123

async def func1():
    print("start")
    await asyncio.sleep(1)
    print("end")
    return 456

async def func2():
    print("start")
    await asyncio.sleep(1)
    print("end")
    return 789

#啓動一個任務
loop=asyncio.get_event_loop() #建立一個事件循環
loop.run_until_complete(func()) #把func任務丟到事件循環中去執行    執行一個協程函數  和join差很少意思

#啓動多個任務,而且沒有返回值
loop=asyncio.get_event_loop()
wait_l=asyncio.wait([func(),func1(),func2()]) #執行多個協程函數
loop.run_until_complete(wait_l)


#啓動多個任務,而且有返回值
loop=asyncio.get_event_loop()
f=loop.create_task(func())
f1=loop.create_task(func1())
f2=loop.create_task(func2())
task_l=[f,f1,f2] # 按照這個順序取返回值
wait_l=asyncio.wait([f,f1,f2])
loop.run_until_complete(wait_l)
for i in task_l:
    print(i.result()) #返回值

#誰先回來先取誰的值
import asyncio
async def demo(i):
    print("start")
    await asyncio.sleep(10-i)
    print("end")
    return i,456

async def main():
    task_l=[]
    for i in range(10):
        task = asyncio.ensure_future(demo(i))
        task_l.append(task)

    for ret in asyncio.as_completed(task_l):
        res = await ret
        print(res)

loop=asyncio.get_event_loop()
loop.run_until_complete(main())

"""
await 阻塞 協程函數這裏要切換出去,還能保證一下子在切回來
await必須寫在async函數裏,async函數是協程函數
loop事件循環
全部的協程的執行,調度都離不開這個loop 
"""
asyncio
a=0
def func():
    global a
    a+=1

import dis
dis.dis(func)  #查看func的cpu指令
dis
相關文章
相關標籤/搜索