週末班:Python基礎之併發編程

進程

相關概念

進程

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;在當代面向線程設計的計算機結構中,進程是線程的容器。程序是指令、數據及其組織形式的描述,進程是程序的實體。html

狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。python

廣義定義:進程是一個具備必定獨立功能的程序關於某個數據集合的一次運行活動。它是操做系統動態執行的基本單元,在傳統的操做系統中,進程既是基本的分配單元,也是基本的執行單元。linux

同步/異步

所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。git

所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列。程序員

阻塞/非阻塞

阻塞和非阻塞這兩個概念與程序等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的。github

併發/並行

並行 : 並行是指多個任務同時執行,好比兩個男人同時在給本身女友發微信。算法

併發 : 併發是多個任務交替輪流使用資源,好比一個男人在給他7個女友發微信,只要他發的夠快,宏觀上來講他在同時聊7我的。編程

進程狀態與調度

在瞭解其餘概念以前,咱們首先要了解進程的幾個狀態。在程序運行的過程當中,因爲被操做系統的調度算法控制,程序會進入幾個狀態:就緒,運行和阻塞。json

(1)就緒(Ready)狀態小程序

當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。

(2)執行/運行(Running)狀態當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。

(3)阻塞(Blocked)狀態正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)等。

Python中使用多進程

以前咱們已經瞭解了不少進程相關的理論知識,瞭解進程是什麼應該再也不困難了,剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。

multiprocessing模塊

仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。

multiprocessing.Process介紹

process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:
1 group參數未使用,值始終爲None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name爲子進程的名稱
1 p.start():啓動進程,並調用該子進程中的p.run()
2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法
3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
4 p.is_alive():若是p仍然運行,返回True
5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程 
方法介紹
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2 p.name:進程的名稱
3 p.pid:進程的pid
4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
屬性介紹
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候  ,就不會遞歸運行了。
在windows中使用process模塊的注意事項

使用process模塊建立進程

在一個python進程中開啓子進程,start方法和併發效果。

from multiprocessing import Process
import time


def task(name):
    print('{} is running!'.format(name))
    time.sleep(3)
    print('{} is done!'.format(name))


# Windows開子進程要寫在__name__==__main__下
# 由於開子進程會從新加載父進程的內容
if __name__ == '__main__':
    # 建立一個Python中的進程對象
    p = Process(target=task, args=('t1', ))
    # p = Process(target=task, kwargs={'name': 't1'})
    p.start()  # 調用操做系統接口啓動一個進程執行命令
    print('--- 主進程 ----')

查看主進程id和父進程id:

from multiprocessing import Process
import time
import os


def task(name):
    print('{} is running!'.format(name))
    print('子進程id :', os.getpid(), '父進程id :', os.getppid())
    time.sleep(3)
    print('{} is done!'.format(name))


# Windows開子進程要寫在__name__==__main__下
# 由於開子進程會從新加載父進程的內容
if __name__ == '__main__':
    print('主進程id :', os.getpid())
    # 建立一個Python中的進程對象
    p = Process(target=task, args=('t1', ))
    # p = Process(target=task, kwargs={'name': 't1'})
    p.start()  # 調用操做系統接口啓動一個進程執行命令
    print('--- 主進程 ----')
查看進程id

咱們再啓動多個進程看一下,運行的效果:(注意,子進程的執行順序不是根據啓動順序決定的)

from multiprocessing import Process
import time


def task(name):
    print('{} is running!'.format(name))
    time.sleep(3)
    print('{} is done!'.format(name))


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task, args=(i, ))
        p.start()
    print('--- 主進程 ----')

進程間的數據隔離

進程的運行時數據是互相獨立的,不會相互影響。

想個辦法,驗證一下:

from multiprocessing import Process
import time

x = 100


def change():
    global x
    x = 10
    print('子進程修改了x,子進程結束了!')


if __name__ == '__main__':
    p = Process(target=change)
    p.start()
    time.sleep(5)  # 想辦法等子進程結束 此處應該有思考
    print(x)

上面的例子中能夠看到,父進程中的X變量並無被子進程中的代碼修改掉。

join

接下來咱們看一下上面代碼中的問題,也就是咱們應該如何優雅的實現父進程等待子進程結束呢?

這裏就用到了Python中進程對象的join()。

from multiprocessing import Process

x = 100


def change():
    global x
    x = 10
    print('子進程修改了x,子進程結束了!')


if __name__ == '__main__':
    p = Process(target=change)
    p.start()
    p.join()  # 優雅地實現主進程等待子進程結束
    print(x)

再來幾個多線程的例子,驗證一下:

from multiprocessing import Process
import time


def task(n):
    print('這是子進程:{}'.format(n))
    time.sleep(n)
    print('子進程:{}結束了!'.format(n))


if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))
    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
    print('我是主進程')
    print('共耗時:{}'.format(time.time()-start_time))

或者寫成下面這樣的簡寫方式:

from multiprocessing import Process
import time


def task(n):
    print('這是子進程:{}'.format(n))
    time.sleep(n)
    print('子進程:{}結束了!'.format(n))


if __name__ == '__main__':
    start_time = time.time()
    # p1 = Process(target=task, args=(1,))
    # p2 = Process(target=task, args=(2,))
    # p3 = Process(target=task, args=(3,))
    # p1.start()
    # p2.start()
    # p3.start()
    #
    # p1.join()
    # p2.join()
    # p3.join()
    # 或者簡寫成下面的方式
    p_list = []
    for i in range(1, 4):
        p = Process(target=task, args=(i,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print('我是主進程')
    print('共耗時:{}'.format(time.time()-start_time))
for循環起進程

思考一下:

下面的代碼有何不可?

from multiprocessing import Process
import time


def task(n):
    print('這是子進程:{}'.format(n))
    time.sleep(n)
    print('子進程:{}結束了!'.format(n))


if __name__ == '__main__':
    start_time = time.time()
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print('我是主進程')
    print('共耗時:{}'.format(time.time() - start_time))
這樣寫的問題是?

除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會自動調用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主線程')
經過繼承Process類開啓進程

守護進程

父進程中將一個子進程設置爲守護進程,那麼這個子進程會隨着主進程的結束而結束。

主進程建立守護進程

其一:守護進程會在主進程代碼執行結束後就終止

其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

import os
import time
from multiprocessing import Process

class Myprocess(Process):
    def __init__(self,person):
        super().__init__()
        self.person = person
    def run(self):
        print(os.getpid(),self.name)
        print('%s正在和女主播聊天' %self.person)


p=Myprocess('哪吒')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start()
time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
print('')
守護進程的啓動
from multiprocessing import Process

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


p1=Process(target=foo)
p2=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
time.sleep(0.1)
print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.
主進程代碼執行結束守護進程當即結束

socket聊天併發實例

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__': #windows下start進程必定要寫到這下面
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
使用多進程實現socket聊天併發-server
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client端

多進程中的其餘方法

from multiprocessing import Process
import time
import random

class Myprocess(Process):
    def __init__(self,person):
        self.name=person
        super().__init__()

    def run(self):
        print('%s正在和網紅臉聊天' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s還在和網紅臉聊天' %self.name)


p1=Myprocess('哪吒')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False
進程對象的其餘方法:terminate,is_alive
 1 class Myprocess(Process):
 2     def __init__(self,person):
 3         self.name=person   # name屬性是Process中的屬性,標示進程的名字
 4         super().__init__() # 執行父類的初始化方法會覆蓋name屬性
 5         #self.name = person # 在這裏設置就能夠修改進程名字了
 6         #self.person = person #若是不想覆蓋進程名,就修改屬性名稱就能夠了
 7     def run(self):
 8         print('%s正在和網紅臉聊天' %self.name)
 9         # print('%s正在和網紅臉聊天' %self.person)
10         time.sleep(random.randrange(1,5))
11         print('%s正在和網紅臉聊天' %self.name)
12         # print('%s正在和網紅臉聊天' %self.person)
13
14
15 p1=Myprocess('哪吒')
16 p1.start()
17 print(p1.pid)    #能夠查看子進程的進程id
進程對象的其餘屬性:pid和name

互斥鎖

經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。

當多個進程使用同一份數據資源的時候,就會由於競爭而引起數據安全或順序混亂問題。

互斥鎖介紹

下面的代碼演示了不一樣的任務爭搶一個資源(終端輸出)的場景。

from multiprocessing import Process
import time
import random


def task1():
    print('這是 task1 任務'.center(30, '-'))
    print('task1 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task1 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task1 走出了洗手間')


def task2():
    print('這是 task2 任務'.center(30, '-'))
    print('task2 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task2 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task2 走出了洗手間')


def task3():
    print('這是 task3 任務'.center(30, '-'))
    print('task3 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task3 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task3 走出了洗手間')


if __name__ == '__main__':
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p3 = Process(target=task3)

    p1.start()
    p2.start()
    p3.start()
進程之間競爭資源

經過加鎖來控制。

from multiprocessing import Process, Lock
import time
import random

# 生成一個互斥鎖
mutex_lock = Lock()


def task1(lock):
    # 鎖門
    lock.acquire()
    print('這是 task1 任務'.center(30, '-'))
    print('task1 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task1 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task1 走出了洗手間')
    # 釋放鎖
    lock.release()


def task2(lock):
    # 鎖門
    lock.acquire()
    print('這是 task2 任務'.center(30, '-'))
    print('task2 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task2 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task2 走出了洗手間')
    # 釋放鎖
    lock.release()


def task3(lock):
    # 鎖門
    lock.acquire()
    print('這是 task3 任務'.center(30, '-'))
    print('task3 進了洗手間')
    time.sleep(random.randint(1, 3))
    print('task3 辦事呢...')
    time.sleep(random.randint(1, 3))
    print('task3 走出了洗手間')
    # 釋放鎖
    lock.release()


if __name__ == '__main__':
    p1 = Process(target=task1, args=(mutex_lock, ))
    p2 = Process(target=task2, args=(mutex_lock, ))
    p3 = Process(target=task3, args=(mutex_lock, ))

    # 釋放新建進程的信號,具體誰先啓動沒法肯定
    p1.start()
    p2.start()
    p3.start()
使用互斥鎖解決競爭

上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。

互斥鎖示例

接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。

from multiprocessing import Process, Lock
import json
import time
import random
import os


def search():
    time.sleep(0.5)
    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
        print('剩餘票數:{}'.format(data.get('count')))


def buy():

    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    if data.get('count', 0) > 0:
        data['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf8') as f2:
            json.dump(data, f2)
        print('{}購票成功!'.format(os.getpid()))
    else:
        print('購票失敗')


def task():
    search()  # 查票併發
    buy()  # 串行買票


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task)
        p.start()
多進程同時搶票

使用互斥鎖,保證數據安全。

from multiprocessing import Process, Lock
import json
import time
import random
import os

# 設置互斥鎖
mutex_lock = Lock()


def search():
    time.sleep(0.5)
    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
        print('剩餘票數:{}'.format(data.get('count')))


def buy():

    with open('db.json', 'r', encoding='utf8') as f:
        data = json.load(f)
    if data.get('count', 0) > 0:
        data['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.json', 'w', encoding='utf8') as f2:
            json.dump(data, f2)
        print('{}購票成功!'.format(os.getpid()))
    else:
        print('購票失敗')


def task(lock):
    search()  # 查票併發
    lock.acquire()
    buy()  # 串行買票
    lock.release()


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task, args=(mutex_lock, ))
        p.start()
互斥鎖版搶票  

一點思考

加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,在犧牲速度的前提下保證數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1. 效率低(共享數據基於文件,而文件是硬盤上的數據)
2. 須要本身加鎖處理

所以咱們最好找尋一種解決方案可以兼顧:
1. 效率高(多個進程共享一塊內存的數據)
2. 幫咱們處理好鎖問題。
mutiprocessing模塊中爲咱們提供了一個IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中,隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可擴展性。

隊列

Queue介紹

咱們能夠建立一個共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( )
同q.get(False)方法。

q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
方法介紹
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread()
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread()
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
其餘方法(瞭解)

隊列簡單示例

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了
單看隊列用法

上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。

import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'hi', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。

if __name__ == '__main__':
    q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,)) #建立一個進程
    p.start()
    print(q.get())
    p.join()
子進程發送數據給父進程

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。 接下來看一個稍微複雜一些的例子:

import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []   # store input processes
    record2 = []   # store output processes
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()
批量生產數據放入隊列再批量獲取結果 x

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產數據和消費數據的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

生產者就是生產數據的一方,消費者就是消費數據的一方。一般生產者和消費者的能力很難協調,例如:若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')
基於隊列實現生產者消費者模型

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')
改良版——生產者消費者模型

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信號
    print('')
主進程在生產者生產完畢後發送結束信號None

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾回結束信號None
    q.put(None) #發送結束信號
    print('')
多個消費者的例子:有幾個消費者就須要發送幾回結束信號

線程

線程介紹

有了進程爲何要有線程

進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上:

  • 進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。

  • 進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。

若是這兩個缺點理解比較困難的話,舉個現實的例子也許你就清楚了:若是把咱們上課的過程當作一個進程的話,那麼咱們要作的是耳朵聽老師講課,手上還要記筆記,腦子還要思考問題,這樣才能高效的完成聽課的任務。而若是隻提供進程這個機制的話,上面這三件事將不能同時執行,同一時間只能作一件事,聽的時候就不能記筆記,也不能用腦子思考,這是其一;若是老師在黑板上寫演算過程,咱們開始記筆記,而老師忽然有一步推不下去了,阻塞住了,他在那邊思考着,而咱們呢,也不能幹其餘事,即便你想趁此時思考一下剛纔沒聽懂的一個問題都不行,這是其二。

如今你應該明白了進程的缺陷了,而解決的辦法很簡單,咱們徹底可讓聽、寫、思三個獨立的過程,並行起來,這樣很明顯能夠提升聽課的效率。而實際的操做系統中,也一樣引入了這種相似的機制——線程。

線程的出現

60年代,在OS中能擁有資源和獨立運行的基本單位是進程,然而隨着計算機技術的發展,進程出現了不少弊端,一是因爲進程是資源擁有者,建立、撤消與切換存在較大的時空開銷,所以須要引入 輕型進程 ;二是因爲對稱多處理機(SMP)出現, 能夠知足多個運行單位 ,而多個進程並行開銷過大。
所以在80年代,出現了 能獨立運行的基本單位 ——線程(Threads)
注意:進程是資源分配的最小單位,線程是CPU調度的最小單位.
每個進程中至少有一個線程。

進程和線程的關係

線程與進程的區別 能夠概括爲如下4點:
1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
2)通訊: 進程間通訊 IPC ,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊——須要 進程同步 和互斥手段的輔助,以保證數據的一致性。
3)調度和切換:線程上下文切換比進程上下文切換要快得多。
4)在多線程操做系統中,進程不是一個可執行的實體。

線程的特色

在多線程的操做系統中,一般是在一個進程中包括多個線程,每一個線程都是做爲利用CPU的基本單位,是花費最小開銷的實體。線程具備如下屬性。
1)輕型實體
線程中的實體基本上不擁有系統資源,只是有一點必不可少的、能保證獨立運行的資源。
線程的實體包括程序、數據和TCB。線程是動態概念,它的動態特性由線程控制塊TCB(Thread Control Block)描述。
TCB包括如下信息:
(1)線程狀態。
(2)當線程不運行時,被保存的現場資源。
(3)一組執行堆棧。
(4)存放每一個線程的局部變量主存區。
(5)訪問同一個進程中的主存和其它資源。
用於指示被執行指令序列的程序計數器、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧。
TCB包括如下信息
2)獨立調度和分派的基本單位。
在多線程OS中,線程是能獨立運行的基本單位,於是也是獨立調度和分派的基本單位。因爲線程很「輕」,故線程的切換很是迅速且開銷小(在同一進程中的)。
3)共享進程資源。
線程在同一進程中的各個線程,均可以共享該進程所擁有的資源,這首先表如今:全部線程都具備相同的進程id,這意味着,線程能夠訪問該進程的每個內存資源;此外,還能夠訪問進程所擁有的已打開文件、定時器、信號量機構等。因爲同一個進程內的線程共享內存和文件,因此線程之間互相通訊沒必要調用內核。
4 )可併發執行。
在一個進程中的多個線程之間,能夠併發執行,甚至容許在一個進程中全部線程都能併發執行;一樣,不一樣進程中的線程也能併發執行,充分利用和發揮了處理機與外圍設備並行工做的能力。

內存中的線程

 

多個線程共享同一個進程的地址空間中的資源,是對一臺計算機上多個進程的模擬,有時也稱線程爲輕量級的進程。

而對一臺計算機上多個進程,則共享物理內存、磁盤、打印機等其餘物理資源。多線程的運行也多進程的運行相似,是cpu在多個線程之間的快速切換。

不一樣的進程之間是充滿敵意的,彼此是搶佔、競爭cpu的關係,若是迅雷會和QQ搶資源。而同一個進程是由一個程序員的程序建立,因此同一進程內的線程是合做關係,一個線程能夠訪問另一個線程的內存地址,你們都是共享的,一個線程乾死了另一個線程的內存,那純屬程序員腦子有問題。

相似於進程,每一個線程也有本身的堆棧,不一樣於進程,線程庫沒法利用時鐘中斷強制線程讓出CPU,能夠調用thread_yield運行線程自動放棄cpu,讓另一個線程運行。

線程一般是有益的,可是帶來了不小程序設計難度,線程的問題是:

1. 父進程有多個線程,那麼開啓的子線程是否須要一樣多的線程

2. 在同一個進程中,若是一個線程關閉了文件,而另一個線程正準備往該文件內寫內容呢?

所以,在多線程的代碼中,須要更多的心思來設計程序的邏輯、保護程序的數據。

在python中使用線程

threading模塊

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹( 官方連接

線程的建立

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')
建立線程的方式1
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')
建立線程的方式2

多線程與多進程

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主線程/主進程pid',os.getpid())

    #part2:開多個進程,每一個進程都有不一樣的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主線程/主進程pid',os.getpid())
pid的比較
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主進程下開啓線程
    t=Thread(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    hello
    主線程/主進程
    '''

    #在主進程下開啓子進程
    t=Process(target=work)
    t.start()
    print('主線程/主進程')
    '''
    打印結果:
    主線程/主進程
    hello
    '''
開啓效率的較量
from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據
同一進程內的線程共享該進程的數據?
內存數據的共享問題

練習 :多線程實現socket

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()
server
#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)
client

Thread類的其餘方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啓線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')

    '''
    打印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主線程/主進程
    Thread-1
    '''
代碼示例
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
join方法

守護線程

不管是進程仍是線程,都遵循:守護xx會等待主xx運行完畢後被銷燬。 須要強調的是:運行完畢並不是終止運行

#1.對主進程來講,運行完畢指的是主進程代碼運行完畢
#2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,
#2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
詳細解釋
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()以前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''
守護線程例1
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")
守護線程例2

同步鎖

from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能爲99
多個線程搶佔資源的狀況
import threading
R=threading.Lock()
R.acquire()
'''
對公共數據的操做
'''
R.release()
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
同步鎖的引用
#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼併發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是
#start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
'''
互斥鎖與join的區別

死鎖與可重入鎖

進程也有死鎖與可重入鎖,使用方法都是同樣的,因此放到這裏一塊兒說:

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖。

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
死鎖

解決方法:使用可重入鎖,在Python中爲了支持在同一線程中屢次請求同一資源,提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

from threading import RLock
import time
mutexA=RLock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
可重入鎖示例

典型問題:科學家吃麪

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
死鎖問題
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
可重入鎖解決死鎖問題

全局解釋器鎖GIL

GIL介紹

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。

Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL。

簡單來講,在Cpython解釋器中,由於有GIL鎖的存在同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點。

常見問題1

咱們有了GIL鎖爲何還要本身在代碼中加鎖呢?

GIL保護的是Python解釋器級別的數據資源,本身代碼中的數據資源就須要本身加鎖防止競爭。以下圖:

常見問題2

有了GIL的存在,同一時刻同一進程中只有一個線程被執行,進程能夠利用多核,可是開銷大,而Python的多線程開銷小,但卻沒法利用多核優點,也就是說Python這語言難堪大用。

其實編程所解決的現實問題大體分爲IO密集型和計算密集型。

對於IO密集型的場景,Python的多線程編程徹底OK,而對於計算密集型的場景,Python中有不少成熟的模塊或框架如Pandas等可以提升計算效率。

 

推薦閱讀:理解GIL

池 —— concurrent.futures

Python標準模塊--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口,其中:

ThreadPoolExecutor:線程池

ProcessPoolExecutor: 進程池

藉助上面兩個類,咱們能夠很方便地建立進程池對象和線程池對象。

p_pool = ProcessPoolExecutor(max_workers=5)  # 建立一個最多5個woker的進程池
t_pool = ThreadPoolExecutor(max_workers=5)  # 建立一個最多5個woker的線程池

可用方法介紹:

# 基本方法
#submit(fn, *args, **kwargs)
提交任務

# map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做

# shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前

# result(timeout=None)
取得結果

# add_done_callback(fn)
回調函數
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
ProcessPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
與ProcessPoolExecutor相同
ThreadPoolExecutor

使用線程池改進網絡編程的例子:

import socket
from concurrent.futures import ThreadPoolExecutor

t_pool = ThreadPoolExecutor(max_workers=3)

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)


def comm(conn):
    while 1:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def serve():
    while 1:
        conn, addr = server.accept()
        t_pool.submit(comm, conn)
    server.close()


if __name__ == '__main__':
    serve()
server端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))

while 1:
    msg = input('>>>').strip()
    if not msg: continue
    client.send(msg.encode('utf8'))
    data = client.recv(1024)
    print(data.decode('utf8'))

client.close()
client端

其餘方法:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
回調函數
相關文章
相關標籤/搜索