併發編程.md

操做系統基礎

  • 人機矛盾: CPU利用率低css

  • 磁帶存儲+批處理:下降數據的讀取時間,提升CPU的利用率html

  • 多道操做系統------在一個任務遇到IO的時候主動讓出CPU,給其餘任務使用python

    • 由操做系統完成
    • 切換要須要時間
    多道技術:
        1.產生背景:針對單核,實現併發
        ps:
        如今的主機通常是多核,那麼每一個核都會利用多道技術
        有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個
        cpu中的任意一個,具體由操做系統調度算法決定。
        
        2.空間上的複用:如內存中同時有多道程序
        3.時間上的複用:複用一個cpu的時間片
           強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣
                才能保證下次切換回來時,能基於上次切走的位置繼續運行
  • 分時操做系統-------給時間分片,讓多個任務輪流使用CPUlinux

    • 短做業優先算法
    • 先來先服務算法

    每一個程序分配一個時間片,輪轉使用CPU,切換須要時間,下降CPU利用率,提升用戶體驗ios

  • 通用操做系統-------分時操做系統 + 多道操做系統 + 實時操做系統redis

    • 多個程序一塊兒在計算機中執行
    • 一個程序若是遇到IO操做,切出去讓出CPU
    • 一個程序沒有遇到IO,可是時間片到時了,切出去讓出CPU
  • 操做系統負責什麼?算法

    調度進程前後執行的順序 控制執行的時間等等
    資源的分配數據庫

進程的概念

進程:json

  • 運行中的程序
  • 是計算機中最小的資源分配單位
  • 在操做系統中惟一標識符:PID
進程和程序的區別:
程序只是一個文件
進程是這個文件被CPU運行起來了

操做系統調度進程的算法:windows

  • 短做業優先
  • 先來先服務
  • 時間片輪轉
  • 多級反饋算法

並行與併發:

  • 並行:並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )
  • 併發:併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。

img

  • 就緒(Ready)狀態

    當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

  • 執行/運行(Running)

    狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

  • 阻塞(Blocked)狀態

    正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

同步異步:

所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。

所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列。

在python程序中的進程操做

進程:

# 建立進程 時間開銷大
# 銷燬進程 時間開銷大
# 進程之間切換 時間開銷大

線程:

線程是進程的一部分,每一個進程中至少有一個線程
能被CPU調度的最小單位
一個進程中的多個線程是能夠共享這個進程的數據的  —— 數據共享
線程的建立、銷燬、切換 開銷遠遠小於進程  —— 開銷小

進程:是計算機中最小的資源分配單位(進程是負責圈資源)

線程:是計算機中能被CPU調度的最小單位 (線程是負責執行具體代碼的)

os.getpid():獲取當前進程pid

os.getppid():獲取父級進程pid,能夠建立子進程,在pycharm中啓動的全部py程序都是pycharm的子進程

import os
import time
from multiprocessing import Process
# multiprocessing多進程模塊Process類
def func():
    print('start',os.getpid())
    time.sleep(1)
    print('end',os.getpid())

if __name__ == '__main__':
    p = Process(target=func) # 將函數封裝到類,建立一個要開啓func進程的對象
    p.start()   # 異步 調用開啓進程的方法 可是並不等待這個進程真的開啓
    print('main :',os.getpid())
#main : 11436
#start 9860
#end 9860
操做系統建立進程的方式不一樣
windows操做系統執行開啓進程的代碼
    實際上新的子進程須要經過import父進程的代碼來完成數據的導入工做
    因此有一些內容咱們只但願在父進程中完成,就寫在if __name__ == '__main__':下面
ios linux操做系統建立進程 fork,拷貝的方式
  • 主進程和子進程之間的關係

    父進程會等待着全部的子進程結束以後才結束,爲了回收資源

主進程代碼執行完畢:
    # 主進程負責回收子進程的資源
    # 若是子進程執行結束,父進程沒有回收資源,那麼這個子進程會變成一個殭屍進程
# 主進程的結束邏輯
    # 主進程的代碼結束
    # 全部的子進程結束
    # 給子進程回收資源
    # 主進程結束

# 主進程怎麼知道子進程結束了的呢?
    # 基於網絡、文件
  • join方法 :阻塞父進程,直到對應子進程結束就結束
import time
from multiprocessing import Process
def send_mail():
    time.sleep(3)
    print('發送了一封郵件')
if __name__ == '__main__':
    p = Process(target=send_mail)
    p.start()   # 異步 非阻塞
    # time.sleep(5)
    print('join start')
    p.join()    # 同步 阻塞 直到p對應的進程結束以後才結束阻塞
    print('5000封郵件已發送完畢')
#join start
#發送了一封郵件
#5000封郵件已發送完畢
import time
import random
from multiprocessing import Process
def send_mail(a):
    time.sleep(random.random())
    print('發送了一封郵件',a)

if __name__ == '__main__':
    l = []
    for i in range(10):
        p = Process(target=send_mail,args=(i,))#向子進程傳參數,用元組
        p.start()
        l.append(p)  #回收多個子進程資源,先添指列表,最後統一處理
    for p in l:p.join()
    # 阻塞 直到上面的十個進程都結束
    print('5000封郵件已發送完畢')
發送了一封郵件 5
發送了一封郵件 4
發送了一封郵件 3
......
5000封郵件已發送完畢

補充:Windows開啓進程,因爲建立機制,必須採用此方式.

print([__name__])
if __name__ == '__main__':
    # 控制當這個py文件被看成腳本直接執行的時候,就執行這裏面的代碼
    # 當這個py文件被看成模塊導入的時候,就不執行這裏面的代碼
    print('hello hello')
# __name__ == '__main__'
    # 執行的文件就是__name__所在的文件
# __name__ == '文件名'
    # __name__所在的文件被導入執行的時候
  • 守護進程

    隨着主進程的代碼結束而結束的,全部的子進程都必須在主進程結束以前結束,由主進程來負責回收資源

    p.daemon = True

其餘方法:

p.is_alive() 判斷進程是否活着
 p.terminate()   # 能夠解釋異步非阻塞, 關閉須要時間,並不等到返回結束進程結果,會變殭屍
def son1():
    while True:
        print('is alive')
        time.sleep(0.5)

if __name__ == '__main__':
    p = Process(target=son1)
    p.start()      # 異步 非阻塞
    print(p.is_alive())
    time.sleep(1)
    p.terminate()   # 異步的 非阻塞
    print(p.is_alive())   # 進程還活着 由於操做系統還沒來得及關閉進程
    time.sleep(0.01)
    print(p.is_alive())   # 操做系統已經響應了咱們要關閉進程的需求,再去檢測的時候,獲得的結果是進程已經結束了

使用面向對象方式開啓進程

import os
import time
from multiprocessing import Process

class MyProcecss2(Process):  #必須繼承Process
    def run(self):			#必需要有run方法,重寫process的run,start自動調用run
        while True:
            print('is alive')
            time.sleep(0.5)

class MyProcecss1(Process):
    def __init__(self,x,y):   #傳參數要定義init函數
        self.x = x
        self.y = y
        super().__init__()	  #要導入父類的初始化參數
    def run(self):
        print(self.x,self.y,os.getpid())
        for i in range(5):
            print('in son2')
            time.sleep(1)

if __name__ == '__main__':
    mp = MyProcecss1(1,2)
    mp.daemon = True      
    mp.start()
    print(mp.is_alive())
    mp.terminate()
    # mp2 = MyProcecss2()
    # mp2.start()
    # print('main :',os.getpid())
    # time.sleep(1)

Process操做進程的方法

# p.start() 開啓進程      異步非阻塞
# p.terminate() 結束進程  異步非阻塞
# p.join()     同步阻塞
# p.isalive()  獲取當前進程的狀態
# daemon = True 設置爲守護進程,守護進程永遠在主進程的代碼結束以後自動結束

# 1.若是在一個併發的場景下,涉及到某部份內容
    # 是須要修改一些全部進程共享數據資源
    # 須要加鎖來維護數據的安全
# 2.在數據安全的基礎上,才考慮效率問題
# 3.同步存在的意義
    # 數據的安全性

# 在主進程中實例化 lock = Lock()
# 把這把鎖傳遞給子進程
# 在子進程中 對須要加鎖的代碼 進行 with lock:
    # with lock至關於lock.acquire()和lock.release()
# 在進程中須要加鎖的場景
    # 共享的數據資源(文件、數據庫)
    # 對資源進行修改、刪除操做
# 加鎖以後可以保證數據的安全性 可是也下降了程序的執行效率
mport time
import json
from multiprocessing import Process,Lock

def search_ticket(user):
    with open('ticket_count') as f:
        dic = json.load(f)
        print('%s查詢結果 : %s張餘票'%(user,dic['count']))

def buy_ticket(user,lock):
    # with lock:
    # lock.acquire() # 給這段代碼加上一把鎖
        time.sleep(0.02)
        with open('ticket_count') as f:
            dic = json.load(f)
        if dic['count'] > 0:
            print('%s買到票了'%(user))
            dic['count'] -= 1
        else:
            print('%s沒買到票' % (user))
        time.sleep(0.02)
        with open('ticket_count','w') as f:
            json.dump(dic,f)
    # lock.release() # 給這段代碼解鎖

def task(user, lock):
    search_ticket(user)
    with lock:
        buy_ticket(user, lock)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=task,args=('user%s'%i,lock))
        p.start()

進程之間通訊IPC

進程之間的通訊 - IPC(inter process communication)
第三方:redis,memcache,kafka,rabbitmq
特色:併發需求,高可用,斷電保存數據,解耦
from multiprocessing import Queue,Process
# 先進先出
def func(exp,q):
    ret = eval(exp)
    q.put({ret,2,3})
    q.put(ret*2)
    q.put(ret*4)

if __name__ == '__main__':
    q = Queue()
    Process(target=func,args=('1+2+3',q)).start()
    print(q.get())
    print(q.get())
    print(q.get())
# Queue基於 天生就是數據安全的
    # 文件家族的socket pickle lock
# pipe 管道(不安全的) = 文件家族的socket pickle
# 隊列 = 管道 + 鎖
# from multiprocessing import Pipe
# pip = Pipe()
# pip.send()
# pip.recv()
import queue

# from multiprocessing import Queue
# q = Queue(5)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5) # 當隊列爲滿的時候再向隊列中放數據 隊列會阻塞
# print('5555555')
# try:
# q.put_nowait(6) # 當隊列爲滿的時候再向隊列中放數據 會報錯而且會丟失數據
# except queue.Full:
# pass
# print('6666666')
#
# print(q.get())
# print(q.get())
# print(q.get()) # 在隊列爲空的時候會發生阻塞
# print(q.get()) # 在隊列爲空的時候會發生阻塞
# print(q.get()) # 在隊列爲空的時候會發生阻塞
# try:
# print(q.get_nowait()) # 在隊列爲空的時候 直接報錯
# except queue.Empty:pass

生產者消費者模型

什麼是生產者消費者模型?
# 把一個產生數據而且處理數據的過程解耦
# 讓生產的數據的過程和處理數據的過程達到一個工做效率上的平衡
# 中間的容器,在多進程中咱們使用隊列或者可被join的隊列,作到控制數據的量
    # 當數據過剩的時候,隊列的大小會控制這生產者的行爲
    # 當數據嚴重不足的時候,隊列會控制消費者的行爲
    # 而且咱們還能夠經過按期檢查隊列中元素的個數來調節生產者消費者的個數

第一種方式:

import time
import random
from multiprocessing import Process,Queue

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        fd = '%s%s'%(food,i)
        q.put(fd)
        print('%s生產了一個%s'%(name,food))

def consumer(q,name):
    while True:
        food = q.get()
        if not food:break
        time.sleep(random.randint(1,3))
        print('%s吃了%s'%(name,food))


def cp(c_count,p_count):
    q = Queue(10)
    for i in range(c_count):
        Process(target=consumer, args=(q, 'alex')).start()
    p_l = []
    for i in range(p_count):
        p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
        p1.start()
        p_l.append(p1)
    for p in p_l:p.join()
    for i in range(c_count):
        q.put(None)
if __name__ == '__main__':
    cp(2,3)
流程:消費者開啓進程get,生產者開啓進程put,加入隊列,所有結束後(jion),隊列put(None),消費者get到空終止

第二種方式:

import time
import random
from  multiprocessing import JoinableQueue,Process

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        fd = '%s%s'%(food,i)
        q.put(fd)
        print('%s生產了一個%s'%(name,food))
    q.join()

def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.random())
        print('%s吃了%s'%(name,food))
        q.task_done()

if __name__ == '__main__':
    jq = JoinableQueue()
    p =Process(target=producer,args=(jq,'wusir','泔水'))
    p.start()
    c = Process(target=consumer,args=(jq,'alex'))
    c.daemon = True
    c.start()
    p.join()

JoinableQueue一樣經過multiprocessing使用。

建立隊列的另一個類:

​ JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹:

​ maxsize是隊列中容許最大項數,省略則無大小限制。

方法介紹:

​ JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

​ q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

​ q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

進程間數據共享

from multiprocessing import Manager,Process,Lock

def func(dic,lock):
    with lock:
        dic['count'] -= 1

if __name__ == '__main__':
    # m = Manager()
    with Manager() as m:
        l = Lock()
        dic = m.dict({'count':100})
        p_l = []
        for i in range(100):
            p = Process(target=func,args=(dic,l))
            p.start()
            p_l.append(p)
        for p in p_l:p.join()
        print(dic)

mulprocessing中有一個manager類,封裝了全部和進程相關的 數據共享 數據傳遞相關的數據類型,可是對於 字典 列表這一類的數據操做的時候會產生數據不安全, 須要加鎖解決問題,而且須要儘可能少的使用這種方式.

線程

GIL鎖:全局解釋器鎖,cpython解釋器中特殊的垃圾回收機制,致使了在同一個進程中多個線程不能同時利用多核 —— python的多線程只能是併發不能是並行

因此使用所線程並不影響高io型的操做,只會對高計算型的程序由效率上的影響

主線程何時結束?等待全部子線程結束以後才結束
主線程若是結束了,主進程也就結束了
# multiprocessing 是徹底仿照這threading的類寫的
from threading import Thread
def func():
    print('start son thread')   
# 啓動線程 start
Thread(target=func).start()
# 開啓多個子線程
def func(i):
    print('start son thread',i)
    time.sleep(1)
    print('end son thread',i)

for i in range(10):
    Thread(target=func,args=(i,)).start()
print('main')
# join方法 阻塞 直到子線程執行結束
import time
import os
from threading import Thread
def func(i):
    print('start son thread',i)
    time.sleep(1)
    print('end son thread',i,os.getpid())
t_l = []
for i in range(10):
    t = Thread(target=func,args=(i,))
    t.start()
    t_l.append(t)
for t in t_l:t.join()
print('子線程執行完畢')
# 使用面向對象的方式啓動線程
class MyThread(Thread):
    def __init__(self,i):
        self.i = i
        super().__init__()
    def run(self):
        print('start',self.i,self.ident)
        time.sleep(1)
        print('end',self.i)

for i in range(10):
    t = MyThread(i)
    t.start()
    print(t.ident)  #線程id
# 線程裏的一些其餘方法
from threading import current_thread,enumerate,active_count
def func(i):
    t = current_thread()	#當前線程對象
    print('start son thread',i,t.ident)
    time.sleep(1)
    print('end son thread',i,os.getpid())

t = Thread(target=func,args=(1,))
t.start()
print(t.ident)
print(current_thread().ident)   # 水性楊花 在哪個線程裏,current_thread()獲得的就是這個當前線程的信息
print(enumerate())	#活着的線程列表
print(active_count())   # =====len(enumerate())
terminate 結束進程,在線程中不能從主線程結束一個子線程
# 守護線程
import time
from threading import Thread
def son1():
    while True:
        time.sleep(0.5)
        print('in son1')
def son2():
    for i in range(5):
        time.sleep(1)
        print('in son2')
t =Thread(target=son1)
t.daemon = True
t.start()
Thread(target=son2).start()
time.sleep(3)
# 守護線程一直等到全部的非守護線程都結束以後才結束
# 除了守護了主線程的代碼以外也會守護子線程

線程鎖

即使是線程 即使有GIL 也會出現數據不安全的問題

# 1.操做的是全局變量
# 2.作一下操做
    # += -= *= /+ 先計算再賦值才容易出現數據不安全的問題
    # 包括 lst[0] += 1  dic['key']-=1
a = 0
def add_f(lock):
    global a
    for i in range(200000):
        with lock:
            a += 1

def sub_f(lock):
    global a
    for i in range(200000):
        with lock:
            a -= 1

from threading import Thread,Lock
lock = Lock()
t1 = Thread(target=add_f,args=(lock,))
t1.start()
t2 = Thread(target=sub_f,args=(lock,))
t2.start()
t1.join()
t2.join()
print(a)

加鎖會影響程序的執行效率,可是保證了數據的安全

互斥鎖是鎖中的一種:在同一個線程中,不能連續acquire屢次

☆帶鎖的單例模式

import time
from threading import Lock
class A:
    __instance = None
    lock = Lock()
    def __new__(cls, *args, **kwargs):
        with cls.lock:
            if not cls.__instance:
                time.sleep(0.1)
                cls.__instance = super().__new__(cls)
        return cls.__instance
    def __init__(self,name,age):
        self.name = name
        self.age = age

def func():
    a = A('alex', 84)
    print(a)

from threading import Thread
for i in range(10):
    t = Thread(target=func)
    t.start()

遞歸鎖

from threading import RLock
# rlock = RLock()
# rlock.acquire()
# print('*'*20)
# rlock.acquire()
# print('-'*20)
# rlock.acquire()
# print('*'*20)

優勢:在同一個線程中,能夠連續acuqire屢次不會被鎖住

缺點:佔用了更多資源

死鎖現象:在某一些線程中出現陷入阻塞而且永遠沒法結束阻塞的狀況就是死鎖現象

1.多把鎖+交替使用

2.互斥鎖在一個線程中連續acquire

避免方法:在一個線程中只有一把鎖,而且每一次acquire以後都要release

解決方法:能夠用遞歸鎖解決,也能夠經過優化代碼邏輯解決.

import time
from threading import RLock,Thread
# noodle_lock = RLock()
# fork_lock = RLock()
noodle_lock = fork_lock = RLock()
print(noodle_lock,fork_lock)
def eat1(name,noodle_lock,fork_lock):
    noodle_lock.acquire()
    print('%s搶到面了'%name)
    fork_lock.acquire()
    print('%s搶到叉子了' % name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    fork_lock.release()
    print('%s放下叉子了' % name)
    noodle_lock.release()
    print('%s放下面了' % name)

def eat2(name,noodle_lock,fork_lock):
    fork_lock.acquire()
    print('%s搶到叉子了' % name)
    noodle_lock.acquire()
    print('%s搶到面了'%name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    noodle_lock.release()
    print('%s放下面了' % name)
    fork_lock.release()
    print('%s放下叉子了' % name)

lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

互斥鎖解決

import time
from threading import Lock,Thread
lock = Lock()
def eat1(name,noodle_lock,fork_lock):
    lock.acquire()
    print('%s搶到面了'%name)
    print('%s搶到叉子了' % name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    print('%s放下叉子了' % name)
    print('%s放下面了' % name)
    lock.release()

def eat2(name,noodle_lock,fork_lock):
    lock.acquire()
    print('%s搶到叉子了' % name)
    print('%s搶到面了'%name)
    print('%s吃了一口面'%name)
    time.sleep(0.1)
    print('%s放下面了' % name)
    print('%s放下叉子了' % name)
    lock.release()

lst = ['alex','wusir','taibai','yuan']
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

先進先出隊列

from queue import Queue

後進先出隊列---棧

from queue import LifoQueue

優先級隊列

自動的排序
搶票的用戶級別 100000 100001
告警級別
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((10,'alex'))
pq.put((6,'wusir'))
pq.put((20,'yuan'))
print(pq.get())
print(pq.get())

預先的開啓固定個數的進程數,當任務來臨的時候,直接提交給已經開好的進程,讓這個進程去執行就能夠了,節省了進程,線程的開啓關閉的切換時間,而且減輕了操做系統調度的負擔.

開啓步驟

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
# 建立一個池子
tp = ThreadPoolExecutor(池中線程/進程的個數)
# 異步提交任務
ret = tp.submit(函數,參數1,參數2....)
# 獲取返回值
ret.result()
# 在異步的執行完全部任務以後,主線程/主進程纔開始執行的代碼
tp.shutdown() 阻塞 直到全部的任務都執行完畢
# 關閉池以後就不能繼續提交任務,而且會阻塞,直到已經提交的任務完成
# map方法
ret = tp.map(func,iterable) #迭代獲取iterable中的內容,做爲func的參數,讓子線程來執行對應的任務
for i in ret: 每個都是任務的返回值
# 回調函數
ret.add_done_callback(函數名)
# 要在ret對應的任務執行完畢以後,直接繼續執行add_done_callback綁定的函數中的內容,而且ret的結果會做爲參數返回給綁定的函數

帶參數及返回值

def func(i,name):
    print('start',os.getpid())
    time.sleep(random.randint(1,3))
    print('end', os.getpid())
    return '%s * %s'%(i,os.getpid())
if __name__ == '__main__':
    p = ProcessPoolExecutor(5)
    ret_l = []
    for i in range(10):
        ret = p.submit(func,i,'alex')
        ret_l.append(ret)
    for ret in ret_l:
        print('ret-->',ret.result())  # ret.result() 同步阻塞
    print('main',os.getpid())

回調函數

import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
    res = requests.get(url)
    return {'url':url,'content':res.text}

def parserpage(ret): #必須有參數
    dic = ret.result()
    print(dic)
tp = ThreadPoolExecutor(5)
url_lst = [
    'http://www.baidu.com',   # 3
    'http://www.cnblogs.com', # 1
    'http://www.douban.com',  # 1
    'http://www.tencent.com',
    'http://www.cnblogs.com/Eva-J/articles/8306047.html',
    'http://www.cnblogs.com/Eva-J/articles/7206498.html',
]
ret_l = []
for url in url_lst:
    ret = tp.submit(get_page,url)
    ret_l.append(ret)
    ret.add_done_callback(parserpage)
回調函數add_done_callback
        # 執行完子線程任務以後直接調用對應的回調函數
        # 爬取網頁 須要等待數據傳輸和網絡上的響應高IO的 -- 子線程
        # 分析網頁 沒有什麼IO操做 -- 這個操做不必在子線程完成,交給回調函數
是單獨開啓線程進程仍是池?
    # 若是隻是開啓一個子線程作一件事情,就能夠單獨開線程
    # 有大量的任務等待程序去作,要達到必定的併發數,開啓線程池
    # 根據你程序的io操做也能夠斷定是用池仍是不用池?
        # socket的server 大量的阻塞io   recv recvfrom socketserver
        # 爬蟲的時候 池

池的總結

hreadPoolExecutor中的幾個經常使用方法
    # tp = ThreadPoolExecutor(cpu*5)
    # obj = tp.submit(須要在子線程執行的函數名,參數)
    # obj
        # 1.獲取返回值 obj.result() 是一個阻塞方法
        # 2.綁定回調函數 obj.add_done_callback(子線程執行完畢以後要執行的代碼對應的函數)
    # ret = tp.map(須要在子線程執行的函數名,iterable)
        # 1.迭代ret,老是能獲得全部的返回值
    # shutdown
        # tp.shutdown()

進程和線程中的鎖

# 全部在線程中能工做的基本都不能在進程中工做
# 在進程中可以使用的基本在線程中也可使用

在多進程中啓動多線程

在多進程裏啓動多線程
import os
from multiprocessing import Process
from threading import Thread

def tfunc():
    print(os.getpid())
def pfunc():
    print('pfunc-->',os.getpid())
    Thread(target=tfunc).start()

if __name__ == '__main__':
    Process(target=pfunc).start()

協程

# 協程:
    # 用戶級別的,由咱們本身寫的python代碼來控制切換的
    # 是操做系統不可見的
# 在Cpython解釋器下 - 協程和線程都不能利用多核,都是在一個CPU上輪流執行
    # 因爲多線程自己就不能利用多核
    # 因此即使是開啓了多個線程也只能輪流在一個CPU上執行
    # 協程若是把全部任務的IO操做都規避掉,只剩下須要使用CPU的操做
    # 就意味着協程就能夠作到題高CPU利用率的效果
# 多線程和協程
    # 線程 切換須要操做系統,開銷大,操做系統不可控,給操做系統的壓力大
        # 操做系統對IO操做的感知更加靈敏
    # 協程 切換須要python代碼,開銷小,用戶操做可控,徹底不會增長操做系統的壓力
        # 用戶級別可以對IO操做的感知比較低

gevent模塊開啓協程

import time
print('-->',time.sleep)
import gevent
from gevent import monkey
monkey.patch_all()
def eat():
    print('wusir is eating')
    print('in eat: ')
    time.sleep(1)				#遇到阻塞讓出CPU
    return 'wusir finished eat'

def sleep():
    print('小馬哥 is sleeping')
    time.sleep(1)
    print('小馬哥 finished sleep')
g_l=[]
for i in range(10):		# 創造十個協程任務
    g1 = gevent.spawn(eat)  
    g_l.append(g1)
g2 = gevent.spawn(sleep)  # 創造一個協程任務
g2.join()   # 阻塞 直到g1任務完成爲止
gevent.joinall(g_l)		#jionall後面加包含gevent對象的列表
for i in g_l:
    print(i.value)		#value取值

asyncio模塊

# 起一個任務
# async def demo(): # 協程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
#
# loop = asyncio.get_event_loop() # 建立一個事件循環
# loop.run_until_complete(demo()) # 把demo任務丟到事件循環中去執行

# 啓動多個任務,而且沒有返回值
# async def demo(): # 協程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
#
# loop = asyncio.get_event_loop() # 建立一個事件循環
# wait_obj = asyncio.wait([demo(),demo(),demo()])
# loop.run_until_complete(wait_obj)

# 啓動多個任務而且有返回值
# async def demo(): # 協程方法
# print('start')
# await asyncio.sleep(1) # 阻塞
# print('end')
# return 123
#
# loop = asyncio.get_event_loop()
# t1 = loop.create_task(demo())
# t2 = loop.create_task(demo())
# tasks = [t1,t2]
# wait_obj = asyncio.wait([t1,t2])
# loop.run_until_complete(wait_obj)
# for t in tasks:
# print(t.result())

# 誰先回來先取誰的結果
# import asyncio
# async def demo(i): # 協程方法
# print('start')
# await asyncio.sleep(10-i) # 阻塞
# print('end')
# return i,123
#
# async def main():
# task_l = []
# for i in range(10):
# task = asyncio.ensure_future(demo(i))
# task_l.append(task)
# for ret in asyncio.as_completed(task_l):
# res = await ret
# print(res)
#
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())



# import asyncio
#
# async def get_url():
# reader,writer = await asyncio.open_connection('www.baidu.com',80)
# writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
# all_lines = []
# async for line in reader:
# data = line.decode()
# all_lines.append(data)
# html = '\n'.join(all_lines)
# return html
#
# async def main():
# tasks = []
# for url in range(20):
# tasks.append(asyncio.ensure_future(get_url()))
# for res in asyncio.as_completed(tasks):
# result = await res
# print(result)
#
#
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main()) # 處理一個任務
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息