進程 線程 協程

進程html

 

進程 :
什麼是進程?
是操做系統的發展過程當中,爲了提升CPU的利用率,在操做系統同時運行多個程序的時候,爲了數據的安全\代碼不混亂而被創造出來的一個概念
每個程序運行起來都至少是一個進程.
進程是計算機中最小的資源分配單位
進程被操做系統調度的,有不少相關的算法 - 早期的操做系統
進程之間是數據隔離的
進程的三狀態 就緒 運行 阻塞
同步異步
    同步 : 一個任務的執行依賴另外一個事務的結束
            join lock
    異步 : 一個任務的執行不依賴另外一個事務的結束
            start terminate
阻塞非阻塞
    阻塞   : accept recv recvfrom queue.get join
    非阻塞 : setblocking = False
併發並行
    並行是特殊的併發
    並行就是 同一時刻 兩個以上的程序同時在cpu上執行
    併發就是 同一時段 兩個以上的程序看起來在同時執行
IO概念 : 文件操做 數據庫操做 網絡傳輸 用戶輸入輸出
    Input  獲得bytes/str
    Output 發送數據/輸出數據

由於進程與進程之間本質上是異步且數據隔離
守護進程 : 子進程等待主進程的代碼結束就結束了
同步控制
    join
    鎖 - 互斥鎖 : 多個進程同時對一個數據進行操做的時候 操做同一個文件/數據庫/管道/Manager.dict
    信號量
    事件
數據共享 - 數據不安全
    Manager
IPC-進程之間通訊
    管道
    隊列 - 生產者消費者模型(爲了解決數據的生產和處理的效率問題)
    第三方工具(消息隊列,消息中間件)
進程池
    解決大量任務 開啓多個進程的開銷過大的問題
    節省資源,提升併發效率的
    通常開進程數 cpu_count * 1 or 2
進程? 時間片? 阻塞非阻塞,同步異步,進程三狀態,併發並行,數據隔離,IO概念//進程池,IPC,鎖,事件,信號量 ...
Python並不支持真正意義上的多線程。Python中提供了多線程模塊,但若是想經過多線程提升代碼的速度,並不推薦使用多線程模塊。Python中有一個全局鎖Global Interpreter Lock(GIL),全局鎖會確保任什麼時候候多個線程中只有一個會被執行。線程的執行速度很是快,會誤覺得線程是並行執行的,但實際上都是輪流執行。通過GIL處理後,會增長線程執行的開銷。
全局鎖 GIL(Global interpreter lock) 並非 Python 的特性,而是在實現 Python 解析器(CPython)時所引入的一個概念。Python有CPython,PyPy,Psyco 等不一樣的 Python 執行環境,其中 JPython 沒有GIL。CPython 是大部分環境下默認的 Python 執行環境,GIL 並非 Python 的特性,Python 徹底能夠不依賴於 GIL。
GIL 限制了同一時刻只能有一個線程運行,沒法發揮多核 CPU 的優點。GIL 本質是互斥鎖,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。在一個 Python 的進程內,不只有主線程或者由主線程開啓的其它線程,還有解釋器開啓的垃圾回收等解釋器級別的線程。進程內,全部數據都是共享的,代碼做爲一種數據也會被全部線程共享,多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將 target 的代碼交給解釋器的代碼去執行,解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,所以爲了保證數據安全須要加鎖處理,即 GIL。 
因爲GIL 的存在,同一時刻同一進程中只有一個線程被執行。多核 CPU能夠並行完成計算,所以多核能夠提高計算性能,但 CPU 一旦遇到 I/O 阻塞,仍然須要等待,因此多核CPU對 I/O 密集型任務提高不明顯。根據執行任務是計算密集型仍是I/O 密集型,不一樣場景使用不一樣的方法,對於計算密集型任務,多進程佔優點,對於 I/O 密集型任務,多線程佔優點
GIL

 

全部的程序 - 任務
全部的任務 - 進程
進程 ?  進行中的程序  PID進程ID
    進程是計算機中資源分配的最小單位
    進程間數據隔離   要通訊的話運用socket
  進程調度:--先來先服務調度算法,短做業優先,時間片輪轉法,多級反饋機制

併發 資源有限的狀況下,AB程序交替使用cpu目的是提升效率
並行 同一時刻都在執行 多核  (是併發裏的一種特殊狀況)  更苛刻的條件

同步:  程序順序執行,多個任務之間串行執行 (洗衣完--作飯完--洗碗)
異步:  多個任務同時運行  (在同一時間內洗衣作飯洗碗)

阻塞: 程序因爲不符合某個條件或者等待某個條件知足 而在某一個地方進入等待狀態
非阻塞: 程序正常執行
同步阻塞
一件事兒一件事兒的作
中間還要被阻塞
同步非阻塞 : 費力不討好
一件事兒一件事兒的作
可是不阻塞
異步阻塞
同時進行的
每一件事兒都會遇到阻塞事件
異步非阻塞
幾個事情同時進行
每一件事都不阻塞
 
import os,time
print(os.getpid())  #獲取當前進程號
print(os.getppid()) #獲取當前父進程id

time.sleep(5)
print(os.getpid()) 
查看進程號
import multiprocessing  #是個包

from multiprocessing import Process
import os
def son_process():
    # 這個函數的代碼實在子進程執行的
    print('執行我啦',os.getpid(),os.getppid())
    print()

if __name__ == '__main__':
    print('1-->',os.getpid()) #主進程
    p = Process(target=son_process) #實例化
    p.start()  #進程開始
下面是打印結果
#  1--> 6416
#  執行我啦 7388 6416
python 開啓一個子進程
import time
from multiprocessing import Process
# 經過併發實現一個有併發效果的socket server

# 1.開啓了一個子進程就已經實現了併發: 父進程(主進程)和子進程併發(同時執行)
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    p = Process(target=son_process) #建立子進程
    p.start() #通知操做系統開啓一個子進程  os響應需求 分配資源 執行進程中的代碼
    print('主進程')
併發的主/子進程
import time
from multiprocessing import Process
# 經過併發實現一個有併發效果的socket server

# 1.開啓了一個子進程就已經實現了併發: 父進程(主進程)和子進程併發(同時執行)
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    p = Process(target=son_process) #建立子進程
    p.start() #通知操做系統開啓一個子進程  os響應需求 分配資源 執行進程中的代碼

    for i in range(5):
        print('主進程')
        time.sleep(0.3)
時間測試 主/子進程
import time
from multiprocessing import Process
def son_process():
    print('son start')
    time.sleep(1)
    print('son end')

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=son_process)
        p.start()
開啓多個子進程
import time
from multiprocessing import Process
def son_process(i):
    print('son start',i)
    time.sleep(1)
    print('son end',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=son_process,args=(i,)) #args必須元組
        p.start()  # 通知操做系統 start並不意味着子進程已經開始了
子進程中傳參數
import time
from multiprocessing import Process
def son_process(i):
    print('son start',i)
    time.sleep(1)
    print('son end',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=son_process,args=(i,))  #子進程不支持返回值
        p.start()
    print('主進程的代碼執行完畢')
# 主進程會等待子進程結束以後才結束
# 爲何?
# 父進程負責建立子進程,也負責回收子進程的資源
主進程和子進程之間的關係
 - server.py-
import socket,time
from multiprocessing import Process
def talk(conn):
    conn, addr = sk.accept()
    print(conn)
    while True:
        msg = conn.recv(1024).decode()
        time.sleep(10)
        conn.send(msg.upper().encode())

if __name__ == '__main__':
    # 這句話下面的全部代碼都只在主進程中執行
    sk = socket.socket()
    sk.bind(('127.0.0.1',9000))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        Process(target=talk,args=(sk,)).start()

# 卡 大量的while True 而且代碼中並無太多的其餘操做
# 若是咱們使用socketserver,不會這麼卡
# 多進程確實能夠幫助咱們實現併發效果,可是還不夠完美
# 操做系統沒開啓一個進程要消耗大量的資源
# 操做系統要負責調度進程 進程越多 調度起來就越吃力

 - clitent.py-
import socket

sk = socket.socket()

sk.connect(('127.0.0.1',9000))
while True:
    sk.send(b'hello')
    print(sk.recv(1024))
多進程實現socketserver
def son_process(i):
    while True:
        print('son start',i)
        time.sleep(0.5)
        print('son end',i)

if __name__ == '__main__':
    p = Process(target=son_process, args=(1,))
    p.start()           # 開啓一個子進程,異步的
    print('主進程的代碼執行完畢')
    print(p.is_alive())  # 子進程還活着
    p.terminate()        # 結束一個子進程,異步的
    print(p.is_alive())  # 子進程還在活着
    time.sleep(0.1)
    print(p.is_alive()) # False
主進程可不能夠直接結束一個子進程?
n = [100]
def sub_n():  # 減法
    global n  # 子進程對於主進程中的全局變量的修改是不生效的
    n.append(1)
    print('子進程n : ',n)   #子進程n :  [100, 1]
if __name__ == '__main__':
    p = Process(target = sub_n)
    p.start()
    p.join()     # 阻塞 直到子進程p結束
    print('主進程n : ',n)  #主進程n :  [100]
數據隔離
# 主進程裏的print('主進程n : ',n)這句話在十個子進程執行完畢以後才執行
n = [100]
import random
def sub_n():
    global n  # 子進程對於主進程中的全局變量的修改是不生效的
    time.sleep(random.random())
    n.append(1)
    print('子進程n : ',n)
if __name__ == '__main__':
    p_lst = []  #進程添加進列表
    for i in range(10):
        p = Process(target = sub_n) #建立
        p.start() #通知os 開啓
        p_lst.append(p)
    for p in p_lst:p.join()  # 阻塞 只有一個條件是可以讓我繼續執行 這個條件就是子進程結束
    print('主進程n : ',n)
開啓十個進程執行subn
n = [100]
def sub_n():
    global n  # 子進程對於主進程中的全局變量的修改是不生效的
    n.append(1)
    print('子進程n : ',n)
    time.sleep(10)
    print('子進程結束')

if __name__ == '__main__':
    p = Process(target = sub_n)
    p.start()
    p.join(timeout = 5)     # 若是不設置超時時間 join會阻塞直到子進程p結束
#     # timeout超時
#     # 若是設置的超時時間,那麼意味着若是不足5s子進程結束了,程序結束阻塞
#     # 若是超過5s尚未結束,那麼也結束阻塞
    print('主進程n : ',n)
    p.terminate()  # 也能夠強制結束一個子進程
join拓展 超時時間
# 設置子進程爲守護進程,守護進程會隨着主進程代碼的結束而結束
# 因爲主進程要負責給全部的子進程收屍,因此主進程必須是最後結束,守護進程只能在主進程的代碼結束以後就認爲主進程結束了
# 守護進程在主進程的代碼結束以後就結束了,不會等待其餘子進程結束
#
# 但願守護進程必須等待全部的子進程結束以後才結束
# ????
# import time
# from multiprocessing import Process
# def alive():
#     while True:
#         print('鏈接監控程序,而且發送報活信息')
#         time.sleep(0.6)
#
# def func():
#     '主進程中的核心代碼'
#     while True:
#         print('選擇的項目')
#         time.sleep(1)
#         print('根據用戶的選擇作一些事兒')
#
# if __name__ == '__main__':
#     p = Process(target=alive)
#     p.daemon = True   # 設置子進程爲守護進程,守護進程會隨着主進程代碼的結束而結束
#     p.start()
#     p = Process(target=func)
#     p.start()
#     p.join()   # 在主進程中等待子進程結束,守護進程就能夠幫助守護其餘子進程了


# 守護進程
# 1.守護進程會等待主進程的代碼結束而結束,不會等待其餘子進程的結束
# 2.要想守護進程等待其餘子進程,只須要在主進程中加上join
守護進程
for i in range(5):
    pass
print(i)  # i=4

lst = []
for i in range(5):
    p = Process()
    lst.append(p)
    p.start()
for p in lst:
    p.join()
    p.terminate()
操做多個子進程的結束terminate和join阻塞
import os
from multiprocessing import Process
class MyProcess(Process): #繼承
    def __init__(self,參數):
        super().__init__() #父類 初始化
        self.一個屬性 = 參數 

    def run(self):
        print('子進程中要執行的代碼')

if __name__ == '__main__':
    conn = '一個連接'
    mp = MyProcess(conn)
    mp.start()
面向對象 開啓子進程的方法
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
接下來,咱們以  模擬搶票  爲例,來看看數據安全的重要性。
import json
import time
from multiprocessing import Process,Lock

def search(name):
    '''查詢餘票的功能'''
    with open('ticket') as f:  # 'r'
        dic = json.load(f)  # 讀取  dict
        print(name , dic['count'])

def buy(name): # 買票
    with open('ticket') as f:
        dic = json.load(f)
    time.sleep(0.1)
    if dic['count'] > 0:
        print(name,'買到票了')
        dic['count'] -= 1
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)  # 寫進去

def get_ticket(name,lock): #整個操做
    search(name) # 先查
    lock.acquire()  # 只有第一個到達的進程才能獲取鎖,剩下的其餘人都須要在這裏阻塞  上鎖
    buy(name)  # 再買
    lock.release()  # 有一我的還鎖,會有一我的再結束阻塞拿到鑰匙  還鎖

if __name__ == '__main__':
    lock = Lock()  # 實例化鎖
    for i in range(10):  # 10個進程
        p = Process(target=get_ticket,args=('name%s'%i,lock)) # 建立
        p.start() # 通知os 開啓

# tips : ticket 裏面的數據結構 {"count": 0}

    # 模擬過程描述:
# 第一個來的人 取鑰匙 開門 進門 關門 帶着鑰匙反鎖
# 第一個拿到鑰匙的人  開門 出門 鎖門 掛鑰匙
鎖 multiprocess.Lock 模擬搶票 例子
進程 狀態碼 Z/z 殭屍進程 linux命令
主進程中控制子進程的方法:
    p = Process(target,args) #建立這一刻 根本沒有通知操做系統
    p.start() #通知os 開啓子進程  異步非阻塞
    p.terminate() #通知os,關閉子進程,異步非阻塞
    p.is_alive() # 查看子進程是否還活着
    p.join(timeout=10) # 阻塞 直到子進程結束  超時時間理解
    
# 守護進程
    # 守護進程是一個子進程
    # 守護進程會在主進程代碼結束以後才結束
    # 爲何會這樣?
        # 因爲主進程必需要回收全部的子進程的資源
        # 因此主進程必須在子進程結束以後才能結束
        # 而守護進程就是爲了守護主進程存在的
        # 不能守護到主進程結束,就只能退而求其次,守護到代碼結束了
    # 守護到主進程的代碼結束,意味着若是有其餘子進程沒有結束,守護進程沒法繼續守護
    # 解決方案 : 在主進程中加入對其餘子進程的join操做,來保證守護進程能夠守護全部主進程和子進程的執行
    # 如何設置守護進程
        # 子進程對象.daemon = True 這句話寫在start以前

#
    # 爲何要用鎖?
    # 因爲多個進程的併發,致使不少數據的操做都在同時進行
    # 因此就有可能產生多個進程同時操做 : 文件\數據庫 中的數據
    # 致使數據不安全
    # 因此給某一段修改數據的程序加上鎖,就能夠控制這段代碼永遠不會被多個進程同時執行
    # 保證了數據的安全
# Lock 鎖(互斥鎖)
# 鎖其實是把你的某一段程序變成同步的了,下降了程序運行的速度,爲了保證數據的安全性
# 沒有數據安全的效率都是耍流氓
進程 總結
# 對於鎖 保證一段代碼同一時刻只能有一個進程執行
# 對於信號量 保證一段代碼同一時刻只能有n個進程執行
# 流量控制

import time
import random
from multiprocessing import Process,Semaphore
def ktv(name,sem):
    sem.acquire() #拿鎖
    print("%s走進了ktv"%name)
    time.sleep(random.randint(5,10))
    print("%s走出了ktv" % name)
    sem.release() #還鎖

if __name__ == '__main__':
    sem = Semaphore(4) # 同時只能有4個進程執行
    for i in range(25):
        p = Process(target=ktv,args = ('name%s'%i,sem))
        p.start()

# 信號量原理 : 鎖 + 計數器實現的
# 普通的鎖 acquire 1次
# 信號量   acquire 屢次
# count計數
# count = 4
# acquire count -= 1
# 當count減到0的時候 就阻塞
# release count + = 1
# 只要count不爲0,你就能夠繼續acquire
信號量 Semaphore
# from multiprocessing import Event
# Event 事件類
# e = Event()
# e 事件對象
# 事件自己就帶着標識 : False
# wait 阻塞
# 它的阻塞條件是 對象標識爲False
# 結束阻塞條件是 對象標識爲True

# 對象的標識相關的 :
# set  將對象的標識設置爲True
# clear 將對象的標識設置爲False
# is_set 查看對象的標識是否爲True

import time
import random
from multiprocessing import Event,Process
def traffic_light(e):
    print('\033[1;31m紅燈亮\033[0m')
    while True:
        time.sleep(2)
        if e.is_set():   # 若是當前是綠燈
            print('\033[1;31m紅燈亮\033[0m') # 先打印紅燈亮
            e.clear()                        # 再把燈改爲紅色
        else :           # 當前是紅燈
            print('\033[1;32m綠燈亮\033[0m') # 先打印綠燈亮
            e.set()                          # 再把燈變綠色
#
def car(e,carname):
    if not e.is_set():  # False
        print('%s正在等待經過'%carname)
        e.wait()  #阻塞
    print('%s正在經過'%carname)

if __name__ == '__main__':
    e = Event()  #建立
    p = Process(target=traffic_light,args = (e,)) #建立進程
    p.start() #開始
    for i in range(100):  #100輛車
        time.sleep(random.randrange(0,3)) #隨機
        p = Process(target=car, args=(e,'car%s'%i))
        p.start()

# 太複雜了
# 在咱們進行併發操做的時候不多用到這麼複雜的場景

# Event事件
# 放到進程中的代碼必定不止一段
# 這兩個操做之間 存在同步關係
# 一個操做去確認另外一個操做的執行條件是否完成

# 標識 控制wait是否阻塞的關鍵
# 如何修改這個標識 : clear set
# 如何查看這個標識 : is_set
事件Event
# 管道數據不安全 管道加鎖就是隊列 
from multiprocessing import Pipe,Process

def f(conn):  #接收子conn
    conn.send('hello world') #發消息
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f,args=(child_conn,)) #傳子conn
    p.start()
    print(parent_conn.recv()) #父conn接收
    p.join()
Pipe 管道 IPC數據間通訊 是隊列的底層
# 進程之間 數據隔離
# 憑什麼判斷 子進程是否執行完畢了????
# lock對象
# a進程 acquire了 b進程在acquire的地方一直阻塞直到a release
# 你在b進程 如何知道a進程release了?
# 你以前學習的lock semaphore event實際上都用到了進程之間的通訊
# 只不過這些通訊都是很是簡單而固定的信號
# 在你使用這些工具的過程當中並感知不到

# 對於用戶來說 : 就但願可以去進行一些更加複雜的 不固定的內容的交互
# 這種狀況下使用lock semaphore event就不可行了

# 進程間通訊 IPC
# IPC Inter-Process Communication
# 實現進程之間通訊的兩種機制:
    # 管道 Pipe  數據不安全
    # 隊列 Queue  管道+鎖

# from multiprocessing import Queue,Process
#
# def consumer(q):
#     print(
#        '子進程 :', q.get()
#     )
#
#
# if __name__ == '__main__':
#     q = Queue()
#     p = Process(target=consumer,args=(q,))
#     p.start()
#     q.put('hello,world')

# 生產者消費者模型
import time
from multiprocessing import Queue,Process

def producer(name,food,num,q):
    '''生產者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生產了%s'%(name,foodi))
        q.put(foodi)

def consumer(name,q):
    while True:
        food = q.get()   # 等待接收數據
        if food == None:break
        print('%s吃了%s'%(name,food))
        time.sleep(1)

if __name__ == '__main__':
    q = Queue(maxsize=10)
    p1 = Process(target=producer,args = ('寶元','泔水',20,q))
    p2 = Process(target=producer,args = ('戰山','魚刺',10,q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    p1.start()   # 開始生產
    p2.start()   # 開始生產
    c1.start()
    c2.start()
    p1.join()    # 生產者結束生產了
    p2.join()    # 生產者結束生產了
    q.put(None)  # put None 操做永遠放在全部的生產者結束生產以後
    q.put(None)  # 有幾個消費者 就put多少個None

# 爲何隊列爲空 爲滿 這件事情不夠準確
# q.qsize()  隊列的大小
# q.full()   是否滿了 滿返回True
# q.empty()  是否空了 空返回True
進程間通訊 IPC
import  time
from multiprocessing import JoinableQueue,Process

def consumer(name,q):
    while True:
        food = q.get()
        time.sleep(1)
        print('%s消費了%s'%(name,food))
        q.task_done()

def producer(name,food,num,q):
    '''生產者'''
    for i in range(num):
        time.sleep(0.3)
        foodi = food + str(i)
        print('%s生產了%s'%(name,foodi))
        q.put(foodi)
    q.join()   # 消費者消費完畢以後會結束阻塞
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('寶元', '泔水', 20, q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    c1.start()
    c2.start()
    p1.join()

# 消費者每消費一個數據會給隊列發送一條信息
# 當每個數據都被消費掉以後 joinablequeue的join阻塞行爲就會結束
# 以上就是爲何咱們要在生產完全部數據的時候發起一個q.join()

# 隨着生產者子進程的執行完畢,說明消費者的數據都消費完畢了
# 這個時候主進程中的p1.join結束
# 主進程的代碼結束
# 守護進程也結束了
JoinableQueue 消費者模型
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
    #     d['count']-=1
    ''' 等價於下面的代碼 '''
    lock.acquire()
    d['count'] -= 1
    lock.release()

if __name__ == '__main__':
    lock=Lock()
    m = Manager()
    dic=m.dict({'count':100})
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=(dic,lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    print(dic)

# Manager是一個類 內部有一些數據類型可以實現進程之間的數據共享
# dict list這樣的數據 內部的數字進行自加 自減 是會引發數據不安全的,這種狀況下 須要咱們手動加鎖完成
# 所以 咱們通常狀況下 不適用這種方式來進行進程之間的通訊
# 咱們寧肯使用Queue隊列或者其餘消息中間件 來實現消息的傳遞 保證數據的安全
進程之間的數據共享 Manager類
import time
from multiprocessing import Process

def func(i):
    i -= 1

if __name__ == '__main__':
    start = time.time()  #計時開始
    l = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        l.append(p)
    for p in l:
        p.join()
    print(time.time() - start) #計時結束
100個進程 計算時間
計算時間差
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1

if __name__ == '__main__':
    start = time.time()
    p = Pool(5)  #池  你的池中打算放多少個進程,個數cpu的個數 * 1/2
    p.map(func,range(100))  # 自動帶join
    print(time.time()-start)
Pool 進程池
from multiprocessing import Pool
def f(i):
    i -= 1
    return i**2

if __name__ == '__main__':
    p = Pool(5)  #池  你的池中打算放多少個進程,個數cpu的個數 * 1/2
    ret = p.map(f,range(100))  # 自動帶join
    print(ret)
Pool 獲取程序執行的返回值
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.5)
    return i**2

if __name__ == '__main__':
    p = Pool(5) #你的池中打算放多少個進程,個數cpu的個數 * 1|2
    for i in range(100):
        ret = p.apply(func,args=(i,))  # 自動帶join 串行/同步 apply就是同步提交任務
        print(ret)
apply 同步方式向進程池提交任務
import time
from multiprocessing import Pool  #
def func(i):
    i -= 1
    time.sleep(0.3)
    # print(i)
    return i**2
if __name__ == '__main__':
    p = Pool(5)
    lst = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,))  # 自動帶join 異步的 apply_async異步提交任務
        lst.append(ret)
    # p.close()   # 關閉進程池的任務提交 今後以後不能再向p這個池提交新的任務
    # p.join()    # 阻塞 一直到全部的任務都執行完
    # print('結束')
    for i in lst :
        print(i.get())
apply_async 異步方式向進程池提交任務
什麼是進程池? 有限的進程的池子
爲何要用進程池?
    任務不少 cpu個數*5個任務以上
    爲了節省建立和銷燬進程的時間 和 操做系統的資源
通常進程池中進程的個數:
    cpu的1-2倍
    若是是高計算,徹底沒有io,那麼就用cpu的個數
    隨着IO操做越多,可能池中的進程個數也能夠相應增長
向進程池中提交任務的三種方式
    map    異步提交任務 簡便算法 接收的參數必須是 子進程要執行的func,可迭代的(可迭代中的每一項都會做爲參數被傳遞給子進程)
        可以傳遞的參數是有限的,因此比起apply_async限制性比較強
    apply  同步提交任務(你刪了吧)
    apply_async 異步提交任務
        可以傳遞比map更豐富的參數,可是比較麻煩
        首先 apply_async提交的任務和主進程徹底異步
        能夠經過先close進程池,再join進程池的方式,強制主進程等待進程池中任務的完成
        也能夠經過get獲取返回值的方式,來等待任務的返回值
            咱們不能在apply_async提交任務以後直接get獲取返回值
               for i in range(100):
                    ret = p.apply_async(func,args=(i,))  # 自動帶join 異步的 apply_async異步提交任務
                    l.append(ret)
                for ret in l:
                    print(ret.get())
進程池總結
回調函數
import os
import time
import random
from multiprocessing import Pool  #
def func(i):     # [2,1,1.5,0,0.2]
    i -= 1
    time.sleep(random.uniform(0,2))
    return i**2

def back_func(args):
    print(args,os.getpid())

if __name__ == '__main__':
    print(os.getpid())
    p = Pool(5)
    l = []
    for i in range(100):
        ret = p.apply_async(func,args=(i,),callback=back_func)  # 5個任務
    p.close()
    p.join()
callback回調函數
主動執行func,而後在func執行完畢以後的返回值,直接傳遞給back_func做爲參數,調用back_func
處理池中任務的返回值
回調函數是由誰執行的? 主進程
回調函數 進程池 回調函數沒有返回值
import re
import json
from urllib.request import urlopen  #請求頁面包
from multiprocessing import Pool

def get_page(i):  #頁面
    ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read()
    ret = ret.decode('utf-8')
    return ret

def parser_page(s): #頁面數據分析
    com = re.compile(
        '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>'
        '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)評價</span>', re.S)

    ret = com.finditer(s)
    with open('file','a',encoding='utf-8') as f:
        for i in ret:
            dic = {
                "id": i.group("id"),
                "title": i.group("title"),
                "rating_num": i.group("rating_num"),
                "comment_num": i.group("comment_num"),
            }
            f.write(json.dumps(dic,ensure_ascii=False)+'\n')

if __name__ == '__main__':
    p = Pool(5)
    count = 0
    for i in range(10):
        p.apply_async(get_page,args=(count,),callback=parser_page)
        count += 25
    p.close()
    p.join()
進程池 爬蟲 豆瓣 例子 處理中文
import json
with open('file2','w',encoding='utf-8') as f:
    json.dump({'你好':'alex'},f,ensure_ascii=False)
ensure_ascii=False

線程 - python

線程
    輕量級 進程  解決併發 總體效率高於進程
    在進程中數據共享  資源共享
    是進程的一部分,不能獨立存在的 
    被CPU調度的最小單位
使用場景 socketserver web的框架 django flask tornado 多線程來接收用戶併發的請求
python裏  同一個進程中的多個線程能不能同時使用多個cpu

在整個程序界:
    若是你的程序須要數據隔離 : 多進程
    若是你的程序對併發的要求很是高 : 多線程

python
初期 面向單核的 一個cpu
做爲一門腳本語言  解釋型語言

線程鎖這件事兒是由Cpython解釋器完成
對於python來講 同一時刻只能有一個線程被cpu訪問
完全的解決了多核環境下的安全問題
線程鎖 : 全局解釋器鎖 GIL
    1.這個鎖是鎖線程的
    2.這個鎖是解釋器提供的

多線程仍然有它的優點
    你的程序中用到cpu真的多麼
    若是100% 90%的時間都消耗在計算上,那麼cpython解釋器下的多線程對你來講確實沒用
    可是你寫的大部分程序 的時間實際上都消耗在了IO操做上
遇到高計算型
    開進程 4個進程  每一個進程裏開n個線程
    換個解釋器
線程概念
import os
import time
from threading import Thread
def func():
    print('start',os.getpid())
    time.sleep(1)
    print('end')

if __name__ == '__main__':
    t = Thread(target=func)
    t.start()
    for i in range(5):
        print('主線程',os.getpid())
        time.sleep(0.3)
Thread 線程類 初識
import time
from threading import Thread
def func():
    n = 1 + 2 + 3
    n ** 2

if __name__ == '__main__':
    start = time.time()
    lst = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        lst.append(t)
    for t in lst:
        t.join()
    print(time.time() - start)
#

import time
from multiprocessing import Process as Thread
def func():
    n = 1 + 2 + 3
    n**2

if __name__ == '__main__':
    start = time.time()
    lst = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        lst.append(t)
    for t in lst:
        t.join()
    print(time.time() - start)
進程與線程的效率對比
from threading import Thread
n = 100
def func():
    global n
    n -= 1

t = Thread(target=func)
t.start()
t.join()
print(n)
線程的數據共享
from threading import Thread
class Mythread(Thread):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg
    def run(self):
        print('in son',self.arg)

t = Mythread(123)
t.start()
開啓線程的另外一種方式 傳參數
import time
from threading import Thread,currentThread,activeCount,enumerate
class Mythread(Thread):
    def __init__(self,arg):
        super().__init__()
        self.arg = arg
    def run(self):
        time.sleep(1)
        print('in son',self.arg,currentThread())
# t = Mythread(123)
# t.start()
# print('主',currentThread()) #當前線程
#
for i in range(10):
    t = Mythread(123)
    t.start()
    # print(t.ident) #當前線程id
print(activeCount())  #幾個活躍的線程  11
print(enumerate()) # 一共幾個線程 []
threading模塊中的其餘功能
- server.py - 
import socket
from threading import Thread

def talk(conn):
    while True:
        msg = conn.recv(1024).decode()  #解碼
        conn.send(msg.upper().encode())  #編碼

sk = socket.socket() #建立
sk.bind(('127.0.0.1',9000)) #bind
sk.listen() #監聽
while True:
    conn,addr = sk.accept() #接收
    Thread(target=talk,args = (conn,)).start() #建立線程並開啓

- client.py -
import socket
sk = socket.socket() #建立

sk.connect(('127.0.0.1',9000)) #鏈接
while True:
    sk.send(b'hello') #
    print(sk.recv(1024)) #
多線程實現socketserver
import time
from threading import Thread

def func():
    while True:
        print('in func')
        time.sleep(0.5)

def func2():
    print('start func2')
    time.sleep(10)
    print('end func2')

Thread(target=func2).start()
t = Thread(target=func)
t.setDaemon(True)  #線程 守護操做
t.start()
print('主線程')
time.sleep(2)
print('主線程結束')

# 守護進程 只守護主進程的代碼,主進程代碼結束了就結束守護,守護進程在主進程以前結束
# 守護線程 隨着主線程的結束才結束,守護線程是怎麼結束的  直到主子線程都結束 進程結束

# 進程 terminate 強制結束一個進程的
# 線程 沒有強制結束的方法
# 線程結束 : 線程內部的代碼執行完畢 那麼就自動結束了
守護線程 線程結束問題
import time
from threading import Thread,currentThread
def func():
    print(currentThread())

print('開始')
for i in range(10):
    Thread(target=func).start()
    # time.sleep(2)
print('主線程')
currentThread
鎖 用來保證數據安全
有了GIL仍是會出現數據不安全的現象,因此仍是要用鎖 
import time
from threading import Thread,Lock
n = 100
def func(lock): #
    global n  #用 全局的n
    # n -= 1
    with lock:
        tmp = n-1  # n-=1
        # time.sleep(0.1)
        n = tmp

if __name__ == '__main__':
    l = []
    lock = Lock() #實例化
    for i in range(100):
        t = Thread(target=func,args=(lock,)) #建立
        t.start() #開啓
        l.append(t)
    for t in l:
        t.join()
    print(n)
鎖 from threading import Thread,Lock
import dis
n = 1  #全局空間的 += -= 操做 都不是數據安全的
def func():
    n = 100  #局部空間內 永遠安全
    n -= 1

dis.dis(func)

# 會出現線程不安全的兩個條件
# 1.是全局變量
# 2.出現 += -=這樣的操做

#下面是解析
LOAD  僅有這個 數據安全
STORE 有load store 就會不安全

# 列表 字典
# 方法 l.append l.pop l.insert dic.update 都是線程安全的
# l[0] += 1  不安全
# d[k] += 1  不安全
dis 模塊 cpu計算底層步驟
# 科學家吃麪問題
import time
from threading import Thread,Lock
# noodle_lock = Lock()
# fork_lock = Lock()
# 死鎖不是時刻發生的,有偶然的狀況整個程序都崩了
# 每個線程之中不止一把鎖,而且套着使用
# 若是某一件事情須要兩個資源同時出現,那麼不該該將這兩個資源經過兩把鎖控制
# 而應看作一個資源
#
# def eat1(name):
#     noodle_lock.acquire()
#     print('%s拿到麪條了'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s開始吃麪'%name)
#     time.sleep(0.2)
#     fork_lock.release()
#     print('%s放下叉子了' % name)
#     noodle_lock.release()
#     print('%s放下面了' % name)
#
# def eat2(name):
#     fork_lock.acquire()
#     print('%s拿到叉子了' % name)
#     noodle_lock.acquire()
#     print('%s拿到麪條了' % name)
#     print('%s開始吃麪' % name)
#     time.sleep(0.2)
#     noodle_lock.release()
#     print('%s放下面了' % name)
#     fork_lock.release()
#     print('%s放下叉子了' % name)
#
# Thread(target=eat1,args=('wei',)).start()
# Thread(target=eat2,args=('hao',)).start()
# Thread(target=eat1,args=('太',)).start()
# Thread(target=eat2,args=('寶',)).start()

lock = Lock()
def eat1(name):
    lock.acquire()
    print('%s拿到麪條了'%name)
    print('%s拿到叉子了'%name)
    print('%s開始吃麪'%name)
    time.sleep(0.2)
    lock.release()
    print('%s放下叉子了' % name)
    print('%s放下面了' % name)

def eat2(name):
    lock.acquire()
    print('%s拿到叉子了' % name)
    print('%s拿到麪條了' % name)
    print('%s開始吃麪' % name)
    time.sleep(0.2)
    lock.release()
    print('%s放下面了' % name)
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('寶元',)).start()

# 先臨時解決 fork_lock=noodle_lock = Lock()
# 而後再找到死鎖的緣由,再去修改  終極辦法一把鎖
科學家吃麪問題 死鎖現象
from threading import RLock,Lock,Thread

# 互斥鎖
#     不管在相同的線程仍是不一樣的線程,都只能連續acquire一次
#     要想再acquire,必須先release
# 遞歸鎖
#     在同一個線程中,能夠無限次的acquire
#     可是要想在其餘線程中也acquire,
#     必須如今本身的線程中添加和acquire次數相同的release
rlock = RLock()  #每一次acquire都像進去一道門
rlock.acquire()
rlock.acquire()
rlock.acquire()
rlock.acquire()  #直到全都release 才能下我的進門
print('鎖不住')

lock = Lock() #普通鎖/互斥鎖
lock.acquire()
print('1')  #到這裏 hang住了 
lock.acquire()
print('2')
遞歸鎖 RLock 互斥鎖Lock
from threading import RLock
rlock = RLock()
def func(num):
    rlock.acquire() #
    print('aaaa',num)
    rlock.acquire()
    print('bbbb',num)
    rlock.release() #必須  還鎖
    rlock.release() #必須

Thread(target=func,args=(1,)).start()
Thread(target=func,args=(2,)).start()
# aaaa 1
# bbbb 1
# aaaa 2
# bbbb 2
遞歸鎖 1
import time
from threading import RLock,Lock,Thread
noodle_lock = fork_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s拿到麪條了'%name)
    fork_lock.acquire()
    print('%s拿到叉子了'%name)
    print('%s開始吃麪'%name)
    time.sleep(0.2)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子了' % name)
    noodle_lock.acquire()
    print('%s拿到麪條了' % name)
    print('%s開始吃麪' % name)
    time.sleep(0.2)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('wusir',)).start()
Thread(target=eat1,args=('太白',)).start()
Thread(target=eat2,args=('寶元',)).start()
遞歸鎖 科學家吃麪 會多用佔用資源   建議互斥鎖
import time
from threading import Semaphore,Thread

def func(name,sem):
    sem.acquire()
    print(name,'start')
    time.sleep(1)
    print(name,'stop')
    sem.release()

sem = Semaphore(5)
for i in range(20):
    Thread(target=func,args=(i,sem)).start()
信號量 線程 不太好 比池
from threading import Event
# 事件
# wait() 阻塞 到事件內部標識爲True就中止阻塞
# 控制標識
    # set
    # clear
    # is_set

# 鏈接數據庫
import time
import random
from threading import Thread,Event
def connect_sql(e):
    count = 0
    while count < 3:
        e.wait(0.5)
        if e.is_set():
            print('鏈接數據庫成功')
            break
        else:
            print('數據庫未鏈接成功')
            count += 1

def test(e):
    time.sleep(random.randint(0,3))
    e.set()

e = Event()
Thread(target=test,args=(e,)).start() #測試
Thread(target=connect_sql,args=(e,)).start() #鏈接
事件Event 2個事情,一個事情要確認另外一個事情才能開始作
from threading import Timer

def func():
    print('執行我啦')

t = Timer(3,func)
# 如今這個時間點我不想讓它執行,而是預估一下大概多久以後它執行比較合適
t.start()
print('主線程的邏輯')
t.join()
print('ok ')
定時器
# wait      阻塞
# notify(n) 給信號

# 假如如今有20個線程
# 全部的線程都在wait這裏阻塞
# notify(n) n傳了多少
# 那麼wait這邊就能得到多少個解除阻塞的通知

# notifyall
# acquire
# release

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition() #條件
    for i in range(10): #10個線程
        t = threading.Thread(target=run, args=(i,))
        t.start() #開啓

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))

        con.release()
        print('****')

# 設置某個條件
# 若是知足這個條件 就能夠釋放線程
# 監控測試個人網速
# 20000個任務
# 測試個人網速 /系統資源
# 發現系統資源有空閒,我就放行一部分任務
條件 condition
import queue
#  線程隊列 線程之間數據安全
q = queue.Queue()
# # 普通隊列
# q.put(1)
# # print(q.get())
# try:
#     q.put_nowait(2)
# except queue.Full:
#     print('您丟失了一個數據2')
# print(q.get_nowait()) # 若是有數據我就取,若是沒數據不阻塞而是報錯


# 非阻塞的狀況下
q.put(10)
print(q.get(timeout=2))
#
# # 算法裏 棧
# lfq = queue.LifoQueue()   # 棧
# lfq.put(1)
# lfq.put(2)
# lfq.put(3)
# print(lfq.get())
# print(lfq.get())
# print(lfq.get())
#
# # 優先級隊列,是根據第一個值的大小來排定優先級的
# # ascii碼越小,優先級越高
# q = queue.PriorityQueue()
# q.put((2,'a'))
# q.put((1,'c'))
# q.put((1,'b'))
#
# print(q.get())

# 線程+隊列 實現生產者消費者模型
線程隊列
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用

import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor  #線程池
# from concurrent.futures import ProcessPoolExecutor as Pool #進程池

def func(num):
    print('in %s func'%num,currentThread())
    time.sleep(random.random())
    return num**2

tp = ThreadPoolExecutor(5)  #5個線程
ret_l = []
for i in range(30):
    ret = tp.submit(func,i) #提交
    ret_l.append(ret)
for ret in ret_l:  #取值
    print(ret.result())
線程池1
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool
import os
def func(num):
    # print('in %s func'%num,currentThread())
    print('in %s func'%num,os.getpid())
    time.sleep(random.random())
    return num**2
if __name__ == '__main__':

    # tp = ThreadPoolExecutor(5)
    tp = Pool(5)
    ret = tp.map(func,range(30))
    # print(list(ret))
    for i in ret:
        print(i)
線程池 map 簡單用法
# 回調函數 add_done_callback
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool
def func1(num):
    print('in func1 ',num,currentThread())
    return num*'*'

def func2(ret):
    print('--->',ret.result(),currentThread())
tp = Pool(5)
print('主 : ',currentThread())
for i in range(10):
    tp.submit(func1,i).add_done_callback(func2)
# 回調函數收到的參數是須要使用result()獲取的
# 回調函數是由誰執行的? 主線程
線程 回調函數
import time
import random
from threading import currentThread
from concurrent.futures import ThreadPoolExecutor as Pool 
from urllib.request import urlopen
def func(name,url):
    content = urlopen(url).read()
    return name,content

def parserpage(ret):
    name,content = ret.result()
    with open(name,'wb') as f:
        f.write(content)

urls = {
    # 'baidu.html':'https://www.baidu.com',
    # 'python.html':'https://www.python.org',
    # 'openstack.html':'https://www.openstack.org',
    'github.html':'https://help.github.com/',
    'sina.html':'http://www.sina.com.cn/'
}

tp = Pool(2)
for k in urls:
    tp.submit(func,k,urls[k]).add_done_callback(parserpage)
多線程爬蟲
線程
鎖
    爲何有了GIL以後還須要鎖
        多個線程同時操做全局變量的時候
        當出現"非原子性操做",例如+= -= *= /=
    l.append(1) 原子性操做
    a += 1  a = a + 1
        tmp = a +1
        a = tmp
死鎖現象
    什麼是死鎖現象?
        兩個以上的線程爭搶同一把鎖,
        其中一個線程獲取到鎖以後不釋放
        另外的其餘線程就都被鎖住了
        比較容易出現問題的狀況 : 兩把鎖套在一塊兒用了
        死鎖現象的本質 :代碼邏輯問題
遞歸鎖
    一把鎖在同一個線程中acquire屢次而不被阻塞
    若是另外的線程想要使用,必須release相同的次數,
    才能釋放鎖給其餘線程
信號量
    控制幾個線程同一時刻只能有n個線程執行某一段代碼
    鎖 + 計數器
事件
    兩件事情
    一件事情要想執行依賴於另外一個任務的結果
條件
    n個線程在某處阻塞
    由另外一個線程控制這n個線程中有多少個線程能繼續執行
定時器
    規定某一個線程在開啓以後的n秒以後執行
隊列\棧\優先級隊列
    import queue
    線程之間數據安全
    多個線程get不可能同時取走一個數據,致使數據的重複獲取
    多個線程put也不可能同時存入一個數據,致使數據的丟失
    隊列 先進先出
    棧   先進後出
    優先級 優先級高的先出
線程池
    concurrent.futrues
    ThreadPoolExcuter
    ProcessPoolExcuter
submit 異步提交任務
shutdown 等待池內任務完成
result  獲取進程函數的返回值
map     異步提交任務的簡便用法
add_done_callback 回調函數
    進程 主進程執行
    線程
線程總結

協程 - linux

進程 計算機中最小的資源分配單位
線程 計算機中能被CPU調度的最小單位
線程是由操做系統建立的,開啓和銷燬仍然佔用一些時間
調度
    1.一條線程陷入阻塞以後,這一整條線程就不能再作其餘事情了
    2.開啓和銷燬多條線程以及cpu在多條線程之間切換仍然依賴操做系統
你瞭解協程 ?
    瞭解
    協程(纖程,輕型線程)
    對於操做系統來講協程是不可見的,不須要操做系統調度
    協程是程序級別的操做單位
協程效率高不高
    和操做系統自己沒有關係,和線程也沒有關係
    而是看程序的調度是否合理
協程指的只是在同一條線程上可以互相切換的多個任務
遇到io就切換其實是咱們利用協程提升線程工做效率的一種方式
協程 只和程序相關 代碼級別的
# 切換 + 狀態保存  yield
import time
def consumer(res):
    '''任務1:接收數據,處理數據'''
    pass

def producer():
    '''任務2:生產數據'''
    res=[]
    for i in range(100000):  #1億次
        res.append(i)
    return res
#
start=time.time()
res=producer()
consumer(res) # 寫成consumer(producer())會下降執行效率
stop=time.time()
print(stop-start)


import time
def consumer():
    while True:
        res = yield

def producer():
    g = consumer()
    next(g)
    for i in range(100000):
        g.send(i)
start =time.time()
producer()
print(time.time() - start)

# yield這種切換 就已經在一個線程中出現了多個任務,這多個任務以前的切換 本質上就是協程,consumer是一個協程,producer也是一個協程
# 單純的切換還會消耗時間
# 可是若是可以在阻塞的時候切換,而且多個程序的阻塞時間共享,協程可以很是大限度的提升效率
原生協程
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的
# greenlet  協程模塊 在多個任務之間來回切換
# gevent 基於greenlet實現的,多個任務交給gevent管理,遇到IO就使用greenlet進行切換

import time
from greenlet import greenlet

def play():
    print('start play')
    g2.switch()  # 開關
    time.sleep(1)
    print('end play')
def sleep():
    print('start sleep')
    time.sleep(1)
    print('end sleep')
    g1.switch()
g1 = greenlet(play)
g2 = greenlet(sleep)
g1.switch()  # 開關
代碼 協程
import time
import gevent
def play():   # 協程1
    print(time.time())
    print('start play')
    gevent.sleep(1)
    print('end play')
def sleep():  # 協程2
    print('start sleep')
    print('end sleep')
    print(time.time())

g1 = gevent.spawn(play)
g2 = gevent.spawn(sleep)
# g1.join()
# g2.join()  # 精準的控制協程任務,必定是執行完畢以後join當即結束阻塞
gevent.joinall([g1,g2])
greelet 
from gevent import monkey;monkey.patch_all()
import time
import gevent
url_lst = ['https://www.python.org/','https://www.yahoo.com/','https://github.com/']

def get_page(url):
    ret = urlopen(url).read()
    return ret.decode('utf-8')
start = time.time()
g_l = []
for url in url_lst:
    g = gevent.spawn(get_page,url)
    g_l.append(g)
#
gevent.joinall(g_l)
print(time.time()-start)
協程 爬蟲
- server.py -
from gevent import monkey;monkey.patch_all()
import socket
import gevent

def talk(conn):
    while True:
        msg = conn.recv(1024).decode()
        conn.send(msg.upper().encode())

sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()

while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)

- client.py - 
import socket
import threading
def task():
    sk = socket.socket()
    sk.connect(('127.0.0.1',9000))
    while True:
        sk.send(b'hello')
        print(sk.recv(1024))

for i in range(500):
    threading.Thread(target=task).start()
協程 socket 併發 
協程
一條線程在多個任務之間相互切換
數據安全的
不能利用多核
可以規避一個線程上的IO阻塞

一條線程可以起500個協程
4c的機器
5個進程
每個進程20個線程
每個線程500個協程
5*20*500 = 50000
協程總結 一條線程上的 規避IO
相關文章
相關標籤/搜索