Python之路【第十一篇】: 進程與線程

閱讀目錄

一. cpython併發編程之多進程
1.1 multiprocessing模塊介紹
1.2 Process類的介紹
1.3 Process類的使用
1.4 進程間通訊(IPC)方式一:隊列
1.5 進程間通訊(IPC)方式二:管道(瞭解部分)
1.6 進程間通訊方式三:共享數據
1.7 進程同步(鎖),信號量,事件...
1.8 進程池
二. python併發編程之多線程
2.1 threading模塊
2.2 Python GIL(Global Interpreter Lock)
2.3 同步鎖
2.4 死鎖與遞歸鎖
2.5 信號量Semahpore
2.6 事件Event
2.7 條件Condition(瞭解)html

2.8 定時器Timer
2.9 線程queue
2.10 Python標準模塊--concurrent.futures
三.  協程python

四. 協程模塊greenletmysql

五. gevent模塊(單線程併發)git

六. 綜合應用github

 

一. cpython併發編程之多進程

1.1 multiprocessing模塊介紹

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing。
 multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。web

multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。redis

強調: 與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。sql

1.2 Process類的介紹

建立進程的類:數據庫

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:編程

group參數未使用,值始終爲None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name爲子進程的名稱

方法介紹:

p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

屬性介紹:

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

1.3 Process類的使用

1.建立並開啓子進程的兩種方式

注: 在windows中Process()必須放到# if __name__ == '__main__':下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.

因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 
若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原理,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = "shuke"
# Date: 2017/6/26 0026

import time
import random
from multiprocessing import Process


def talk(name):
    print("%s is say 'Hello'" % name)
    time.sleep(3)
    print("talking end")

if __name__ == '__main__':
    p1=Process(target=talk,args=('Shuke',))         # args是元組的形式,必須加逗號
    p2=Process(target=talk,args=('Tom',))
    p3=Process(target=talk,args=('Eric',))
    p4=Process(target=talk,args=('Lucy',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
開啓進程(方式一)
import time
import random
from multiprocessing import Process


class Talk(Process):    # 繼承Process類

    def __init__(self,name):
        super(Talk, self).__init__()    # 繼承父類__init__方法
        self.name=name

    def run(self):          # 必須實現一個run方法,規定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p2=Talk('Eric')
    p3=Talk('Tome')
    p4=Talk('Lucy')

    p1.start()          # start方法會自動調用run方法運行
    p2.start()
    p3.start()
    p4.start()
    print("主線程")

'''
執行結果:
主線程
Shuke is say 'Hello'
Lucy is say 'Hello'
Tome is say 'Hello'
Eric is say 'Hello'
Tome talking end
Eric talking end
Lucy talking end
Shuke talking end
'''
開啓進程(方式二)

併發實現socket通訊示例

from socket import *
from multiprocessing import Process

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)


def talk(conn, client_addr):
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start進程必定要寫到這下面
    while True:
        conn, addr = server.accept()
        p = Process(target=talk, args=(conn, addr))
        p.start()
server端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client端

存在的問題:

每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。

解決方法:進程池

2. Process對象的其餘方法和屬性

進程對象的其餘方法一:terminate,is_alive

import time
import random
from multiprocessing import Process


class Talk(Process):    # 繼承Process類

    def __init__(self,name):
        super(Talk, self).__init__()    # 繼承父類__init__方法
        self.name=name

    def run(self):          # 必須實現一個run方法,規定
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')

    p1.start()          # start方法會自動調用run方法運行
    p1.terminate()      # 關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
    print(p1.is_alive())# True
    time.sleep(1)       # 模擬CPU調度的延時
    print("====分割線====")
    print(p1.is_alive())# False

'''
執行結果:
True
====分割線====
False
'''
terminate,is_alive

進程對象的其餘方法二:p1.daemon=True,p1.join

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        super(Talk, self).__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.daemon = True    # 必定要在p1.start()前設置,設置p1爲守護進程,禁止p1建立子進程,而且父進程結束,p1跟着一塊兒結束
    p1.start()          # start方法會自動調用run方法運行
    p1.join(0.0001)     # 等待p1中止,等0.0001秒就再也不等了
p1.daemon=True,p1.join

剖析p1.join

from multiprocessing import Process

import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

p1=Process(target=piao,args=('egon',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('yuanhao',))
p4=Process(target=piao,args=('wupeiqi',))

p1.start()
p2.start()
p3.start()
p4.start()

p1.join()
p2.join()
p3.join()
p4.join()

print('主線程')

#疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎?
#固然不是了
#注意:進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了
#而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過
# 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間


#上述啓動進程與join進程能夠簡寫爲
p_l=[p1,p2,p3,p4]

for p in p_l:
    p.start()

for p in p_l:
    p.join()
有了join,程序不就是串行了嗎???

進程對象的其餘屬性:name,pid

import time
import random
from multiprocessing import Process


class Talk(Process):

    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        # 爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print("%s is say 'Hello'" % self.name)
        time.sleep(random.randint(1,3))
        print("%s talking end"% self.name)

if __name__ == '__main__':
    p1=Talk('Shuke')
    p1.start()          # start方法會自動調用run方法運行
    print("====")
    print(p1.pid)       # 查看pid

'''
執行結果:
====
20484
Shuke is say 'Hello'
Shuke talking end
'''
屬性:name,pid

3. 進程同步(鎖)

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的

#多進程共享一個打印終端(用python2測試看兩個進程同時往一個終端打印,出現打印到一行的錯誤)
from multiprocessing import Process
import time
class Logger(Process):
    def __init__(self):
        super(Logger,self).__init__()
    def run(self):
        print(self.name)


for i in range(1000000):
    l=Logger()
    l.start()
#多進程共享一套文件系統
from multiprocessing import Process
import time,random

def work(f,msg):
    f.write(msg)
    f.flush()


f=open('a.txt','w') #在windows上沒法把f當作參數傳入,能夠傳入一個文件名,而後在work內用a+的方式打開文件,進行寫入測試
for i in range(5):
    p=Process(target=work,args=(f,str(i)))
    p.start()

注: 既然能夠用文件共享數據,那麼進程間通訊用文件做爲數據傳輸介質就能夠了啊,能夠,可是有問題:

1.效率

2.須要本身加鎖處理

需知:加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。

進程之間數據隔離,可是共享一套文件系統,於是能夠經過文件來實現進程直接的通訊,但問題是必須本身加鎖處理。因此,就讓咱們用文件當作數據庫,模擬搶票,(Lock互斥鎖),見下文搶票示例。

學習了經過使用共享的文件的方式,實現進程直接的共享,即共享數據的方式,這種方式必須考慮周全同步、鎖等問題。並且文件是操做系統提供的抽象,能夠做爲進程直接通訊的介質,與mutiprocess模塊無關。

但其實mutiprocessing模塊爲咱們提供了基於消息的IPC通訊機制:隊列和管道。

IPC機制中的隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可擴展性。

 

1.4 進程間通訊(IPC)方式一:隊列

 進程彼此之間互相隔離,要實現進程間通訊,即IPC,multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的,普遍應用在分佈式系統中。

Queue模塊有三種隊列及構造函數:
  1. Python Queue模塊的FIFO隊列先進先出。 class Queue.Queue(maxsize)
  2. LIFO相似於堆,即先進後出。 class Queue.LifoQueue(maxsize)
  3. 還有一種是優先級隊列級別越低越先出來。 class Queue.PriorityQueue(maxsize)

Queue類(建立隊列)

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞,底層是以管道和鎖的方式實現的。

參數介紹:

maxsize是隊列中容許最大項數,省略則無大小限制。    

方法介紹:

主要方法:

q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味着等到隊列爲空,再執行別的操做

其餘方法:

q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

應用:

'''
multiprocessing 模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,都是隊列接口
'''

from multiprocessing import Process,Queue
import time

q=Queue(5)
q.put([1,2,3])
q.put(('a','b','c'))
q.put(100)
q.put("Hello World")
q.put({'name':'shuke'})
# q.put('隊列滿了')           # 若是隊列元素滿了,後續put進入隊列的數據將會處於等待狀態,直到隊列的元素被消費,才能夠加入
print(q.qsize())            # 5; 返回隊列的大小
print(q.full())             # True

print(q.get())              # [1, 2, 3]
print(q.get())              # ('a', 'b', 'c')
print(q.get())              # 100
print(q.get())              # Hello World
print(q.get())              # {'name': 'shuke'}
# print(q.get())            # 若是隊列元素所有被消費完成,會一直卡住,直到隊列中被放入新的元素
print(q.empty())            # True

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生產者生產了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        print("%s 消費者消費了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("蘋果%s"% i for i in range(5))

    p=Process(target=consumer,args=(q,'Tom'))       # 以元組的方式傳參
    p.start()
    producer(seq,q,'shuke')
    print("=====主線程=====")

'''
執行結果:
shuke 生產者生產了: 蘋果0
Tom 消費者消費了: 蘋果0
shuke 生產者生產了: 蘋果1
Tom 消費者消費了: 蘋果1
shuke 生產者生產了: 蘋果2
shuke 生產者生產了: 蘋果3
Tom 消費者消費了: 蘋果2
shuke 生產者生產了: 蘋果4
=====主線程=====
Tom 消費者消費了: 蘋果3
Tom 消費者消費了: 蘋果4
'''
生產者消費者模型示例(基於隊列)
# 生產者發送結束標誌給消費者
from multiprocessing import Process,Queue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生產者生產了: %s"%(name,item))


def consumer(q,name):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        if res is None:break
        print("%s 消費者消費了: %s"%(name,res))


if __name__ == '__main__':
    q=Queue()
    seq=("蘋果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元組的方式傳參
    c.start()

    producer(seq,q,'shuke')
    q.put(None)
    c.join()    # 主線程等待直到c消費者進程運行結束再繼續往下運行
    print("=====主線程=====")

'''
執行結果:
shuke 生產者生產了: 蘋果0
Tom 消費者消費了: 蘋果0
shuke 生產者生產了: 蘋果1
Tom 消費者消費了: 蘋果1
shuke 生產者生產了: 蘋果2
Tom 消費者消費了: 蘋果2
shuke 生產者生產了: 蘋果3
Tom 消費者消費了: 蘋果3
shuke 生產者生產了: 蘋果4
Tom 消費者消費了: 蘋果4
=====主線程=====
'''
主線程等到消費者結束

JoinableQueue類 (建立隊列的另一個類)

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的消費者通知生產者隊列已經被成功處理,通知進程是使用共享的信號和條件變量來實現的。

參數介紹:

maxsize是隊列中容許最大項數,省略則無大小限制。

方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

  • q.task_done(): 使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常。
  • q.join(): 生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止。
from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        q.put(item)
        print("%s 生產者生產了: %s"%(name,item))
    q.join()            # 生產者調用此方法進行阻塞


def consumer(q,name):
    while True:
        res=q.get()
        if res is None:break
        print("%s 消費者消費了: %s"%(name,res))
        q.task_done()       # 使用者使用此方法發出信號,表示q.get()的返回元素已經被消費處理。

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("蘋果%s"% i for i in range(5))

    c=Process(target=consumer,args=(q,'Tom'))       # 以元組的方式傳參
    c.daemon=True     # 在start以前進行設置爲守護進程,在主線程中止時c也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素
    c.start()

    producer(seq,q,'shuke')
    print("=====主線程=====")

'''
執行結果:
shuke 生產者生產了: 蘋果0
Tom 消費者消費了: 蘋果0
shuke 生產者生產了: 蘋果1
Tom 消費者消費了: 蘋果1
shuke 生產者生產了: 蘋果2
Tom 消費者消費了: 蘋果2
shuke 生產者生產了: 蘋果3
Tom 消費者消費了: 蘋果3
shuke 生產者生產了: 蘋果4
Tom 消費者消費了: 蘋果4
=====主線程=====
'''
q.join與q.task_done示例
from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生產者生產了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消費者消費了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=("蘋果%s"% i for i in range(5))

    c1=Process(target=consumer,args=(q,'消費者1'))       # 以元組的方式傳參
    c2=Process(target=consumer,args=(q,'消費者2'))
    c3=Process(target=consumer,args=(q,'消費者3'))
    c1.daemon=True     # 在start以前進行設置爲守護進程,在主線程中止時c也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    producer(seq,q,'shuke')
    print("=====主線程=====")

'''
執行結果:
shuke 生產者生產了: 蘋果0
消費者3 消費者消費了: 蘋果0
shuke 生產者生產了: 蘋果1
消費者1 消費者消費了: 蘋果1
shuke 生產者生產了: 蘋果2
消費者2 消費者消費了: 蘋果2
shuke 生產者生產了: 蘋果3
消費者1 消費者消費了: 蘋果3
shuke 生產者生產了: 蘋果4
消費者3 消費者消費了: 蘋果4
=====主線程=====
'''
一個生產者+多個消費者
from multiprocessing import Process,JoinableQueue
import time
import random

def producer(seq,q,name):
    for item in seq:
        # time.sleep(random.randint(1,3))
        q.put(item)
        print("%s 生產者生產了: %s"%(name,item))
    q.join()


def consumer(q,name):
    while True:
        # time.sleep(random.randint(1, 3))
        res=q.get()
        if res is None:break
        print("%s 消費者消費了: %s"%(name,res))
        q.task_done()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=["蘋果%s"% i for i in range(5)]

    c1=Process(target=consumer,args=(q,'消費者1'))       # 以元組的方式傳參
    c2=Process(target=consumer,args=(q,'消費者2'))
    c3=Process(target=consumer,args=(q,'消費者3'))
    c1.daemon=True     # 在start以前進行設置爲守護進程,在主線程中止時c也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素
    c2.daemon=True
    c3.daemon=True
    c1.start()
    c2.start()
    c3.start()

    # producer(seq,q,'shuke')     # 也能夠是下面三行的形式,開啓一個新的子進程當生產者,不用主線程當生產者
    p=Process(target=producer,args=(seq,q,'shuke'))     # 注意此處參數seq爲列表
    p.start()
    p.join()
    print("=====主線程=====")

'''
執行結果:
shuke 生產者生產了: 蘋果0
shuke 生產者生產了: 蘋果1
消費者3 消費者消費了: 蘋果0
shuke 生產者生產了: 蘋果2
消費者2 消費者消費了: 蘋果1
消費者3 消費者消費了: 蘋果2
shuke 生產者生產了: 蘋果3
消費者2 消費者消費了: 蘋果3
shuke 生產者生產了: 蘋果4
消費者3 消費者消費了: 蘋果4
=====主線程=====
'''
開啓一個子進程看成生產者而不是主線程

 

1.5 進程間通訊(IPC)方式二:管道(瞭解部分)

管道也能夠說是隊列的另一種形式,下面咱們就開始介紹基於管道實現進程之間的消息傳遞

Pipe類(建立管道)

Pipe([duplex]): 在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道

參數介紹:

dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。

方法介紹:

主要方法:

  • conn1.recv(): 接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
  • conn1.send(obj): 經過鏈接發送對象。obj是與序列化兼容的任意對象。
其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法。
conn1.fileno():返回鏈接使用的整數文件描述符。
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

基於管道實現進程間通訊 (與隊列的方式是相似的,隊列就是管道加鎖實現的)

from multiprocessing import Process,Pipe
import time

def consumer(p,name):
    left,right = p
    left.close()
    while True:
        try:
            fruit = right.recv()
            print("%s 收到水果: %s" % (name,fruit))
        except EOFError:
            right.close()
            break

def producer(seq,p):
    left,right = p
    right.close()
    for item in seq:
        left.send(item)
    else:
        left.close()

if __name__ == '__main__':
    left,right = Pipe()
    c1=Process(target=consumer,args=((left,right),'Tom'))
    c1.start()

    seq=(i for i in range(5))
    producer(seq,(left,right))
    right.close()
    left.close()

    c1.join()
    print("===主線程===")
    
'''
執行結果:
Tom 收到水果: 0
Tom 收到水果: 1
Tom 收到水果: 2
Tom 收到水果: 3
Tom 收到水果: 4
===主線程===
'''

 注: 生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,除非消費者中也關閉了相同的管道端點。

管道能夠用於雙向通訊,一般利用在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序,以下:
from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主進程')
示例

注: send()和recv()方法使用pickle模塊對對象進行序列化。

 

1.6 進程間通訊方式三:共享數據

展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。

注: 進程間通訊應該儘可能避免使用本節所講的共享數據的方式

進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的,雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此。

from multiprocessing import Process,Manager
import os

def foo(name,d,l):
    l.append(os.getpid())
    d[name]=os.getpid()
if __name__ == '__main__':
    with Manager() as manager:
        d=manager.dict({'name':'shuke'})
        l=manager.list(['init',])

        p_l=[]
        for i in range(5):
            p=Process(target=foo,args=('p%s' %i,d,l))
            p.start()
            p_l.append(p)

        for p in p_l:
            p.join() #必須有join否則會報錯

        print(d)
        print(l)
'''
執行結果:
{'p0': 62792, 'p4': 63472, 'name': 'shuke', 'p1': 60336, 'p3': 62704, 'p2': 63196}
['init', 60336, 62704, 62792, 63196, 63472]
'''
示例
 

1.7 進程同步(鎖),信號量,事件...

模擬搶票(Lock-->互斥鎖)

# 文件db的內容爲:{"count":1}
# 注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #買票
    # lock.acquire()
    with lock:      # with語法下面的代碼塊執行完畢會自動釋放鎖
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩餘票數: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模擬網絡延遲
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 購票成功' %os.getpid())
        else:
            print('%s 購票失敗' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(5):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主線程')

'''
執行結果:
63448 購票成功
13676 購票失敗
61668 購票失敗
63544 購票失敗
17816 購票失敗
主線程
'''
#互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。一旦釋放,就有人能夠得到一把鎖

#信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 佔到一個茅坑' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人拉屎速度不同,0表明有的人蹲下就起來了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')
信號量Semahpore(同線程同樣)
# python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
# 事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True

#_*_coding:utf-8_*_
#!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m紅燈亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')
Event(同線程同樣)

 

1.8 進程池 星級: *****

何時使用進程池?

開多進程的目的是爲了併發,若是有多核,一般有幾個核就開幾個進程,進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行),但很明顯須要併發執行的任務要遠大於核數,這時咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...    

當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。

對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

注: 在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。

Pool類(建立進程池)

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

參數介紹:

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組

方法介紹:

主要方法:

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
p.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
其餘方法:
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。

 應用

   提交任務,並在主進程中拿到結果(以前的Process是執行任務,結果放到隊列裏,如今能夠在主進程中直接拿到結果)
from multiprocessing import Pool
import time
def work(n):
    print('開工啦...')
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    q=Pool()

    #異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    res=q.apply_async(work,args=(2,))
    q.close()
    q.join() #join在close以後調用
    print(res.get())

    #同步apply用法:主進程一直等apply提交的任務結束後才繼續執行後續代碼
    # res=q.apply(work,args=(2,))
    # print(res)

使用進程池維護固定數目的進程

'''
Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
開啓6個客戶端,會發現2個客戶端處於等待狀態
在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
'''

from socket import *
from multiprocessing import Pool
import os

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)


def talk(conn, client_addr):
    print("進程PID: %s"%(os.getpid()))
    while True:
        try:
            msg = conn.recv(1024)
            if not msg: break
            conn.send(msg.upper())
        except Exception:
            break


if __name__ == '__main__':  # windows下start進程必定要寫到這下面
    p = Pool()      # 默認使用CPU的核數
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr))   # #同步的話,則同一時間只有一個客戶端能訪問
server端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))

while True:
    msg=input('>>:').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client1端

 

回調函數(callback)  星級: *****

1. 不須要回調函數的場景

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數。

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理

2.  回調函數的應用場景

進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool
import time,random,os

def get_page(url):
    print('(進程 %s) 正在下載頁面 %s' %(os.getpid(),url))
    time.sleep(random.randint(1,3))
    return url #用url充當下載後的結果

def parse_page(page_content):
    print('<進程 %s> 正在解析頁面: %s' %(os.getpid(),page_content))
    time.sleep(1)
    return '{%s 回調函數處理結果:%s}' %(os.getpid(),page_content)


if __name__ == '__main__':
    urls=[
        'http://maoyan.com/board/1',
        'http://maoyan.com/board/2',
        'http://maoyan.com/board/3',
        'http://maoyan.com/board/4',
        'http://maoyan.com/board/5',
        'http://maoyan.com/board/7',

    ]
    # 要建立進程池中的進程數,若是省略,將默認使用cpu_count()的值
    p=Pool()            
    res_l=[]

    #異步的方式提交任務,而後把任務的結果交給callback處理
    #注意:會專門開啓一個進程來處理callback指定的任務(單獨的一個進程,並且只有一個)
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=parse_page)
        res_l.append(res)

    #異步提交完任務後,主進程先關閉p(必須先關閉),而後再用p.join()等待全部任務結束(包括callback)
    p.close()
    p.join()
    print('{主進程 %s}' %os.getpid())

    #收集結果,發現收集的是get_page的結果
    #因此須要注意了:
    #1. 當咱們想要在將get_page的結果傳給parse_page處理,那麼就不須要i.get(),經過指定callback,就能夠將i.get()的結果傳給callback執行的任務
    #2. 當咱們想要在主進程中處理get_page的結果,那就須要使用i.get()獲取後,再進一步處理
    for i in res_l: #本例中,下面這兩步是多餘的
        callback_res=i.get()
        print(callback_res)

'''
打印結果:
(進程 52346) 正在下載頁面 http://maoyan.com/board/1
(進程 52347) 正在下載頁面 http://maoyan.com/board/2
(進程 52348) 正在下載頁面 http://maoyan.com/board/3
(進程 52349) 正在下載頁面 http://maoyan.com/board/4
(進程 52348) 正在下載頁面 http://maoyan.com/board/5
<進程 52345> 正在解析頁面: http://maoyan.com/board/3
(進程 52346) 正在下載頁面 http://maoyan.com/board/7
<進程 52345> 正在解析頁面: http://maoyan.com/board/1
<進程 52345> 正在解析頁面: http://maoyan.com/board/2
<進程 52345> 正在解析頁面: http://maoyan.com/board/4
<進程 52345> 正在解析頁面: http://maoyan.com/board/5
<進程 52345> 正在解析頁面: http://maoyan.com/board/7
{主進程 52345}
http://maoyan.com/board/1
http://maoyan.com/board/2
http://maoyan.com/board/3
http://maoyan.com/board/4
http://maoyan.com/board/5
http://maoyan.com/board/7
'''
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
'''
執行結果:
{'actor': '阿米爾·汗,薩卡詩·泰瓦,法緹瑪·薩那·紗卡', 'index': '1', 'score': '9.8', 'title': '摔跤吧!爸爸', 'time': '2017-05-05'}
{'actor': '李微漪,亦風', 'index': '2', 'score': '9.3', 'title': '重返·狼羣', 'time': '2017-06-16'}
{'actor': '高強,於月仙,李玉峯', 'index': '3', 'score': '9.2', 'title': '忠愛無言', 'time': '2017-06-09'}
{'actor': '楊培,尼瑪扎堆,斯朗卓嘎', 'index': '4', 'score': '9.1', 'title': '岡仁波齊', 'time': '2017-06-20'}
{'actor': '約翰尼·德普,哈維爾·巴登,布蘭頓·思懷茲', 'index': '5', 'score': '8.9', 'title': '加勒比海盜5:死無對證', 'time': '2017-05-26'}
{'actor': '戴夫·帕特爾,魯妮·瑪拉,大衛·文翰', 'index': '6', 'score': '8.8', 'title': '雄獅', 'time': '2017-06-22'}
{'actor': '蔡卓妍,周柏豪,鍾欣潼', 'index': '7', 'score': '8.6', 'title': '原諒他77次', 'time': '2017-06-23'}
{'actor': '水田山葵,山新,大原惠美', 'index': '8', 'score': '8.6', 'title': '哆啦A夢:大雄的南極冰冰涼大冒險', 'time': '2017-05-30'}
{'actor': '蓋爾·加朵,克里斯·派恩,羅賓·懷特', 'index': '9', 'score': '8.6', 'title': '神奇女俠', 'time': '2017-06-02'}
{'actor': '範楚絨,洪海天,謝元真', 'index': '10', 'score': '8.5', 'title': '潛艇總動員之時光寶盒', 'time': '2015-05-29'}
'''
爬蟲應用

apply_async(非阻塞)apply(阻塞)的區別示例:

使用進程池(非阻塞,apply_async

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    # time.sleep(1)
    return 'Bye Bye!'

if __name__ == "__main__":
    processes=4                 # 進程池的進程總數
    pool = Pool(processes)      # 實例化
    res_l=[]
    for i in range(5):
        msg = "hello 同窗%s" % str(i)
        res=pool.apply_async(func, args=(msg,))   # 維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)

    print("============= 我是分割線 =================")
    pool.close()        # 關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()         # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool進程池,join函數等待全部子進程結束
    print("Sub-process(es) done.")
    for i in res_l:
        print(res.get())

'''
執行結果:
============= 我是分割線 =================
msg: hello 同窗0
msg: hello 同窗1
msg: hello 同窗2
msg: hello 同窗3
msg: hello 同窗4
Sub-process(es) done.
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
'''
apply_async

使用進程池(阻塞,apply

from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    # time.sleep(1)
    return 'Bye Bye!'

if __name__ == "__main__":
    processes=4                 # 進程池的進程總數
    pool = Pool(processes)      # 實例化
    res_l=[]
    for i in range(5):
        msg = "hello 同窗%s" % str(i)
        res=pool.apply(func, args=(msg,))   # 維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)                   # 同步執行,即執行完一個拿到結果,再去執行另一個

    print("============= 我是分割線 =================")
    pool.close()        # 關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()         # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool進程池,join函數等待全部子進程結束
    print("Sub-process(es) done.")
    print(res_l)
    for i in res_l:     # apply是同步的,因此直接獲得結果,沒有get()方法
        print(res)

'''
執行結果:
msg: hello 同窗0
msg: hello 同窗1
msg: hello 同窗2
msg: hello 同窗3
msg: hello 同窗4
============= 我是分割線 =================
Sub-process(es) done.
['Bye Bye!', 'Bye Bye!', 'Bye Bye!', 'Bye Bye!', 'Bye Bye!']
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
Bye Bye!
'''
apply
#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()獲取當前的進程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' %(end - start))

def Marlon():
    print("\nRun task Marlon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print('Task Marlon runs %0.2f seconds.' %(end - start))

def Allen():
    print("\nRun task Allen-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' %(end - start))

def Frank():
    print("\nRun task Frank-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' %(end - start))

def Egon():
    print("\nRun task Egon-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Egon runs %0.2f seconds.' %(end - start))

def Lily():
    print("\nRun task Lily-%s" %(os.getpid()))
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Lily runs %0.2f seconds.' %(end - start))

if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank, Egon, Lily]
    print("parent process %s" %(os.getpid()))

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()    #調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    print('All subprocesses done.')

多個進程池
多個進程池

 

二. python併發編程之多線程

 

2.1 threading模塊

multiprocess模塊的接口徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹。

2.1.1開啓線程的兩種方式(同Process)

# 方法一
from threading import Thread
import time


def sayhi(name):
    time.sleep(2)
    print('%s say hello!'% name)

if __name__ == '__main__':
    t = Thread(target=sayhi,args=('shuke',))
    t.start()
    print("=====我是分割線=====")
    print("主線程")
# 方法二
from threading import Thread
import time


class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print("%s say hello!"%self.name)

if __name__ == '__main__':
    t=Sayhi('shuke')
    t.start()
    print("=====我是分割線=====")
    print("主線程")

2.1.2 子線程與子進程的區別 

線程與進程的執行速度對比

1. 根據輸出結果對比

# 線程方式
from threading import Thread

def work():
    print("Hello python!")

if __name__ == '__main__':
    t = Thread(target=work)
    t.start()
    print('主線程/主進程')
'''
執行結果:
Hello python!
主線程/主進程
'''
線程
# 進程方式
from multiprocessing import Process

def work():
    print("Hello python!")

if __name__ == '__main__':
    t = Process(target=work)
    t.start()
    print('主線程/主進程')
'''
執行結果:
主線程/主進程
Hello python!
'''
進程
注: 對比執行結果,能夠看出線程的執行速度>進程的執行速度

2. 根據pid來進行對比

# 線程方式
# 在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
from threading import Thread
import os

def work():
    print("Pid: %s" % os.getpid())

if __name__ == '__main__':
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    t3 = Thread(target=work)
    t1.start()
    t2.start()
    t3.start()
    print("主線程/主進程pid: %s" % os.getpid())
'''
執行結果:
Pid: 65652
Pid: 65652
Pid: 65652
主線程/主進程pid: 65652
'''
線程
# 進程方式
# 開多個進程,每一個進程都有不一樣的pid
from multiprocessing import Process
import os

def work():
    print("Pid: %s" % os.getpid())

if __name__ == '__main__':
    t1 = Process(target=work)
    t2 = Process(target=work)
    t3 = Process(target=work)
    t1.start()
    t2.start()
    t3.start()
    print('主線程/主進程pid: %s' % os.getpid())
'''
主線程/主進程pid: 20484
Pid: 5800
Pid: 67076
Pid: 62244
'''
進程

2.1.3 小小的練習

練習一: 多線程併發的socket服務端
#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()

多線程併發的socket服務端
服務端
#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)

客戶端
客戶端
練習二: 三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件
from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
示例

2.1.4 線程的join與setdaemon

與進程的方法相似,實際上是multiprocessing模仿threading的接口
from threading import Thread
import time

def work(name):
    time.sleep(2)
    print("%s say hello" % name)

if __name__ == '__main__':
    t = Thread(target=work,args=('shuke',))
    t.setDaemon(True)
    t.start()
    t.join()
    print("主線程")
    print(t.is_alive())
'''
執行結果:
shuke say hello
主線程
False
'''

2.1.5 線程的其餘方法補充

Thread實例對象的方法

  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。

threading模塊提供的一些方法

  • threading.currentThread(): 返回當前的線程變量。
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
import time

def work():
    time.sleep(2)
    print(threading.current_thread().getName())

# 在主進程下開啓線程
if __name__ == '__main__':
    t = Thread(target=work)
    t.start()
    print(threading.current_thread().getName())
    print(threading.current_thread())     # 主線程
    print(threading.enumerate())            # 連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print("主線程/主進程")

'''
執行結果:
MainThread
<_MainThread(MainThread, started 67280)>
[<_MainThread(MainThread, started 67280)>, <Thread(Thread-1, started 64808)>]
2
主線程/主進程
Thread-1
'''

 2.1.6 線程池

參考文章:  http://www.cnblogs.com/tracylining/p/3471594.html

 

2.2  Python GIL(Global Interpreter Lock)

'''
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
''' 結論: 在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點

 

 這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看瀏覽:http://www.dabeaz.com/python/UnderstandingGIL.pdf 

 
此處只需知道: 有了GIL的存在,同一時刻統一進程中只有一個線程被執行。

需求:

咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是:

方案一:開啓四個進程

方案二:一個進程下,開啓四個線程 

單核狀況下,分析結果: 

  若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝

  若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝

多核狀況下,分析結果:

  若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

  若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

多進程適用於計算密集型任務,能夠開啓多進程來充分利用多核優點,同時併發處理任務。
多線程適用於IO密集型任務,線程之間的開銷小,切換速度快,處理速度提高,此時,多核的優點不能被利用。
結論: 如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。
#計算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    # for i in range(300): #串行
    #     work()

    for i in range(300):
        t=Thread(target=work) #在個人機器上,4核cpu,多線程大概15秒
        # t=Process(target=work) #在個人機器上,4核cpu,多進程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()

    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

    print('主線程')
計算密集型
#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模擬I/O操做,能夠打開一個文件來測試I/O,與sleep是一個效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(1000):
        t=Thread(target=work) #耗時大概爲2秒
        # t=Process(target=work) #耗時大概爲25秒,建立進程的開銷遠高於線程,並且對於I/O密集型,多cpu根本無論用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
I/O密集型 

應用:

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

 

 2.3 同步鎖

使用方式與進程鎖同樣
import time
import threading

num = 100  #設定一個共享變量
# R=threading.Lock()

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    #num-=1
    # R.acquire()
    temp=num
    time.sleep(0.1)
    num =temp-1  # 對此公共變量進行-1操做
    # R.release()

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('Result: ', num)

 鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖,以下所示:

import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操做
'''
R.release()

GIL VS Lock

Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 

達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

得出結論:保護不一樣的數據就應該加不一樣的鎖。

最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。GIL是解釋器級別的鎖,LOCK是應用程序級別(用戶級別)的鎖。

詳解:

由於Python解釋器會自動按期進行內存回收,能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這能夠說是Python早期版本的遺留問題。 

 

2.4 死鎖與遞歸鎖

死鎖:  是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖。
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
而後就卡住,死鎖了
'''
死鎖示例

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

 

2.5 信號量Semahpore

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5)
 
import threading
import time

semaphore = threading.Semaphore(5)  # 設置爲5,表示同一時刻能夠經過5個線程進行操做

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從始至終同一時刻只有四個進程存在,不會產生新的,而信號量是產生一堆線程/進程,同一時刻能夠經過5個線程/進程進行數據操做。

 

2.6 事件Event

      線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。(能夠結合實際生活中的紅綠燈進行理解)

event.isSet():  #返回event的狀態值;
event.wait():   #若是 event.isSet()==False將阻塞線程;
event.set():    #設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():  #恢復event的狀態值爲False。
 能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

不瞭解redis能夠參考mysql的例子(同樣的道理)
redis示例
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    print('\033[41m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
    event.wait()
    print('\033[41mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())


def check_mysql():
    print('\033[43m正在檢查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待鏈接mysql
    t2=Thread(target=conn_mysql) #等待鏈接myqsl
    t3=Thread(target=check_mysql) #檢查mysql

    t1.start()
    t2.start()
    t3.start()
'''
執行結果:
Thread-1 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
正在檢查mysql。。。
Mysql初始化成功,Thread-1開始鏈接。。。
Mysql初始化成功,Thread-2開始鏈接。。。
'''
mysql示例
threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:
def conn_mysql():
    count=0
    while not e.is_set():
        print('%s 第 <%s> 次嘗試' %(threading.current_thread().getName(),count))
        count+=1
        e.wait(0.5)
    print('%s ready to conn mysql' %threading.current_thread().getName())
    time.sleep(1)
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    while not event.is_set():
        print('\033[42m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
        event.wait(0.1)
    print('\033[42mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())

def check_mysql():
    print('\033[41m正在檢查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    t3=Thread(target=check_mysql)

    t1.start()
    t2.start()
    t3.start()

修訂上述mysql版本
完善mysql示例

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

應用: 鏈接池

 

2.7 條件Condition(瞭解)

使得線程等待,只有知足某條件時,才釋放n個線程

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
示例

 

2.8 定時器Timer

定時器,指定n秒後執行某操做

from threading import Timer

def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

2.9 線程queue

queue隊列 :使用import queue,用法與進程Queue同樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

1. class queue.Queue(maxsize=0) #先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

2. class queue.LifoQueue(maxsize=0) #last in fisrt out 

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

3. class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

2.10 Python標準模塊--concurrent.futures

concurrent.futures模塊是在Python3.2中添加的。根據Python的官方文檔,concurrent.futures模塊提供給開發者一個執行異步調用的高級接口。concurrent.futures基本上就是在Python的threading和multiprocessing模塊之上構建的抽象層,更易於使用。儘管這個抽象層簡化了這些模塊的使用,可是也下降了不少靈活性,因此若是你須要處理一些定製化的任務,concurrent.futures或許並不適合你。

concurrent.futures包括抽象類Executor,它並不能直接被使用,因此你須要使用它的兩個子類:ThreadPoolExecutor或者ProcessPoolExecutor。正如你所猜的,這兩個子類分別對應着Python的threading和multiprocessing接口。這兩個子類都提供了池,你能夠將線程或者進程放入其中。

https://docs.python.org/dev/library/concurrent.futures.html

 

三. 協程

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

須要強調的是:

  1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)
  2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:

  1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
  2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行

(詳細的:協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)

爲此,咱們以前已經學習過一種在單線程下能夠保存程序運行狀態的方法,即yield,咱們來簡單複習一下:

  1. yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
  2. send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換 
#不用yield:每次函數調用,都須要重複開闢內存空間,即重複建立名稱空間,於是開銷很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    pass

def producer(target,seq):
    for item in seq:
        target(item)    # 每次調用函數,會臨時產生名稱空間,調用結束則釋放,循環100000000次,則重複這麼屢次的建立和釋放,開銷很是大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time))     # 13.474999904632568


#使用yield:無需重複開闢內存空間,即重複建立名稱空間,於是開銷小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

@init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #無需從新建立名稱空間,從上一次暫停的位置繼續,相比上例,開銷小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time))     # 10.674000024795532
示例說明

協程的定義(知足1,2,3就可稱爲協程):

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制)

缺點:

協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程。

協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程。

 

四. 協程模塊greenlet

 greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator。

from greenlet import greenlet

def test1():
    print('test1,first')
    gr2.switch()
    print('test1,sencod')
    gr2.switch()
def test2():
    print('test2,first')
    gr1.switch()
    print('test2,sencod')


gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch()
'''
執行結果:
test1,first
test2,first
test1,sencod
test2,sencod
'''
from greenlet import greenlet
def eat(name):
    print('%s eat fruit apple' %name)
    gr2.switch('shuke')
    print('%s eat fruit banana' %name)
    gr2.switch()
def play_phone(name):
    print('%s play basketbal' %name)
    gr1.switch()
    print('%s play football' %name)

gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='jack')  #能夠在第一次switch時傳入參數,之後都不須要

'''
執行結果:
jack eat fruit apple
shuke play basketbal
jack eat fruit banana
shuke play football
'''
switch傳參

單純的切換(在沒有io的狀況下或者沒有重複開闢內存空間的操做),反而會下降程序的執行速度。

#順序執行
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i

def f2():
    res=0
    for i in range(10000000):
        res*=i


start_time=time.time()
f1()
f2()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #1.7395639419555664


#切換
from greenlet import greenlet
import time
def f1():
    res=0
    for i in range(10000000):
        res+=i
        gr2.switch()


def f2():
    res=0
    for i in range(10000000):
        res*=i
        gr1.switch()

gr1=greenlet(f1)
gr2=greenlet(f2)

start_time=time.time()
gr1.switch()
stop_time=time.time()
print('run time is: %s' %(stop_time-start_time)) #7.789067983627319
示例

greenlet只是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題。

 

五. gevent模塊(單線程併發)

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

g1=gevent.spawn()建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的。

遇到IO阻塞時會自動切換任務

import gevent

def eat():
    print('eat food 1')
    gevent.sleep(2)     # 等飯來
    print('eat food 2')

def play_phone():
    print('play phone 1')
    gevent.sleep(1)     # 網卡了
    print('play phone 2')

# gevent.spawn(eat)
# gevent.spawn(play_phone)
# print('主')  # 直接結束

# 於是也須要join方法,進程或線程的jion方法只能join一個,而gevent的join方法能夠join多個
g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')

'''
執行結果:
eat food 1
play phone 1
play phone 2
eat food 2
主
'''

注: 上例中gevent.sleep(2)模擬的是gevent能夠識別的io阻塞,而time.sleep(2)或其餘的阻塞,gevent是不能直接識別的,此時就須要進行打補丁,將阻塞設置爲gevent能夠識別的IO阻塞。

一般的寫法爲,在文件的開頭,以下

from gevent import monkey;monkey.patch_all()
import gevent
import time
from gevent import monkey;monkey.patch_all()

import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play_phone():
    print('play phone 1')
    time.sleep(1)
    print('play phone 2')



g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
示例

同步與異步

概念:

同步和異步的概念對於不少人來講是一個模糊的概念,是一種彷佛只能意會不能言傳的東西。其實咱們的生活中存在着不少同步異步的例子。好比:你叫我去吃飯,我聽到了就馬上和你去吃飯,若是咱們有聽到,你就會一直叫我,直到我聽見和你一塊兒去吃飯,這個過程叫同步;異步過程指你叫我去吃飯,而後你就去吃飯了,而無論我是否和你一塊兒去吃飯。而我獲得消息後可能當即就走,也可能過段時間再走。若是我請你吃飯,就是同步,若是你請我吃飯就用異步,這樣你比較省錢。哈哈哈。。。

import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()       # 同步

print('Asynchronous:')
asynchronous()      # 異步

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此列表被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

#gevent線程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合做一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

協程應用:爬蟲
協程應用(爬蟲)

經過gevent實現單線程下的socket併發(from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞)

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#若是不想用money.patch_all()打補丁,能夠用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
服務端
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客戶端
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM)
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
多線程併發多個客戶端

 

六. 綜合應用

簡單主機批量管理工具

需求:

  1. 主機分組
  2. 主機信息配置文件用configparser解析
  3. 可批量執行命令、發送文件,結果實時返回,執行格式以下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主機用戶名密碼、端口能夠不一樣
  5. 執行遠程命令使用paramiko模塊
  6. 批量命令需使用multiprocessing併發
code: https://github.com/shuke163/learnpy/tree/master/homework/day09/managetool 
 
 
 
 
 
 
 
對比學習參考:
2.   http://www.cnblogs.com/linhaifeng/articles/6817679.html