併發編程知識點剖析

 

  併發編程知識點剖析python

一.  mysql

  進程(Process):是系統進行資源分配和調度的基本單位,是操做系統結構的基礎,進程是線程的容器。linux

  線程(Threading): 一條流水線的工做過程,cpu最小執行單位git

  線程與進程的區別能夠概括爲如下4點:github

      1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見, 進程之間是空間隔離的,一個 進程裏的線程之間共享內存
      2)通訊:進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊——須要進程同步和 互斥手段的輔助,以保證數據的一致性。(就相似進程中的鎖的做用)
      3)調度和切換:線程上下文切換比進程上下文切換要快得多。
      4)在多線程操做系統中(如今我們用的系統基本都是多線程的操做系統),進程不是一個可執行的實體,正去執行程序的不是進程,是線程,你能夠理解進程就是一個線程的容器。

  並行:同時運行,只有具有多個cpu才能實現並行,利用了多核,利用了多核,多個任務真正的在同時運行sql

     將多個cpu必須成高速公路上的多個車道,進程就比如每一個車道上行駛的車輛,並行就是說,你們在本身的車道上行駛,會不影響,同時在開車。這就是並行編程

  併發:僞並行,也提升了效率,遇到IO就切換,充分的利用了IO時間json

       即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發)windows

  同步 : 要等待任務執行結果,才能進行下一個任務,其實就是一個程序結束才執行另一個程序,串行的,不必定兩個程序就有依賴關係。數組

  異步 : 不須要等待任務的執行結果,繼續執行本身的任務,不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。

  阻塞 : 等待某個事件的發生而沒法繼續執行,阻塞的方法:input、time.sleep,socket中的recv、accept等等。 

  非阻塞 : 不等待

 

二.建立方式

#當前文件名稱爲test.py
# from multiprocessing import Process
#
# def func():
#     print(12345)
#
# if __name__ == '__main__': #windows 下才須要寫這個,這和系統建立進程的機制有關係,不用深究,記着windows下要寫就好啦
#     #首先我運行當前這個test.py文件,運行這個文件的程序,那麼就產生了進程,這個進程咱們稱爲主進程
#
#     p = Process(target=func,) #將函數註冊到一個進程中,p是一個進程對象,此時尚未啓動進程,只是建立了一個進程對象。而且func是不加括號的,由於加上括號這個函數就直接運行了對吧。
#     p.start() #告訴操做系統,給我開啓一個進程,func這個函數就被咱們新開的這個進程執行了,而這個進程是我主進程運行過程當中建立出來的,因此稱這個新建立的進程爲主進程的子進程,而主進程又能夠稱爲這個新進程的父進程。
          #而這個子進程中執行的程序,至關於將如今這個test.py文件中的程序copy到一個你看不到的python文件中去執行了,就至關於當前這個文件,被另一個py文件import過去並執行了。
          #start並非直接就去執行了,咱們知道進程有三個狀態,進程會進入進程的三個狀態,就緒,(被調度,也就是時間片切換到它的時候)執行,阻塞,而且在這個三個狀態之間不斷的轉換,等待cpu執行時間片到了。
#     print('*' * 10) #這是主進程的程序,上面開啓的子進程的程序是和主進程的程序同時運行的,咱們稱爲異步
進程的建立方式一
class MyProcess(Process): #本身寫一個類,繼承Process類
    #咱們經過init方法能夠傳參數,若是隻寫一個run方法,那麼無法傳參數,由於建立對象的是傳參就是在init方法裏面,面向對象的時候,咱們是否是學過
    def __init__(self,person):
        super().__init__()
        self.person=person
    def run(self):
        print(os.getpid())
        print(self.pid)
        print(self.pid)
        print('%s 正在和女主播聊天' %self.person)
    # def start(self):
    #     #若是你非要寫一個start方法,能夠這樣寫,而且在run方法先後,能夠寫一些其餘的邏輯
    #     self.run()
if __name__ == '__main__':
    p1=MyProcess('Jedan')
    p2=MyProcess('太白')
    p3=MyProcess('alexDSB')

    p1.start() #start內部會自動調用run方法
    p2.start()
    # p2.run()
    p3.start()


    p1.join()
    p2.join()
    p3.join()
進程的建立方式二

 

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('太白',))
    t.start()
    print('主線程')
線程的建立方式一
import time
from threading import Thread
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('太白')
    t.start()
    print('主線程'
線程的建立方式二

 

三.守護進程,守護線程

  必定要在p.start()前設置,設置p爲守護進程(守護線程),禁止p建立子進程(子線程),而且父進程代碼執行結束,p即終止運行

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)
        time.sleep(3)
if __name__ == '__main__':
    p=Myprocess('太白')
    p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
    p.start()
    # time.sleep(1) # 在sleep時linux下查看進程id對應的進程ps -ef|grep id
    print('')
進程設置守護進程
from threading import Thread
from multiprocessing import Process
import time
def func1():
    while True:
        print(666)
        time.sleep(0.5)
def func2():
    print('hello')
    time.sleep(3)

if __name__ == '__main__':
    # t = Thread(target=func1,)
    # t.daemon = True  #主線程結束,守護線程隨之結束
    # # t.setDaemon(True) #兩種方式,和上面設置守護線程是同樣的
    # t.start()
    # t2 = Thread(target=func2,) #這個子線程要執行3秒,主線程的代碼雖然執行完了,可是一直等着子線程的任務執行完畢,主線程纔算完畢,由於經過結果你會發現我主線程雖然代碼執行完畢了,\
    # 可是主線程的的守護線程t1還在執行,說明什麼,說明個人主線程尚未完畢,只不過是代碼執行完了,一直等着子線程t2執行完畢,我主線程的守護線程才中止,說明子線程執行完畢以後,個人主線程才執行完畢
    # t2.start()
    # print('主線程代碼執行完啦!')
    p = Process(target=func1,)
    p.daemon = True
    p.start()

    p2 = Process(target=func2,)
    p2.start()
    time.sleep(1) #讓主進程等1秒,爲了能看到func1的打印效果
    print('主進程代碼執行完啦!') #經過結果你會發現,若是主進程的代碼運行完畢了,那麼主進程就結束了,由於主進程的守護進程p隨着主進程的代碼結束而結束了,守護進程被回收了,這和線程是不同的,主線程的代碼完了並不表明主線程運行完畢了,須要等着全部其餘的非守護的子線程執行完畢纔算完畢
線程設置守護線程

  信號量(Semaphore)

  Semaphore管理一個內置的計數器,
  每當調用acquire()時內置計數器-1;
  調用release() 時內置計數器+1;
  計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
進程信號量
from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
線程信號量

  事件

  事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
  event.isSet():返回event的狀態值;
  event.wait():若是 event.isSet()==False將阻塞線程;
  event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
  event.clear():恢復event的狀態值爲False。

from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #建立一個事件對象
print(e.is_set())  #is_set()查看一個事件的狀態,默認爲False,可經過set方法改成True
print('look here!')
# e.set()          #將is_set()的狀態改成True。
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
# e.clear()        #將is_set()的狀態改成False
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
e.wait()           #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的狀態 set-->True   clear-->False
#is_set     用來查看一個事件的狀態
#wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞
進程 事件應用
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連接超時') #本身發起錯誤
        print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count))
        event.wait(0.5) #
        count+=1
    print('<%s>連接成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    t1 = random.randint(0,3)
    print('>>>>',t1)
    time.sleep(t1)
    event.set()
if __name__ == '__main__':
    event=Event()
    check = Thread(target=check_mysql)
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)

    check.start()
    conn1.start()
    conn2.start()
線程 事件應用

  數據共享(Manager模塊)

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
進程之間數據不共享
from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n  #修改全局變量的值
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()   #必須加join,由於主線程和子線程不必定誰快,通常都是主線程快一些,全部咱們要等子線程執行完畢才能看出效果
    print('',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
# 經過一個global就實現了全局變量的使用,不須要進程的IPC通訊方法
線程之間數據共享

  隊列(queue)

 

q = Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
隊列的方法介紹

 

#看下面的隊列的時候,按照編號看註釋
import time
from multiprocessing import Process, Queue

# 8. q = Queue(2) #建立一個Queue對象,若是寫在這裏,那麼在windows還子進程去執行的時候,咱們知道子進程中還會執行這個代碼,可是子進程中不可以再次建立了,也就是這個q就是你主進程中建立的那個q,經過咱們下面在主進程中先添加了一個字符串以後,在去開啓子進程,你會發現,小鬼這個字符串還在隊列中,也就是說,咱們使用的仍是主進程中建立的這個隊列。
def f(q):
    # q = Queue() #9. 咱們在主進程中開啓了一個q,若是咱們在子進程中的函數裏面再開一個q,那麼你下面q.put('姑娘,多少錢~')添加到了新建立的這q裏裏面了
    q.put('姑娘,多少錢~')  #4.調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
    # print(q.qsize()) #6.查看隊列中有多少條數據了

def f2(q):
    print('》》》》》》》》')
    print(q.get())  #5.取數據
if __name__ == '__main__':
    q = Queue() #1.建立一個Queue對象
    q.put('小鬼')

    p = Process(target=f, args=(q,)) #2.建立一個進程
    p2 = Process(target=f2, args=(q,)) #3.建立一個進程
    p.start()
    p2.start()
    time.sleep(1) #7.若是阻塞一點時間,就會出現主進程運行太快,致使咱們在子進程中查看qsize爲1個。
    # print(q.get()) #結果:小鬼
    print(q.get()) #結果:姑娘,多少錢~
    p.join()
進程隊列用法

class queue.Queue(maxsize=0) #先進先出

先進先出示例代碼

  class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue q=queue.LifoQueue() #隊列,相似於棧,棧咱們提過嗎,是否是先進後出的順序啊 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 結果(後進先出): third second first '''
先進後出示例代碼

  class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((-10,'a')) q.put((-5,'a')) #負數也能夠 # q.put((20,'ws')) #若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序  q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): '''
優先級隊列示例代碼


  管道

  會致使數據不安全

from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello 妹妹") #子進程發送了消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #創建管道,拿到管道的兩端,雙工通訊方式,兩端均可以收發消息
    p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程
    p.start() #開啓子進程
    print(parent_conn.recv()) #主進程接受了消息
    p.join()
View Code


四.生產者消費者模型

    #程序中有兩類角色
        一類負責生產數據(生產者)
        一類負責處理數據(消費者)    
    #引入生產者消費者模型爲了解決的問題是:
        平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度     
    #如何實現:
        生產者<-->隊列<——>消費者
    #生產者消費者模型實現類程序的解耦和

 

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))



if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾回結束信號None
    q.put(None) #發送結束信號
    print('')
多個生產者消費者實現

 

五.鎖 GIL lock(同步鎖\互斥鎖) 遞歸鎖 (RLock),

 

  GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責     這件事,只能用戶自定義加鎖處理,即Lock

#注意:首先在當前文件目錄下建立一個名爲db的文件
#文件db的內容爲:{"count":1},只有這一行數據,而且注意,每次運行完了以後,文件中的1變成了0,你須要手動將0改成1,而後在去運行代碼。
#注意必定要用雙引號,否則json沒法識別
#加鎖保證數據安全,不出現混亂
from multiprocessing import Process,Lock
import time,json,random

#查看剩餘票數
def search():
    dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

#搶票
def get():
    dic=json.load(open('db'))
    time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)   #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
        print('\033[43m購票成功\033[0m')
    else:
        print('sorry,沒票了親!')
def task(lock):
    search()
    #由於搶票的時候是發生數據變化的時候,全部咱們將鎖加加到這裏
    lock.acquire()
    get()
    lock.release()
if __name__ == '__main__':
    lock = Lock() #建立一個鎖
    for i in range(3): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,)) #將鎖做爲參數傳給task函數
        p.start()

#看結果分析:只有一我的搶到了票
# 剩餘票數1
# 剩餘票數1
# 剩餘票數1
# 購票成功   #幸運的人兒
# sorry,沒票了親!
# sorry,沒票了親!
多進程加鎖 搶票系統實現

 

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1) 
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['taibai','wulaoban']:
    t1 = Thread(target=eat1,args=(name,))
    t1.start()
for name in ['alex','peiqi']:
    t2 = Thread(target=eat2,args=(name,))
    t2.start()
遞歸鎖解決死鎖現象


六.進程池,線程池

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
'''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
    
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
方法
#一:使用進程池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)
        # s = res.get() #若是直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,由於get方法直接就在這裏等着你建立的進程的結果,第一個進程建立了,而且去執行了,那麼get就會等着第一個進程的結果,沒有結果就一直等着,那麼主進程的for循環是沒法繼續的,因此你會發現變成了同步的效果
    print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了

    pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

#二:使用進程池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
        print(i)
進程池 同步 異步 操做

 

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
爬蟲實例
import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主進程:',os.getpid())
    p = Pool(5)
    #args裏面的10給了func1,func1的返回值做爲回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值
    # for i in range(10,20): #若是是多個進程來執行任務,那麼當全部子進程將結果給了回調函數以後,回調函數又是在主進程上執行的,那麼就會出現打印結果是同步的效果。咱們上面func2裏面註銷的時間模塊打開看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#結果
# 主進程: 11852  #發現回調函數是在主進程中完成的,其實若是是在子進程中完成的,那咱們直接將代碼寫在子進程的任務函數func1裏面就好了,對不對,這也是爲何稱爲回調函數的緣由。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
進程池回調函數的使用
#server>>>>>>>>>>

#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

#client.>>>>>>>>>>>>>.

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
進程池版socket聊天
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s打印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #默認通常起線程的數據不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只須要將上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其餘都不用改
#異步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i做爲任務函數的參數 def submit(self, fn, *args, **kwargs):  能夠傳任意形式的參數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果對象t,不能直接去拿結果,否則又變成串行了,能夠理解爲拿到一個號碼,等全部線程的結果都出來以後,咱們再去經過結果對象t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的做用,等待全部的線程執行完畢
print('主線程')
for ti in t_lst:
    print('>>>>',ti.result())

# 咱們還能夠不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每一個兩秒去去一次結果,哪一個有結果了,就能夠取出哪個,想表達的意思就是說不用等到全部的結果都出來再去取,能夠輪詢着去取結果,由於你的任務須要執行的時間很長,那麼你須要等好久才能拿到結果,經過這樣的方式能夠將快速出來的結果先拿出來。若是有的結果對象裏面尚未執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪個的結果,能夠經過枚舉enumerate來搞,記錄你是哪個位置的結果對象的結果已經被取過了,取過的就再也不取了

#結果分析: 打印的結果是沒有順序的,由於到了func函數中的sleep的時候線程會切換,誰先打印就沒準兒了,可是最後的咱們經過結果對象取結果的時候拿到的是有序的,由於咱們主線程進行for循環的時候,咱們是按順序將結果對象添加到列表中的。
# 37220打印的: 0
# 32292打印的: 4
# 33444打印的: 1
# 30068打印的: 2
# 29884打印的: 3
# 主線程
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
ThreadPoolExecutor的簡單使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
進程池回調函數的使用


七. 協程

  協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。、

   總結協程特色:

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

   Greenlet

  若是咱們在單個線程內有20個任務,要想實如今多個任務之間切換,使用yield生成器的方式過於麻煩(須要先獲得初始化一次的生成器,而後再調用     send。。。很是麻煩),而使用greenlet模塊能夠很是簡單地實現這20個任務直接的切換

#安裝 pip3 install greenlet
複製代碼
複製代碼
 
  #真正的協程模塊就是使用greenlet完成的切換
from greenlet import greenlet def eat(name): print('%s eat 1' %name) #2 g2.switch('taibai') #3 print('%s eat 2' %name) #6 g2.switch() #7 def play(name): print('%s play 1' %name) #4 g1.switch() #5 print('%s play 2' %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai')#能夠在第一次switch時傳入參數,之後都不須要 1

   Gevent

   #安裝
  pip3 install gevent

 

用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束,上面只是建立協程對象,這個join纔是去執行

g2.join() #等待g2結束  有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,可是你會發現,若是g2裏面的任務執行的時間長,可是不寫join的話,就不會執行完等到g2剩下的任務了


#或者上述兩步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的全部阻塞所有可以識別了

import gevent  #直接導入便可
import time
def eat():
    #print()  
    print('eat food 1')
    time.sleep(2)  #加上mokey就可以識別到time模塊的sleep了
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)  #來回切換,直到一個I/O的時間結束,這裏都是咱們個gevent作得,再也不是控制不了的操做系統了。
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
gevent使用
from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
#上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
gevent的同步異步
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
協程在爬蟲中的應用

 

八.IO多路複用

    同步:提交一個任務以後要等待這個任務執行完畢

    異步:只管提交任務,不等待這個任務執行完畢就能夠去作其餘的事情

    阻塞:recv、recvfrom、accept,線程階段  運行狀態-->阻塞狀態-->就緒

    非阻塞:沒有阻塞狀態

  在一個線程的IO模型中,咱們recv的地方阻塞,咱們就開啓多線程,可是無論你開啓多少個線程,這個recv的時間是否是沒有被規避掉,無論是多線程仍是多進程都沒有規避掉這個IO時間。

  selectors模塊

 

#服務端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #設置socket的接口爲非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至關於網select的讀列表裏append了一個文件句柄server_fileobj,而且綁定了一個回調函數accept

while True:
    events=sel.select() #檢測全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
selectors模塊代碼實例
相關文章
相關標籤/搜索