併發編程之多線程

1、threading模塊介紹

  multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹html

  官網連接:點擊進入python

2、開啓線程的兩種方式

  multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性正則表達式

import time, random
# from multiprocessing import Process
from threading import Thread


def piao(name):
    print('%s piaoing' % name)
    time.sleep(random.randrange(1, 5))
    print('%s piao end' % name)

if __name__ == '__main__':
    t1 = Thread(target=piao, args=('egon', ))
    t1.start()  # 主線程向操做系統發信號,又開了一個線程
    print("主線程")   # 執行角度看是主線程,從資源角度看是主進程
# 這個程序整體是一個進程、兩個線程
"""
egon piaoing
主進程
egon piao end
"""
開啓線程方式一
import time, random
# from multiprocessing import Process
from threading import Thread

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("%s piaoing" % self.name)
        time.sleep(random.randrange(1, 5))
        print("%s piao end" % self.name)

if __name__ == '__main__':
    t1 = MyThread('egon')
    t1.start()  # 主線程向操做系統發信號,又開了一個線程
    print("主線程")
"""
egon piaoing
主線程
egon piao end
"""
方式二:定製線程

3、在一個進程下開啓線程與在一個進程下開啓多個子進程的區別

import time
from multiprocessing import Process
from threading import Thread


def piao(name):
    print('%s piaoing' % name)
    time.sleep(2)
    print('%s piao end' % name)

if __name__ == '__main__':
    # p1 = Process(target=piao, args=('進程', ))
    # p1.start()
    """
    主線程
    進程 piaoing
    進程 piao end
    """


    t1 = Thread(target=piao, args=('線程', ))
    t1.start()
    """
    線程 piaoing
    主線程
    線程 piao end
    """
    print("主線程")
# 對比可知,線程開銷遠小於進程,由於進程須要申請內存空間。
一、進程開銷遠大於線程
from threading import Thread
from multiprocessing import Process

n = 100
def task():
    global n
    n = 0

if __name__ == '__main__':
    """進程驗證:
    p1 = Process(target=task,)
    p1.start()   # 會把子進程的n改成了0,看是否影響主進程
    p1.join()
    print("主進程", n)   # 主進程 100
    # 因而可知進程間是隔離的,子進程變量修改不影響主進程
    """

    """線程驗證"""
    t1 = Thread(target=task, )
    t1.start()
    t1.join()
    print("主線程", n)   # 主線程 0
二、同一進程內的多個線程共享進程地址空間
from threading import Thread
from multiprocessing import Process, current_process  # current_process查看進程ID號
import os   # os.getpid()也能夠查看進程ID

n = 100
def task():
    # print(current_process().pid)
    print('子進程PID:%s   父進程的PID:%s' % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    p1 = Process(target=task,)
    p1.start()

    # print("主線程", current_process().pid)
    print("主線程", os.getpid())
"""
主線程 6455
子進程PID:6456   父進程的PID:6455
"""
三、pid觀察
from threading import Thread
import os   # os.getpid()也能夠查看進程ID

n = 100
def task():
    # print(current_process().pid)
    print('線程的進程 PID:%s' % os.getpid())

if __name__ == '__main__':
    t1 = Thread(target=task,)
    t1.start()

    # print("主線程", current_process().pid)
    print("主線程", os.getpid())
"""說明兩個線程是同一個進程:
線程的進程 PID:6493
主線程 6493
"""
四、研究線程:線程都屬於同一個進程

4、練習

一、基於多線程實現併發的套接字通訊緩存

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

from socket import *
from threading import Thread

# 通信和創建連接分開,啓動不一樣的線程,你們是併發執行。
def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()   # 建連接
        t = Thread(target=communicate, args=(conn,))  # 建一個連接創一個線程
        t.start()
        # communicate(conn)

    server.close()


if __name__ == '__main__':
    server('127.0.0.1', 8091)   # 主線程

"""
這種解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。
"""
多線程併發服務端
# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# 使用時,能夠一個程序運行屢次,這是多個不一樣的in
from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(("127.0.0.1", 8091))

while True:
    msg = input(">>").strip()
    if not msg:continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))

client.close()
客戶端

二、編寫一個簡單的文本處理工具,具有三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件安全

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
多線程併發執行多項任務

5、線程對象的屬性和方法

一、Thread實例對象的方法

isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。

二、threading模塊提供的一些方法

threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

三、屬性和方法的應用與驗證

from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

# 須要注意的是線程沒有子線程的概念,線程都是屬於進程的
def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    getName()方法返回線程名
    t = Thread(target=task, name='子線程1')
    t.start()
    print("主進程", currentThread().getName())
    """
    子線程1 is running
    主進程 MainThread
    子線程1 is done
    """
getName方法獲得線程名
from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    setName()方法設置線程名
    t = Thread(target=task, name='子線程1')
    t.start()
    t.setName('兒子線程1')   # 修改進程名稱
    currentThread().setName("主線程")   # 設置主線程名稱(默認是MainThread)
    print(t.isAlive())    # 判斷線程是否存活
    print("主進程", currentThread().getName())
    """
    子線程1 is running
    True
    主進程 主線程
    兒子線程1 is done
    """
setName方法和isAlive方法
from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    t = Thread(target=task, name='子線程1')
    t.start()
    t.setName('兒子線程1')  # 修改進程名稱
    t.join()  # 主線程等子進程運行完畢再執行
    currentThread().setName("主線程")  # 設置主線程名稱(默認是MainThread)
    print(t.isAlive())  # 判斷線程是否存活
    print("主進程", currentThread().getName())
    """
    子線程1 is running
    兒子線程1 is done
    False
    主進程 主線程
    """
join方法主線程等子線程運行完執行
from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    # 測試threading.active_count方法
    t = Thread(target=task, name='子線程1')
    t.start()
    print(active_count())
    """
    子線程1 is running
    2
    子線程1 is done
    """
threading.active_count方法
from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    # 對上面改寫添加一個join()
    t = Thread(target=task, name='子線程1')
    t.start()
    t.join()   # 運行完才執行主線程,所以後面打印的活躍線程數是一個
    print(active_count())
    """
    子線程1 is running
    子線程1 is done
    1
    """
對線程添加join方法,執行active_count
from threading import Thread, currentThread   # 獲得線程對象的方法
from threading import active_count    # 獲得活躍進程數
from threading import enumerate   # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 對象下有一個getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == '__main__':
    # threading.enumerate()方法:返回一個包含正在運行的線程的list
    t = Thread(target=task, name='子線程1')
    t.start()
    print(enumerate())
    """
    子線程1 is running
    [<_MainThread(MainThread, started 4320744256)>, <Thread(子線程1, started 123145383735296)>]
    子線程1 is done
    """
threading.enumerate()方法:返回一個包含正在運行的線程的list

6、守護線程

  一個進程內,若是不開線程,默認就是一個主線程,主線程代碼運行完畢,進程被銷燬。服務器

  一個進程內,開多個線程的狀況下,主線程在代碼運行完畢後,還要等其餘線程工做完才死掉,進程銷燬。網絡

  守護線程守護主線程,等到主線程死了纔會被銷燬。在有其餘線程的狀況下,主線程代碼運行完後,等其餘非守護線程結束,守護線程纔會死掉。多線程

  不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬。併發

  須要強調的是:運行完畢並不是終止運行。運行完畢的真正含義:app

  一、對主進程來講,運行完畢指的是主進程代碼運行完畢。

  二、對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程才能運行完畢。

  詳細解釋

  一、主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束

  二、主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print("%s say hello" % name)

if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))

    # 守護線程必須在t.start()前設置
    # 守護線程設置方式一:
    t.daemon=True
    # 守護線程設置方式二:
    # t.setDaemon(True)

    t.start()   # 立馬建立子線程,但須要等待兩秒,所以程序會先執行下面的代碼

    print("主線程")
    print(t.is_alive())
# 這一行代碼執行完後,主線程執行完畢,因爲主線程以外,只有一個守護線程,主線程不須要等守護線程執行結束,所以主線程和守護進程終止,進程結束。
"""
主線程
True
"""

  練習:思考下述代碼的執行結果有多是哪些狀況?爲何?

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True   # t1是守護線程
    t1.start()
    t2.start()
    print("main-------")   # 主線程結束後,會等待非守護線程結束
# 因爲非守護線程須要等待的時間比守護線程長,所以線程都會獲得執行
"""
123
456
main------
end123
end456
"""
因爲非守護線程須要等待的時間比守護線程長,所以線程都會獲得執行

7、GIL全局解釋鎖(Global Interpreter Lock)

連接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

  後期須要詳細分析這個部分的內容。

8、同步鎖

一、三個須要注意的點

  一、線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來。

  二、join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高。

  三、必定要主要本小節最後GIL和互斥鎖的經典分析。

二、GIL和Lock的對比

  Python已經有了一個GIL來保證同一時間只能有一個線程來執行,爲何還須要lock?

    鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據。

    保護不一樣的數據就應該加不一樣的鎖。

  GIL 與Lock是兩把鎖,保護的數據不同,GIL是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),Lock是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。

過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果

  既然是串行,那咱們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊。

  需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼

  由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。

  鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖

import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操做
'''
R.release()
from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能爲99
未使用Lock的示例
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
使用了Lock的示例

二、GIL鎖和互斥鎖綜合分析

  一、100個線程去搶GIL鎖,即搶執行權限。

  二、確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire()

  三、極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock尚未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL。

  四、直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘線程再重複234的過程。

三、互斥鎖和join的區別

#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''
不加鎖:併發執行,速度快,數據不安全
#加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼併發運行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''
加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全

  加鎖會將運行變成串行,一樣適用join也能夠獲得串行的效果,數據也是安全的,可是start後當即join,任務內的全部代碼都是串行執行的,而加鎖只是加鎖的部分(修改共享數據的部分)是串行的,二者從保護數據安全方面來講是同樣的,可是加鎖的效率更高。

from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
'''
join方式,所有代碼串行執行,數據安全但效率很低

9、死鎖現象與遞歸鎖

  死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程

from threading import Thread, Lock
import time

# 實例化兩把鎖
mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print("%s 拿到A鎖" % self.name)

        mutexB.acquire()
        print("%s 拿到了B鎖" % self.name)

        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print("%s 拿到B鎖" % self.name)
        time.sleep(0.1)   # 線程1在此休息0.1秒
        mutexA.acquire()
        print("%s 拿到了A鎖" % self.name)

        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()   # 信號提交,就幾乎立馬啓動了
"""程序輸出下面內容後,卡住了
Thread-1 拿到A鎖
Thread-1 拿到了B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖   ————》線程1睡着時,線程2拿到A鎖,要去拿B鎖,B鎖在線程1手裏,線程1睡完要去拿A鎖,A鎖在線程2手裏,所以產生死鎖。
"""
死鎖示例

  線程1睡着時,線程2拿到A鎖,要去拿B鎖,B鎖在線程1手裏,線程1睡完要去拿A鎖,A鎖在線程2手裏,所以產生死鎖。上述例子也說明:本身處理鎖其實很是繁瑣也很是危險,必定要在適當的時候考慮把鎖釋放掉。處理不當就會出現死鎖,整個程序就會卡在原地。

  這是因爲互斥鎖只能acquire一次,使用方法以下:

from threading import Thread, Lock

mutexA = Lock()
mutexA.acquire()
mutexA.release()

  解決辦法——遞歸鎖能夠連續acquire屢次,每acquire一次計數器加1;只要計數不爲0,就不能被其餘線程搶到(只有計數爲0,才能被搶到acquire)

  在Python中爲了支持同一線程中屢次請求同一資源,提供了可重入鎖RLock

  這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1
,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
from threading import Thread, RLock
import time

"""鏈式賦值"""
mutexA=mutexB=RLock()   # 使用遞歸鎖能夠

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()    # 遞歸鎖計數器加1
        print("%s 拿到A鎖" % self.name)

        mutexB.acquire()
        print("%s 拿到了B鎖" % self.name)

        mutexB.release()   # 遞歸鎖計數器減1
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print("%s 拿到B鎖" % self.name)
        time.sleep(1)   # 線程1在此休息0.1秒
        mutexA.acquire()
        print("%s 拿到了A鎖" % self.name)

        mutexA.release()
        mutexB.release()    # 計數爲0,其餘線程能夠搶acquire

if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()   # 信號提交,就幾乎立馬啓動了
""" 第一個線程計數器爲0 後,其餘線程能夠開始搶acquire,所以順序是不固定的。
Thread-1 拿到A鎖
Thread-1 拿到了B鎖
Thread-1 拿到B鎖
Thread-1 拿到了A鎖
Thread-2 拿到A鎖
Thread-2 拿到了B鎖
Thread-2 拿到B鎖
Thread-2 拿到了A鎖
Thread-4 拿到A鎖
Thread-4 拿到了B鎖
Thread-4 拿到B鎖
Thread-4 拿到了A鎖
Thread-6 拿到A鎖
Thread-6 拿到了B鎖
Thread-6 拿到B鎖
Thread-6 拿到了A鎖
Thread-8 拿到A鎖
Thread-8 拿到了B鎖
Thread-8 拿到B鎖
Thread-8 拿到了A鎖
Thread-10 拿到A鎖
Thread-10 拿到了B鎖
Thread-10 拿到B鎖
Thread-10 拿到了A鎖
Thread-5 拿到A鎖
Thread-5 拿到了B鎖
Thread-5 拿到B鎖
Thread-5 拿到了A鎖
Thread-9 拿到A鎖
Thread-9 拿到了B鎖
Thread-9 拿到B鎖
Thread-9 拿到了A鎖
Thread-7 拿到A鎖
Thread-7 拿到了B鎖
Thread-7 拿到B鎖
Thread-7 拿到了A鎖
Thread-3 拿到A鎖
Thread-3 拿到了B鎖
Thread-3 拿到B鎖
Thread-3 拿到了A鎖
"""
RLock示例

10、信號量semaphore

  信號量也是一把鎖,能夠指定信號量爲5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間能夠有5個任務拿到鎖去執行,若是說互斥鎖是合租房屋的人去搶一個廁所,那麼信號量就至關於一羣路人爭搶公共廁所,公共廁全部多個坑位,這意味着同一時間能夠有多我的上公共廁所,但公共廁所容納的人數是必定的,這即是信號量的大小。

from threading import Thread, Semaphore, currentThread
import time, random

sm = Semaphore(3)    # 定義出坑的個數

def task():
    # sm.acquire()
    # print("%s in" % currentThread().getName())
    # sm.release()
    with sm:
        print("%s in " % currentThread().getName())
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=task)
        t.start()
"""
Thread-1 in 
Thread-2 in 
Thread-3 in 
Thread-5 in 
Thread-6 in 
Thread-4 in 
Thread-7 in 
Thread-8 in 
Thread-9 in 
Thread-10 in 
"""
sm=Semaphore(信號量大小)

  Semaphore管理一個內置的計算器,每當調用acquire()時內置計數器-1;調用release()時內置計數器+1;計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

  與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

  互斥鎖與信號量推薦博客:http://url.cn/5DMsS9r

11、Event

  同進程的同樣線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。

  爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。

from threading import Event  :調用event
event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。  ————set()後再clear(),重置到初始狀態。

  

  例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做。

from threading import Thread, Event
import time

event = Event()

def student(name):
    print("學生%s正在聽課" % name)
    event.wait()   # 在原地等待
    print("學生%s課間活動" % name)


def teacher(name):
    print("老師%s 正在授課" % name)
    time.sleep(7)
    event.set()    # 這個運行後等待結束


if __name__ == '__main__':
    stu1 = Thread(target=student, args=('alex',))
    stu2 = Thread(target=student, args=('wxx',))
    stu3 = Thread(target=student, args=('yxx',))
    t1 = Thread(target=teacher, args=('egon',))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()
"""
學生alex正在聽課
學生wxx正在聽課
學生yxx正在聽課
老師egon 正在授課  ------->在這裏等7秒後,學生開始作課間活動
學生alex課間活動
學生wxx課間活動
學生yxx課間活動
"""
Event.wait()不設置等待時間,等到event.set()纔開始激活

  將上例改寫:有的學生線程,須要在老師發出結束信號前就去作其餘工做。

from threading import Thread, Event
import time

event = Event()

def student(name):
    print("學生%s正在聽課" % name)
    event.wait(2)
    print("學生%s課間活動" % name)


def teacher(name):
    print("老師%s 正在授課" % name)
    time.sleep(7)
    event.set()


if __name__ == '__main__':
    stu1 = Thread(target=student, args=('alex',))
    stu2 = Thread(target=student, args=('wxx',))
    stu3 = Thread(target=student, args=('yxx',))
    t1 = Thread(target=teacher, args=('egon',))

    stu1.start()
    stu2.start()
    stu3.start()
    t1.start()
"""
學生alex正在聽課
學生wxx正在聽課
學生yxx正在聽課
老師egon 正在授課   -------》等兩秒後,學生就去作課間活動了,等滿七秒,程序才結束
學生alex課間活動
學生yxx課間活動
學生wxx課間活動
"""
設置event.wait(2),等了兩秒學生就開始課間活動了

  有不少時候,屢次檢測不成功,須要設置超時時間:

from threading import Thread, Event, currentThread
import time

event = Event()

def conn():
    n=0
    while not event.is_set():   # 尚未set()過,值爲False
        if n == 3:
            print("%s try too many times!" % currentThread().getName())
            return   # 結束整個函數,若是break,則連接成功了
        print("%s try %s" % (currentThread().getName(), n))
        event.wait(0.5)   # 原地等待,並設置了超時時間
        n += 1

    print("%s is connected" % currentThread().getName())


def check():
    print("%s is checking" % currentThread().getName())
    time.sleep(5)   # 模擬檢測
    event.set()     # 檢測OK

if __name__ == '__main__':
    for i in range(3):
        t = Thread(target=conn)
        t.start()
    t = Thread(target=check)   # 檢測線程
    t.start()
"""
Thread-1 try 0
Thread-2 try 0
Thread-3 try 0
Thread-4 is checking
Thread-1 try 1
Thread-2 try 1
Thread-3 try 1
Thread-1 try 2
Thread-3 try 2
Thread-2 try 2
Thread-1 try too many times!
Thread-2 try too many times!
Thread-3 try too many times!
"""
檢測達到三次提示超時

12、條件Condition

  使得線程等待,只有知足某條件時,才釋放n個線程

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == '__main__':
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()

函數化:

def condition_func():

    ret = False
    inp = input('>>>')
    if inp == '1':
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

十3、定時器

  定時器:指定時間後,隔多少時間以後觸發去執行某操做。

from threading import Timer

def task(name):
    print("hello %s" % name)

t = Timer(5, task, args=('egon',))   # 建立對象,Timer是Thread的子類,其實就是一個線程
t.start()
# hello egon ---->在等五秒後打印
import random

def make_code(n=4):  # 設置默認值爲4
    res = ''
    for i in range(n):
        s1 = str(random.randint(0,9))   # 隨機數字字符
        s2 = chr(random.randint(65, 90))   # 隨機字母 ,注意瞭解chr()內置函數
        res += random.choice([s1, s2])
    return res

print(make_code())
"""
6HS8  \  6S38  ————》結果隨機產生
"""
定時器在驗證碼上的應用

  將定時器改寫爲類:

from threading import Timer
import random

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=8):
        self.cache = self.make_code()   # 緩存驗證碼
        print(self.cache)
        self.t = Timer(interval, self.make_cache)   # 建立定時器,到時間刷新一次
        self.t.start()

    def make_code(self, n=4):  # 設置默認值爲4
        res = ''
        for i in range(n):
            s1 = str(random.randint(0,9))   # 隨機數字字符
            s2 = chr(random.randint(65, 90))   # 隨機字母 ,注意瞭解chr()內置函數
            res += random.choice([s1, s2])
        return res

    def check(self):
        while True:
            code = input("請輸入你的驗證碼>>:").strip()
            if code.upper() == self.cache:
                print("驗證碼輸入正確")
                self.t.cancel()
                break


obj = Code()
obj.check()
"""
E8I2
請輸入你的驗證碼>>:E8I2
驗證碼輸入正確
"""
驗證碼定時器

十4、線程queue

  queue隊列 :使用import queue,用法與進程Queue同樣

q.put方法用以插入數據到隊列中。
q.get方法能夠從隊列讀取而且刪除一個元素。

一、class queue.Queue(maxsize)   定義好隊列存數據的最大值,隊列存取規則是先進先出

import queue

q = queue.Queue(3)  # 生成隊列,存數據最大值是3  先進先出

q.put("first")   # 放值進去
q.put(2)
q.put("third")
# q.put(4)   # 隊列滿了,阻塞

print(q.get())   # 取數據
print(q.get())
print(q.get())
"""
first
2
third
"""
q = queue.Queue(3) 生成隊列,存數據最大值是3

  隊列放滿了繼續放,會形成阻塞;

  若是把默認的block=True改成False,程序會直接報錯:raise full;

  若是不修改block,而是設置timeout,程序會等到timeout時間以後報錯raise full。

  隊列爲空繼續取,和上述的狀況很是相似:

import queue

q = queue.Queue(3)  # 生成隊列,存數據最大值是3  先進先出

q.put("first")   # 放值進去
q.put(2)
q.put("third")
# q.put(4)   # 隊列滿了,阻塞
# q.put(4, block=False)   # 默認是block=True,    改成False後,隊列滿了還加數據,程序報錯raise Full queue.Full
# q.put(4, block=True, timeout=3)   # 設置了block=True隊列滿不會直接報錯了,可是還加上了timeout=3,程序會等3秒後提示報錯queue.Full

# 同理get()方法也有這些參數
print(q.get())   # 取數據
print(q.get())
print(q.get())
# print(q.get(block=False))   # 在隊列空,還取時,通常是卡住,但加入了block=False參數的話,會提示報錯queue.Empty
print(q.get_nowait())       # 這個效果同上
print(q.get(block=True, timeout=3))    # 隊列空,還取數據時,會按照timeout時間等待,到時間後提示queue.Empty
queue的put()和get()方法參數問題

二、class queue.LifeQueue(maxsize)   定義爲堆棧,堆棧的存取規則是後進先出

import queue

q = queue.LifoQueue(3)   # 堆棧
q.put("first")
q.put(2)
q.put("third")

print(q.get())  # third
print(q.get())  # 2
print(q.get())  # first
q = queue.LifoQueue(3) 堆棧,後進先出

三、class queue.PriorityQueue(maxsize)   存儲數據時可設置優先級的隊列

import queue

q = queue.PriorityQueue(3)

q.put(10, 'one')
q.put(40, 'two')
q.put(30, 'three')

print(q.get())   # 10
print(q.get())   # 30
print(q.get())   # 40
q = queue.PriorityQueue(3) 優先級隊列

  數字越小優先級越高(取出的優先級)。

十5、進程池和線程池

  在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊:

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

from socket import *
from threading import Thread

# 通信和創建連接分開,啓動不一樣的線程,你們是併發執行。
def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()   # 建連接
        t = Thread(target=communicate, args=(conn,))  # 建一個連接創一個線程
        t.start()
        # communicate(conn)

    server.close()


if __name__ == '__main__':
    server('127.0.0.1', 8091)   # 主線程

"""
這種解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。
"""
多線程實現併發套接字——服務端
# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# 使用時,能夠一個程序運行屢次,這是多個不一樣的in
from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(("127.0.0.1", 8091))

while True:
    msg = input(">>").strip()
    if not msg:continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))

client.close()
多線程實現併發套接字——客戶端

  這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制。

  進程池和線程池的接口如出一轍,用法也徹底相同。池就是要對數目加以限制,保證機器一個可承受的範圍,以一個健康的狀態保證它的運行。

一、介紹:

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures   模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor: 線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.

  基本方法:

一、submit(fn, *args, **kwargs)
異步提交任務

二、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操做

三、shutdown(wait=True) 
至關於進程池的pool.close()+pool.join()操做
wait=True,等待池內全部任務執行完畢回收完資源後才繼續
wait=False,當即返回,並不會等待池內的任務執行完畢
但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
submit和map必須在shutdown以前

四、result(timeout=None)
取得結果

5、add_done_callback(fn)
回調函數

二、進程池:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os, time, random

def task(name):
    print("name: %s pid: %s run" % (name, os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)    # 指定進程池大小,最大進程數,若是不指定默認是CPU核數

    for i in range(10):
        """從始至終四個進程解決這10個任務,誰沒事了接新任務"""
        pool.submit(task, 'egon%s' %i)   # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。

    print("主進程")
"""
name: egon0 pid: 12445 run
name: egon1 pid: 12444 run
name: egon2 pid: 12446 run
name: egon3 pid: 12447 run
主進程
name: egon4 pid: 12445 run
name: egon5 pid: 12444 run
name: egon6 pid: 12446 run

name: egon7 pid: 12445 run
name: egon8 pid: 12446 run
name: egon9 pid: 12447 run
"""
pool = ProcessPoolExecutor(4) 指定進程池大小,最大進程數,若是不指定默認是CPU核數

  shutdown()方法的使用:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os, time, random

def task(name):
    print("name: %s pid: %s run" % (name, os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)    # 指定進程池大小,最大進程數,若是不指定默認是CPU核數

    for i in range(10):
        """從始至終四個進程解決這10個任務,誰沒事了接新任務"""
        pool.submit(task, 'egon%s' %i)   # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。

    pool.shutdown()   # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程

    print("主進程")
"""
name: egon0 pid: 12502 run
name: egon1 pid: 12503 run
name: egon2 pid: 12504 run
name: egon3 pid: 12505 run
name: egon4 pid: 12502 run
name: egon5 pid: 12503 run
name: egon6 pid: 12505 run
name: egon7 pid: 12504 run
name: egon8 pid: 12503 run
name: egon9 pid: 12505 run
主進程
"""
pool.shutdown() 把提交任務入口關閉,同時還進行了pool.join()操做,等任務提交結束,再結束主進程

三、針對線程的狀況:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os, time, random

def task(name):
    print("name: %s pid: %s run" % (name, os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) 

    for i in range(10):
        """從始至終四個進程解決這10個任務,誰沒事了接新任務"""
        pool.submit(task, 'egon%s' %i)   # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。

    pool.shutdown(wait=True)   # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程

    print("主進程")
"""
name: egon0 pid: 12528 run
name: egon1 pid: 12528 run
name: egon2 pid: 12528 run
name: egon3 pid: 12528 run
name: egon4 pid: 12528 run
name: egon5 pid: 12528 run
name: egon6 pid: 12528 run
name: egon7 pid: 12528 run
name: egon8 pid: 12528 run
name: egon9 pid: 12528 run
主進程
"""
pool = ThreadPoolExecutor(4) 建立線程池,其餘代碼和進程池同樣

  currentThread()方法查看線程名:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread  # 查看線程名
import os, time, random

def task():
    print("name: %s pid: %s run" % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)

    for i in range(10):
        """從始至終四個進程解決這10個任務,誰沒事了接新任務"""
        pool.submit(task,)   # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。

    pool.shutdown(wait=True)   # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程

    print("主進程")
"""
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_1 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_3 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_3 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_1 pid: 12554 run
name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run
主進程
"""
from threading import currentThread 查看線程名

四、map方法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
executor.map(task,range(1,12)) map取代了for+submit

五、異步調用和回調機制

  同步調用:提交完任務後,就在原地等待任務執行完畢,拿到執行結果,再執行下一行。

         致使程序是串行執行。

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print("%s is laing" % name)
    time.sleep(random.randint(3,5))
    res = random.randint(7, 13)*'#'
    return {'name':name, 'res':res}

def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)

    shit1 = pool.submit(la, 'alex').result()
    weigh(shit1)
    shit2 = pool.submit(la, 'wupeiqi').result()
    weigh(shit2)
    shit3 = pool.submit(la, "yuanhao").result()
    weigh(shit3)
"""
alex is laing
alex 拉了 《7》kg
wupeiqi is laing
wupeiqi 拉了 《9》kg
yuanhao is laing
yuanhao 拉了 《11》kg
"""
同步調用——沒有獲得結果前,調用不會返回

  異步調用:與同步相對,一個異步功能調用提交完任務後,調用者不會馬上獲得結果,而在完成後,經過狀態、通知或回調函數來通知調用者。

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print("%s is laing" % name)
    time.sleep(random.randint(3,5))
    res = random.randint(7, 13)*'#'
    # return {'name':name, 'res':res}

    weigh({'name':name, 'res':res})   # 直接把字典傳給稱重weigh(),但形成了程序耦合

def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    pool.submit(la, 'alex')
    pool.submit(la, 'wupeiqi')
    pool.submit(la, "yuanhao")
"""併發執行拉的任務,誰執行完,誰把結果傳給稱重功能。
alex is laing
wupeiqi is laing
yuanhao is laing
alex 拉了 《10》kg
wupeiqi 拉了 《7》kg
yuanhao 拉了 《7》kg
"""
異步調用——直接把字典傳給weigh()函數,但形成程序耦合

  能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print("%s is laing" % name)
    time.sleep(random.randint(3,5))
    res = random.randint(7, 13)*'#'
    return {'name':name, 'res':res}

    # weigh({'name':name, 'res':res})   # 直接把字典傳給稱重weigh(),形成了程序耦合

def weigh(shit):
    shit = shit.result()    # 對象.result()拿到結果並賦值給shit
    name = shit['name']
    size = len(shit['res'])
    print('%s 拉了 《%s》kg' % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    # 回調函數,前面任務執行完,return返回值就會自動觸發weith功能執行,把pool.submit(la, 'alex')對象當作參數傳給weigh()
    pool.submit(la, 'alex').add_done_callback(weigh)
    pool.submit(la, 'wupeiqi').add_done_callback(weigh)
    pool.submit(la, "yuanhao").add_done_callback(weigh)
"""
alex is laing
wupeiqi is laing
yuanhao is laing
alex 拉了 《10》kg
wupeiqi 拉了 《7》kg
yuanhao 拉了 《7》kg
"""
add_done_callback(weigh) 回調函數,前面任務執行完,返回值自動觸發weight功能

阻塞和非阻塞的區別?
  阻塞是進程運行的一種狀態,遇到I/O就會進入阻塞狀態,會被剝奪走CPU的執行權限。
  阻塞調用在調用結果返回前,當前線程會被掛起。直到獲得返回結果,纔會將阻塞線程激活。
  非阻塞調用:指在不能馬上獲得結果以前也會馬上返回,同時函數不會阻塞當前線程。

阻塞和同步調用的區別?
  同步調用:調用會一直等待,直到任務返回結果爲止,即便被搶走cpu也是處於就緒狀態。
  阻塞調用:socket工做在阻塞模式時,若是沒有數據下調用recv(),當前進程掛起,直到有數據爲止。

  同步異步針對的是函數/任務的調用方式;阻塞非阻塞針對的是進程或線程。

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# 多I/O的問題採用線程池
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print('get %s' % url)
    # requests.get()就是目標頁面下載一個文件到本地來
    response = requests.get(url)  # 對象
    time.sleep(3)   # 模擬網絡延遲
    # print(response.text)    # 網頁內容
    return {'url':url, 'content':response.text}


def paese(res):  # 解析,正則表達式
    res = res.result()
    print('%s parse res is %s' % (res['url'], len(res['content'])))



if __name__ == '__main__':
    urls = [
        'http://www.cnblogs.com/linhaifeng',
        'http://www.python.org',
        'http://www.openstack.org'
    ]

    pool = ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get, url).add_done_callback(paese)   # 回調函數
"""
get http://www.cnblogs.com/linhaifeng
get http://www.python.org    ——————》明顯的等的效果
http://www.cnblogs.com/linhaifeng parse res is 16320
get http://www.openstack.org
http://www.python.org parse res is 49014
http://www.openstack.org parse res is 63429
"""
進程池線程池練習

  多IO問題採用線程池。

  由上例能夠分析出異步調用加回調機制使用的場景。

六、線程池優化實現併發的套接字

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'


"""
原先多線程解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。
    不該該隨着客戶端數量增長不斷地增長線程,須要基於線程池實現,限制線程數量
"""
from socket import *
from concurrent.futures import ThreadPoolExecutor


def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            # if not data:continue   # 這裏卡了好久,須要注意,這種狀況下關閉客戶端,線程池沒有減小
            # if data.decode('utf-8') == 'q':break  # 測試,這種狀況下,線程池減小,新進程加入進程池
            if not data:break
            conn.send(data.upper())

        except ConnectionResetError:
            break
    conn.close()


def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()   # 建連接
        pool.submit(communicate, conn)

    server.close()


if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)  # 通常寫機器可承受的範圍內
    server('127.0.0.1', 8092)   # 主線程
服務端
# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(("127.0.0.1", 8092))

while True:
    msg = input(">>").strip()
    if not msg:continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))

client.close()
客戶端
相關文章
相關標籤/搜索