python 併發

併發概念

# 進程 : 進行中的程序就是一個進程
    佔用資源 須要操做系統調度
    pid : 可以惟一標識一個進程
    計算機中最小的資源分配單位
# 併發
    多個程序同時執行 : 只有一個cpu,多個程序輪流在一個cpu上執行
    宏觀上 : 多個程序在同時執行
    微觀上 : 多個程序輪流在一個cpu上執行 本質上仍是串行
# 並行
    多個程序同時執行,而且同時在多個cpu上執行
# 同步
    在作A事的時候發起B件事,必須等待B件事結束以後才能繼續作A事件
# 異步
    在作A事的時候發起B時間,不須要等待B事件結束就能夠繼續A事件
# 阻塞
    若是CPU不工做  input accept recv recvfrom sleep connect
# 非阻塞
    CPU在工做
# 線程
    線程是進程中的一個單位,不能脫離進程存在
    線程是計算機中可以被CPU調度的最小單位

進程線程協程的區別

 

進程
線程
    正常的開發語言 多線程能夠利用多核
    cpython解釋器下的多個線程不能利用多核 : 規避了全部io操做的單線程
協程
    是操做系統不可見的
    協程本質就是一條線程 多個任務在一條線程上來回切換
    利用協程這個概念實現的內容 : 來規避IO操做,就達到了咱們將一條線程中的io操做降到最低的目的
# 進程 數據隔離 數據不安全  操做系統級別  開銷很是大  能利用多核
# 線程 數據共享 數據不安全  操做系統級別  開銷小      不能利用多核   一些和文件操做相關的io只有操做系統能感知到
# 協程 數據共享 數據安全    用戶級別      更小        不能利用多核   協程的全部的切換都基於用戶,只有在用戶級別可以感知到的io纔會用協程模塊作切換來規避(socket,請求網頁的)

 

 

I/O操做

I/O操做 相對內存來講
輸入Input輸出Output
輸入是怎麼輸入 :鍵盤\input\read\recv
輸出是怎麼輸出 :顯示器 打印機 播放音樂\print\write\send
文件操做 :read write
網絡操做 :send recv recvfrom
函數     :print input
View Code

計算機的工做分爲兩個狀態:

計算機的工做分爲兩個狀態
    CPU工做   : 作計算(對內存中的數據進行操做)的時候工做
    CPU不工做 : IO操做的時候
CPU的工做效率 500000條指令/ms
View Code

多道操做系統

多道操做系統 :一個程序遇到IO就把CPU讓給別人
    順序的一個一個執行的思路變成
    共同存在在一臺計算機中,其中一個程序執行讓出cpu以後,另外一個程序能繼續使用cpu
    來提升cpu的利用率
    單純的切換會不會佔用時間 : 會
    可是多道操做系統的原理總體上仍是節省了時間,提升了CPU的利用率
    時空複用的概念
View Code

分時操做系統

單cpu 分時操做系統 : 把時間分紅很小很小的段,每個時間都是一個時間片,
每個程序輪流執行一個時間片的時間,本身的時間片到了就輪到下一個程序執行 -- 時間片的輪轉
    老教授 24h全是計算 沒有io
        先來先服務 FCFS
    研究生 5min全是計算 沒有io
        短做業優先
    研究生2 5min全是計算 沒有io
    沒有提升CPU的利用率 \ 提升了用戶體驗
View Code

 

 

進程

串行: 全部的進程由cpu一個一個的解決.html

併發:單個cpu,同時執行多個進程(來回切換的),看起來像是同時運行.python

並行:多個cpu,真正的同時運行多個進程.linux

阻塞:遇到IO才叫阻塞.git

一個cpu運行兩個進程,其中一個進程徹底沒有阻塞,github

非阻塞: 沒有IO.web

進程的建立:

什麼是開啓多個進程:socket:server,client兩個進程編程

python中,若是一次想開啓多個進程,必須是一個主進程,開啓多個子進程json

linux,windows:有主進程開啓子進程windows

相同點:主進程開啓子進程,兩個進程都有相互隔離的獨立空間,互不影響安全

不一樣點:

  linux:子進程空間的初始數據徹底是從主(父)進程copy一份

  windows:子進程空間初始數據徹底是從主(父)進程copy一份,可是有所不一樣

進程的三狀態圖

就緒 運行 阻塞

# 就緒 -操做系統調度->運行 -遇到io操做-> 阻塞 -阻塞狀態結束-> 就緒
           -時間片到了-> 就緒

 

multiprocess模塊多進程

multiple 多元化的

processing 進程

multiprocessing 多元的處理進程的模塊

 

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
    multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。

  multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。

    須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

 

 

Process類

建立進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

 

參數介紹:

group參數未使用,值始終爲None
 
target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs{'name':'egon','age':18}

name爲子進程的名稱

 

 進程方法

p.start():啓動進程,並調用該子進程中的p.run() 

p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

 

屬性介紹:

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
 
p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

注意:在windows中Process()必須放到# if __name__ == '__main__':下

建立開啓子進程的兩種方式

函數方法

from multiprocessing import Process
import time
def task(name):
    print('%s is runing' %(name))
    time.sleep(3)
    print('%s is done' % (name))


if __name__ == '__main__':
    p = Process(target=task,args=('壯壯',))
    # p = Process(target=task,kwargs={'name':'壯壯'})  兩種傳參方式
    p.start()
    print('====主')

類方法

from multiprocessing import Process
import time
# 方式二:

class MyProcess(Process):
    def __init__(self,name):
        
        self.name = name
        super().__init__()
        
    def run(self):  # 必須定義一個run方法
        print('%s is runing' % (self.name))
        time.sleep(3)
        print('%s is done' % (self.name))
    
    
if __name__ == '__main__':
    p = MyProcess('小明')
    p.start()
    print('===主')

 

多進程之間的數據隔離

多進程之間的數據隔離
from multiprocessing import Process
n = 0
def func():
    global n
    n += 1

if __name__ == '__main__':
    p_l = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(n)

 

join方法:

join 主進程等待子進程結束以後,在執行

join開啓一個進程:

from multiprocessing import Process
import time

def task(name):
    time.sleep(1)
    print(f"{name}is running")

if __name__ == '__main__':
     p = Process(target=task,args=("海洋",))
     p.start()
     p.join()           #告知主進程,p進程結束以後,主進程在結束,join有些阻塞的意思
     print("___主進程")

#      p1.start()
#      p2.start()       #p1,p2,p3三個子進程前後運行順序不定,start只是通知一下操做系統
#      p3.start()       #操做系統調用cpu先運行誰,誰先執行

join串行:

from multiprocessing import Process
import time

def task(name,sec):
    time.sleep(sec)
    print(f"{name}is running")

if __name__ == '__main__':
     p1 = Process(target=task, args=("小明",1))
     p2 = Process(target=task, args=("明明",2))
     p3 = Process(target=task ,args=("大明",3))
     start_time = time.time()

     p1.start()
     p1.join()
     p2.start()
     p2.join()
     p3.start()
     p3.join()

     print(f"主進程{time.time() - start_time}")

join併發:

from multiprocessing import Process
import time

def task(sec):
    time.sleep(sec)
    print(f"is running")

if __name__ == '__main__':
     start_time = time.time()
     list = []

     for i in range(1,4):
          p = Process(target=task, args=(i,))
          p.start()
          list.append(p)

     for i in list:
          i.join()

     print(f"主進程{time.time() - start_time}")

進程對象的其餘屬性:

屬性:

from multiprocessing import Process
import time

def task(name):
    print(f"{name}is running")
    time.sleep(3)
    print(f"{name}is done")

if __name__ == '__main__':
     p = Process(target=task,args=("小明",),name="大明")  #name給進程對象設置name屬性
     p.start()
     # print(p.pid)         #獲取到進程號

     time.sleep(1)          #睡一秒,子進程已經執行完成
     p.terminate()          #強制結束子進程,強制執行也會有執行時間
                            #terminate跟start同樣工做原理,都要通知操做系統開啓子進程
                            #內存終止或者開啓都要須要時間的

     time.sleep(1)          #睡一秒,讓terminate殺死
     print(p.is_alive())    #判斷子進程是否存活,只是查看內存中p子進程是否還運行
     print("主進程")

socket server的例子

server

import socket
from multiprocessing import Process
def talk(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        ret = msg.upper().encode('utf-8')
        conn.send(ret)
    conn.close()

if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1',9001))
    sk.listen()
    while True:
        conn, addr = sk.accept()
        Process(target = talk,args=(conn,)).start()
    sk.close()
View Code

client

import time
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',9001))

while True:
    sk.send(b'hello')
    msg =sk.recv(1024).decode('utf-8')
    print(msg)
    time.sleep(0.5)

sk.close()
View Code

 

 

殭屍進程:

init是全部進程的父進程:

殭屍進程,殭屍是什麼,死而沒有消失

主進程建立多個短暫週期的子進程,當子進程退出,是須要等待父進程處理,而父進程沒有及時對子進程回收,那麼子進程的進程符仍然保存在系統中,這種進程就是僵死進程

什麼進程描述符:每個進程都有描述符,io請求,數據指針

from multiprocessing import Process
import time
import os

def task(name):
    print(f"{name}is running")
    print(f"子進程開始了:{os.getpid()}")
    time.sleep(50)


if __name__ == '__main__':
    for i in range(100):
        p = Process(target=task, args=("海洋",))
        p.start()
        print(f"___主進程:{os.getpid()}")
View Code

孤兒進程:

孤兒進程:孤兒進程是由於主進程的退出,他下面的全部子進程都變成孤兒進程了,init會對孤兒進行回收,釋        放掉佔用系統的資源,這種回收也是爲了節省內存。

孤兒進程無害,若是殭屍進程掛了,init會對孤兒進程回收,init是全部進程的祖進程,linux中爲1,0系統
View Code

 

守護進程:

將一個子進程設置成守護進程,當父進程結束,子進程必定會結束,避免孤兒進程產生,應爲回收機制

父進程不能建立子進程

函數方法:

#守護進程會在主進程代碼執行結束後終止,守護進程內沒法在開啓子進程

from multiprocessing import Process
import time
import os

def task(name):
    print(f"{name}is running")
    print(f"子進程開始了:{os.getpid()}")
    time.sleep(50)

if __name__ == '__main__':
     p = Process(target=task,args=("海洋",))
     p.daemon = True  #將p子進程設置成守護進程,守護子進程,只要主進程結束
                      #子進程不管執行與否都立刻結束,daemon,開啓在start上面
     p.start()
     print(f"___主進程:{os.getpid()}")

類方法:

from multiprocessing import Process
開啓進程的另外一種方式
class 類名(Process):
    def __init__(self,參數):
        self.屬性名 = 參數
        super().__init__()
    def run(self):
        print('子進程要執行的代碼')
p = 類名()
p.start()

# 守護進程 : 會等待主進程代碼結束以後就當即結束
p = 類名()
p.daemon = True   # 設置守護進程
p.start()
# 通常狀況下,多個進程的執行順序,多是:
    # 主進程代碼結束--> 守護進程結束-->子進程結束-->主進程結束
    # 子進程結束 -->主進程代碼結束-->守護進程結束-->主進程結束
View Code

進程之間通訊(IPC):(重點)

第一種:基於文件+鎖的形式:效率低,麻煩
第二種:基於隊列,推薦的使用形式
第三種:基於管道,管道本身加鎖,底層可能會出現數據丟失損壞,隊列和管道都是將數據存放於內存中

 

進程鎖(互斥鎖)

互斥鎖保證了每次只有一個線程進行寫入操做,只有當這個線程解鎖,在運行其餘資源,上鎖和解鎖都須要本身添加

 

兩種方式

from multiprocessing import Lock

第一種: lock
= Lock() lock.acquire() print(1) lock.release() 第二種 with lock: buy_ticket(i)

 

 

#上鎖:
#必定要是同一把鎖:只能按照這個規律,上鎖一次,解鎖一次

#互斥鎖與join區別:
#共同點:都是完成了進程之間的串行
#區別:join認爲控制進程的串行,互斥鎖是解決搶佔的資源,保證公平性

from multiprocessing import Process
from multiprocessing import Lock
import time
import os
import random

def task1(lock):
    print("test1")                     #驗證CPU遇到IO切換
    lock.acquire()
    print("task1 開始打印")
    time.sleep(random.randint(1,3))
    print("task1 打印完成")
    lock.release()

def task2(lock):
    print("test2")
    lock.acquire()                      #上鎖
    print("task2 開始打印")
    time.sleep(random.randint(1,3))#阻塞,cpu切換任務,別的任務都在鎖,回來繼續執行這個程序
    print("task2 打印完成")
    lock.release()                      #解鎖

def task3(lock):
    print("test2")
    lock.acquire()
    # lock.acquire()                    #死鎖錯誤示例
    print("task3 開始打印")
    time.sleep(random.randint(1,3))
    print("task3 打印完成")
    lock.release()

if __name__ == '__main__':
     lock = Lock()                              #一把鎖

     p1 = Process(target=task1,args=(lock,))    #三個進程哪一個先到先執行
     p2 = Process(target=task2,args=(lock,))
     p3 = Process(target=task3,args=(lock,))

     p1.start()
     p2.start()
     p3.start()

 

 

互斥鎖買票示例:

import json
import time
from multiprocessing import Process,Lock

def search(i):
    with open('ticket',encoding='utf-8') as f:
        ticket = json.load(f)
    print('%s :當前的餘票是%s張'%(i,ticket['count']))

def buy_ticket(i):
    with open('ticket',encoding='utf-8') as f:
        ticket = json.load(f)
    if ticket['count']>0:
        ticket['count'] -= 1
        print('%s買到票了'%i)
    time.sleep(0.1)
    with open('ticket', mode='w',encoding='utf-8') as f:
        json.dump(ticket,f)

def get_ticket(i,lock):
    search(i)
    with lock:   # 代替acquire和release 而且在此基礎上作一些異常處理,保證即使一個進程的代碼出錯退出了,也會歸還鑰匙
        buy_ticket(i)


if __name__ == '__main__':
    lock = Lock()     # 互斥鎖
    for i in range(10):
        Process(target=get_ticket,args=(i,lock)).start()
View Code

 

隊列:

1. 進程之間的通訊最好的方式是基於隊列

2. 隊列是實現進程之間通訊的工具,存在內存中的一個容器,最大的特色是符合先進先出的原則

from multiprocessing import Queue,Process
def pro(q):
    for i in range(10):
        print(q.get())
def son(q):
    for i in range(10):
        q.put('hello%s'%i)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=son,args=(q,))
    p.start()
    p = Process(target=pro, args=(q,))
    p.start()

 

隊列模式:

  多個進程搶佔一個資源:串行,有序以及數據安全,買票

  多個進程實現併發的效果:生產者消費模型

import time
import random
from multiprocessing import Queue,Process

def consumer(q,name): # 消費者:一般取到數據以後還要進行某些操做
    while True:
        food = q.get()
        if food:
            print('%s吃了%s'%(name,food))
        else:break

def producer(q,name,food): # 生產者:一般在放數據以前須要先經過某些代碼來獲取數據
    for i in range(10):
        foodi = '%s%s'%(food,i)
        print('%s生產了%s'%(name,foodi))
        time.sleep(random.random())
        q.put(foodi)

if __name__ == '__main__':
    q = Queue()
    c1 = Process(target=consumer,args=(q,'alex'))
    c2 = Process(target=consumer,args=(q,'alex'))
    p1 = Process(target=producer,args=(q,'大壯','泔水'))
    p2 = Process(target=producer,args=(q,'b哥','香蕉'))
    c1.start()
    c2.start()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

生產者消費者網頁(爬取網頁) 

import requests
from multiprocessing import Process,Queue
url_dic = {
    'cnblogs':'https://www.cnblogs.com/Eva-J/articles/8253549.html',
    'douban':'https://www.douban.com/doulist/1596699/',
    'baidu':'https://www.baidu.com',
    'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IXSRZ',
}

def producer(name,url,q):
    ret = requests.get(url)
    q.put((name,ret.text))

def consumer(q):
    while True:
        tup = q.get()
        if tup is None:break
        with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f:
            f.write(tup[1])

if __name__ == '__main__':
    q = Queue()
    pl = []
    for key in url_dic:
        p = Process(target=producer,args=(key,url_dic[key],q))
        p.start()
        pl.append(p)
    Process(target=consumer,args=(q,)).start()
    for p in pl:p.join()
    q.put(None)

    # join n 個進程   n個進程必須都執行完才繼續
    # for i in range(4):
    #     print(q.get())
View Code

數據共享Manager類

# Manager dict list 只要是共享的數據都存在數據不安全的現象
# 須要咱們本身加鎖來解決數據安全問題
from multiprocessing import Process,Manager,Lock

def change_dic(dic,lock):
    with lock:
        dic['count'] -= 1

if __name__ == '__main__':
    # m = Manager()
    with Manager() as m:
        lock = Lock()
        dic = m.dict({'count': 100})
        # dic = {'count': 100}
        p_l = []
        for i in  range(100):
            p = Process(target=change_dic,args=(dic,lock))
            p.start()
            p_l.append(p)
        for p in p_l : p.join()
        print(dic)
View Code

 

 

線程

進程:進程是分配資源的基本單位,內存中開闢空間,爲線程提供資源,一個程序能夠開啓多個進程

線程:CPU調度的最小單位,執行單位,線程也被稱做爲輕量級的進程,動態的

  • 主線程是進程空間存活在內存中的一個必要條件

開啓QQ:開啓一個進程,在內存中開闢空間加載數據,啓動一個線程執行代碼

線程依賴進程的一個進程能夠包含多個線程,可是必定有一個主線程,線程纔是CPU執行的最小單元

進程線程對比:

  • 1,開啓多進程開銷很是大,10-100倍,而開啓線程開銷很是小

  • 2.開啓多進程速度慢,開啓多線程速度快

  • 3.進程之間數據不共享,線程共享數據

全局解釋器鎖 GIL

在cpython解釋器下 :GIL鎖(全局解釋器鎖) 致使了同一個進程中的多個線程不能利用多核

多線程應用場景:

併發:一個CPU能夠來回切換(線程之間切換),多進程併發,多線程的併發

多進程併發:開啓多個進程,併發的執行

多線程併發:開啓線程,併發的執行

若是遇到併發:多線程居多

開啓線程的兩種方式:

  線程絕對要比進程開啓速度快

函數開啓:
#先打印小明,線程要比進程速度快,若是是進程先打印主線程
from threading import Thread

def task(name):
print(f'{name} is running')

if __name__ == '__main__':
t = Thread(target=task,args=("小明",))
t.start()
print("主線程")

#子進程睡眠3秒,先運行主進程
from threading import Thread
import time
x = 1000

def task():
time.sleep(3)
print('子線程....')

def main():
print('111')
print('222')
print('333')

if __name__ == '__main__':
t = Thread(target=task)
t.start()
main()

# 結果是111
222
333
子線程....

 

面向對象的方式起線程

from threading import Thread

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'{self.name} is running')

if __name__ == '__main__':
    t = MyThread("小明")
    t.start()
    print("主線程")

線程之間數據共享

from threading import Thread
x = 1000
def task():
    global x
    x = 0

if __name__ == '__main__':
    t = Thread(target=task, )
    t.start()
    t.join()  # 告知主線程,等待子線程運行完畢在執行
    print(f'主線程:{x}')

線程的方法:

from threading import Thread
import threading
import time

def task(name):
    time.sleep(1)
    print(f'{name} is running')

if __name__ == '__main__':
    for i in range(5):
        t = Thread(target=task,args=("海洋",))
        t.start()              #線程對象的方法
    # print(t.is_alive())     #判斷線程是否存活

    #threading模塊的方法
    print(threading.current_thread().name)  #返回線程對象.name
    print(threading.enumerate())            #返回列表,返回的是全部線程對象
    print(threading.active_count())         #獲取活躍的線程數量(包括主線程)
    print("主線程")
View Code

 

守護線程

守護線程必須等待主線程結束才結束,主線程必須等待全部的非守護線程結束才能結束,由於主線程的結束意味着進程的結束,這就是一個守護機制

多線程是同一個空間,同一個進程,進程表明,空間,資源,靜態的:

import time
from threading import Thread

def son():

    while True:
        print('in son')
        time.sleep(1)

def son2():

    for i in range(3):
        print('in son2 ****')
        time.sleep(1)

# flag a   0s
t = Thread(target=son)
t.daemon = True
t.start()
Thread(target=son2).start()
# flag b
# 主線程會等待子線程結束以後才結束
# 主線程結束進程就會結束
# 守護線程隨着主線程的結束而結束
# 守護線程會在主線程的代碼結束以後繼續守護其餘子線程

 

守護進程和守護線程的結束原理不一樣

 

守護進程須要主進程來回收資源
守護線程是隨着進程的結束才結束的
其餘子線程-->主線程結束-->主進程結束-->整個進程中全部的資源都被回收-->守護線程也會被回收

進程是資源分配單位
子進程都須要它的父進程來回收資源
線程是進程中的資源
全部的線程都會隨着進程的結束而被回收的

 

線程鎖(互斥鎖)

多個線程同時操做全局變量/靜態變量 會產生數據不安全現象
互斥鎖
+= -= 說明了線程之間數據的不安全
a = a.strip() 帶返回值的都是先計算後賦值,數據不安全
a = a+1 /a+=1 數據不安全
if\while 數據不安全

append pop 說明了在線程中操做列表中的方法是數據安全的

 

from threading import Thread,Lock
import time
n = []
def append():
    for i in range(500000):
        n.append(1)
def pop(lock):
    for i in range(500000):
        with lock:
            if not n:
                time.sleep(0.0000001)    # 強制CPU輪轉
            n.pop()

t_l = []
lock = Lock()
for i in range(20):
    t1 = Thread(target=append)
    t1.start()
    t2 = Thread(target=pop,args=(lock,))
    t2.start()
    t_l.append(t1)
    t_l.append(t2)
for t in t_l:
    t.join()
print(n)
# 不要操做全局變量,不要在類裏操做靜態變量
# += -= *= /= if while 數據不安全
# queue logging 數據安全的

 

單例模式(鎖)

import time
class A:
    from threading import Lock
    __instance = None
    lock = Lock()
    def __new__(cls, *args, **kwargs):
        with cls.lock:
            if not cls.__instance:
                time.sleep(0.000001)   # cpu輪轉
                cls.__instance = super().__new__(cls)
        return cls.__instance
View Code

互斥鎖和遞歸鎖

# Lock 互斥鎖  效率高
# RLock 遞歸(recursion)鎖 效率相對低
l = Lock()
l.acquire()
print('但願被鎖住的代碼')
l.release()

rl = RLock()  # 在同一個線程中能夠被acquire屢次
rl.acquire()
print('但願被鎖住的代碼')
rl.release()
View Code

遞歸鎖

from threading import Thread,RLock as Lock

def func(i,lock):
    lock.acquire()
    lock.acquire()
    print(i,': start')
    lock.release()
    lock.release()
    print(i, ': end')

lock = Lock()
for i in range(5):
    Thread(target=func,args=(i,lock)).start()
View Code

死鎖現象

死鎖現象是怎麼產生的?
    多把(互斥/遞歸)鎖 而且在多個線程中 交叉使用
            fork_lock.acquire()
            noodle_lock.acquire()

            fork_lock.release()
            noodle_lock.release()
    若是是互斥鎖,出現了死鎖現象,最快速的解決方案把全部的互斥鎖都改爲一把遞歸鎖
         程序的效率會下降的
    遞歸鎖 效率低 可是解決死鎖現象有奇效
    互斥鎖 效率高 可是多把鎖容易出現死鎖現象

    一把互斥鎖就夠了
View Code

 

隊列模式

線程之間數據安全的容器隊列

先進先出 Queue

from queue import Empty  # 不是內置的錯誤類型,而是queue模塊中的錯誤
q = queue.Queue(4)   # fifo 先進先出的隊列
q.get()
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print('4 done')
q.put_nowait(5)
print('5 done')
try:
    q.get_nowait()
except Empty:pass
print('隊列爲空,繼續其餘內容')
# put_nowait: 不會等待隊列有空閒位置再放入數據,若是數據放入不成功就直接崩潰
# get_nowait: 隊列爲空,取值的時候不等待,可是取不到值那麼直接崩潰了
View Code

後進先出 LifoQueue

from queue import LifoQueue   
# last in first out 後進先出 棧
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put(3)
print(lq.get())
print(lq.get())
print(lq.get())
View Code

優先級 PriorityQueue

from queue import PriorityQueue  # 優先級隊列

priq = PriorityQueue()
priq.put((2,'alex'))
priq.put((1,'wusir'))
priq.put((0,'太白'))

print(priq.get())
print(priq.get())
print(priq.get())
View Code

池的定義

  要在程序開始的時候,還沒提交任務先建立幾個線程或者進程

  放在一個池子裏,這就是池

爲何要用池

  若是先開好進程/線程,那麼有任務以後就能夠直接使用這個池中的數據了

  而且開好的線程或者進程會一直存在在池中,能夠被多個任務反覆利用
  這樣極大的減小了開啓\關閉\調度線程/進程的時間開銷
  池中的線程/進程個數控制了操做系統須要調度的任務個數,控制池中的單位
  有利於提升操做系統的效率,減輕操做系統的負擔
  發展過程

 

 

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(a,b):
    print('start')
    print(a,b)
    print('end')

if __name__ == '__main__':
    # p = ProcessPoolExecutor(max_workers=5)  #限制進程數量,默認爲cpu個數
    p = ThreadPoolExecutor(4)                    #線程默認是CPU個數的五倍

    for i in range(4):
        p.submit(func,1,2)                      #給進程池放置任務啓動,1,2爲傳參

 

同步:

任務發出去以後等待,直到這個任務最終結束以後,給我一個返回值,發佈下一個任務

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task():
    print(f"{os.getpid()}is running")
    time.sleep(1)
    return f'{os.getpid()} is finish'

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)

    for i in range(10):
        obj = p.submit(task,)
        print(obj.result())      #同步等待一個進程內容所有執行完成在執行下一個
View Code

 

異步:

將任務發給進程,無論任務如何,直接運行下一個

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def task():
    print(f'{os.getpid()} is running')
    time.sleep(random.randint(0,2))
    return f'{os.getpid()} is finish'

if __name__ == '__main__':
    p = ProcessPoolExecutor(4)
    obj_l1 = []
    for i in range(10):
        obj = p.submit(task,)   # 異步發出.
        obj_l1.append(obj)

    # time.sleep(3)
    p.shutdown(wait=True)
    # 1. 阻止在向進程池投放新任務,
    # 2. wait = True 十個任務是10,一個任務完成了-1,直至爲零.進行下一行.
    for i in obj_l1:
        print(i.result())

獲取任務結果

import os
import time,random
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(a,b):
    print(os.getpid(),'start',a,b)
    time.sleep(random.randint(1,4))
    print(os.getpid(),'end')
    return a*b

if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)
    futrue_l = {}
    for i in range(20):         # 異步非阻塞的
        ret = tp.submit(func,i,b=i+1)
        futrue_l[i] = ret
        # print(ret.result())   # Future將來對象
    for key in futrue_l:       # 同步阻塞的
        print(key,futrue_l[key].result())
View Code

map

只適合傳遞簡單的參數,而且必須是一個可迭代的類型做爲參數

# map  只適合傳遞簡單的參數,而且必須是一個可迭代的類型做爲參數
import os
import time,random
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(a):
    print(os.getpid(),'start',a[0],a[1])
    time.sleep(random.randint(1,4))
    print(os.getpid(),'end')
    return a[0]*a[1]

if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)
    ret = tp.map(func,((i,i+1) for i in range(20)))
    for key in ret:       # 同步阻塞的
        print(key)
View Code

 

回調函數

add_done_callback()

# 回調函數 : 效率最高的
import time,random
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor

def func(a,b):
    print(current_thread().ident,'start',a,b)
    time.sleep(random.randint(1,4))
    print(current_thread().ident,'end',a)
    return (a,a*b)

def print_func(ret):       # 異步阻塞
    print(ret.result())

if __name__ == '__main__':
    tp = ThreadPoolExecutor(4)
    futrue_l = {}
    for i in range(20):         # 異步非阻塞的
        ret = tp.submit(func,i,b=i+1)
        ret.add_done_callback(print_func)  
# ret這個任務會在執行完畢的瞬間當即觸發print_func函數,而且把任務的返回值對象傳遞到print_func作參數
# 異步阻塞 回調函數 給ret對象綁定一個回調函數,等待ret對應的任務有告終果以後當即調用print_func這個函數
# 就能夠對結果當即進行處理,而不用按照順序接收結果處理結果
View Code

回調函數的例子

from concurrent.futures import ThreadPoolExecutor
import requests
import os

def get_page(url):    # 訪問網頁,獲取網頁源代碼   線程池中的線程來操做
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):   # 獲取到字典結果以後,計算網頁源碼的長度,把https://www.baidu.com : 1929749729寫到文件裏   線程任務執行完畢以後綁定回調函數
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)

if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]
    # 得到一個線程池對象 = 開啓線程池
    tp = ThreadPoolExecutor(4)
    # 循環urls列表
    for url in urls:
# 獲得一個futrue對象 = 把每個url提交一個get_page任務
        ret = tp.submit(get_page,url)
        # 給futrue對象綁定一個parse_page回調函數
        ret.add_done_callback(parse_page)   
        # 誰先回來誰就先寫結果進文件
View Code

 

協程

線程協程的區別:

  • 協程沒有鎖,協程又稱微線程
  • 線程和協程不一樣的是,線程是搶佔式調度切換,而協程是須要本身調度
  • 線程和進程,調度是CPU決定的,而協程就是上帝,在一個線程中規定某個代碼塊的執行順序
協程:本質是一個線程
     可以在一個線程內的多個任務之間來回切換
    節省io操做的時間也只能是和網絡操做相關的
    特色:數據安全,用戶級別,開銷小,不能利用多核,可以識別的io操做少
gevent 第三方模塊  完成併發的socket server
    協程對象.spawn(func,參數)
    能識別的io操做也是有限的
    而且要想讓gevent可以識別一些導入的模塊中的io操做
    from gevent import monkey;monkey.patch_all()
asyncio 內置模塊
    await 寫好的asyncio中的阻塞方法
    async 標識一個函數時協程函數,await語法必須用在async函數中

 

切換 並 規避io 的兩個模塊
gevent =  利用了  greenlet    底層模塊完成的切換 + 自動規避io的功能
asyncio = 利用了  yield    底層語法完成的切換 + 自動規避io的功能
    tornado 異步的web框架
    yield from - 更好的實現協程
    send - 更好的實現協程
    asyncio模塊 基於python原生的協程的概念正式的被成立
    特殊的在python中提供協程功能的關鍵字 : aysnc await
# 用戶級別的協程還有什麼好處:
    # 減輕了操做系統的負擔
    # 一條線程若是開了多個協程,那麼給操做系統的印象是線程很忙,這樣能多爭取一些時間片時間來被CPU執行,程序的效率就提升了

 

gevent

import gevent

def func():    # 帶有io操做的內容寫在函數裏,而後提交func給gevent
    print('start func')
    gevent.sleep(1)  
# gevent.sleep是一個特殊的,time.sleep在這裏不行
# 若是想用time就要在用下面的代碼
    print('end func')

g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)
gevent.joinall([g1,g2,g3])

 

time

import time
print(time.sleep)
# 這裏的time和from gevent import mockey裏的不一樣

from gevent import monkey
monkey.patch_all()        
import time
import gevent

def func():    # 帶有io操做的內容寫在函數裏,而後提交func給gevent
    print('start func')
    time.sleep(1)
    print('end func')

g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)
gevent.joinall([g1,g2,g3])
# 阻塞 直到協程g1任務執行結束
# 要有阻塞才能執行

gevent(server和client)

基於gevent協程實現socket併發

server

import socket
print(socket.socket)          # 在patch all以前打印一次
from gevent import monkey    # gevent 如何檢測是否能規避某個模塊的io操做呢?
monkey.patch_all()
import socket
import gevent
print(socket.socket)           # 在patch all以後打印一次,若是兩次的結果不同,那麼就說明可以規避io操做
def func(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        MSG = msg.upper()
        conn.send(MSG.encode('utf-8'))

sk = socket.socket()
sk.bind(('127.0.0.1',9001))
sk.listen()

while True:
    conn,_ = sk.accept()
    gevent.spawn(func,conn)
View Code

client

import time
import socket
from threading import Thread
def client():
    sk = socket.socket()
    sk.connect(('127.0.0.1',9001))
    while True:
        sk.send(b'hello')
        msg = sk.recv(1024)
        print(msg)
        time.sleep(0.5)

for i in range(500):
    Thread(target=client).start()
View Code

 

asyncio定義協程函數

import asyncio

async def func(name):
    print('start',name)
    # await 可能會發生阻塞的方法
    # await 關鍵字必須寫在一個async函數裏
    await asyncio.sleep(1)
    print('end')

loop = asyncio.get_event_loop()
loop.run_until_complete(func("alex")) # 單個任務
# loop.run_until_complete(asyncio.wait([func('alex'),func('太白')]))接收多個,接受列表
相關文章
相關標籤/搜索