python筆記9 線程進程 threading多線程模塊 GIL鎖 multiprocessing多進程模塊 同步鎖Lock 隊列queue IO模型

線程與進程

進程

進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。python

線程

線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可能。linux

進程和線程的關係:

(1)一個線程只能屬於一個進程,而一個進程能夠有多個線程,但至少有一個線程。
(2)資源分配給進程,同一進程的全部線程共享該進程的全部資源。
(3)CPU分給線程,即真正在CPU上運行的是線程。安全

串行 並行 併發

多年前單核cpu同時執行兩個程序就是併發執行.網絡

同步異步

在計算機領域,同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程進行處理,這樣能夠提升執行的效率。舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。多線程

 進程實例

from multiprocessing import Process
import threading
import os
import time
def info(name):
    print('父進程:',os.getppid())           #打印當前調用函數進程的父進程號
    print('進程',os.getpid(),name)          #打印當前調用函數進程的進程號
    print('----------------------------')
if __name__ == '__main__':                   #判斷是被調用仍是在當前文件執行
    info('主線程')                            #先打印當前進程的進程號與打印當前調用函數進程的父進程
    p1 = Process(target=info,args=('多進程1',))    #開多進程1打印子進程的進程號與父進程的進程號
    p2 = Process(target=info,args=('多進程2',))  #開多進程2打印子進程的進程號與父進程的進程號
    p1.start()
    p2.start()


父進程: 17724 #父進程是pycharm的進程號
進程 12128 主線程                 #當前進程號是pycharm調用python.exe的進程號 ----------------------------
父進程: 12128 #子進程的父進程號是朱金成浩     
進程 15080 多進程2          
----------------------------
父進程: 12128 #主進程下開多個子進程,共有的是一個主進程號
進程 18476 多進程1
---------------------------

threading模塊

普通執行腳本時 是串行 

import time

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)

start = time.time()
coun(3)
coun(2)
print('用時:%s秒' % (time.time() - start))     #串行,從上到下依次執行,執行完共耗時5秒

running on number: 3
running on number: 2
用時:5.0006632804870605秒

多線程

import time
import threading           #開多線程的模塊

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)


start = time.time()
t1 = threading.Thread(target=coun,args=(3,))    #此處 開第一個子線程實傳入參數3     此處仍是主線程執行的
t2 = threading.Thread(target=coun,args=(2,))    #此處 開第二個子線程實傳入參數2     此處也是主線程執行的
t1.start()                                      #到此處執行子線程1
t2.start()                               #到此處執行子線程2
print('用時:%s秒' % (time.time() - start))      #整個代碼除了 t1.start() t2.start()是子線程執行的其餘都是主線程執行的


running on number: 3
running on number: 2
用時:0.000995635986328125秒                   #主線程並無 執行函數,所以主線程耗時0.0秒 函數由子線程執行

當啓動子線程時,子線程和主線程 併發着一塊兒執行併發

join()

在子線程運行完以前,這個子線程的主線程一直被阻塞,等子線程運行完在繼續運行主線程app

import time
import threading     #開多線程的模塊

def coun(n):
    print('running on number: %s' % n)
    time.sleep(n)
    print('執行結束')


start = time.time()
t1 = threading.Thread(target=coun,args=(3,)) #此處 開第一個子線程實傳入參數3
t2 = threading.Thread(target=coun,args=(2,)) #此處 開第二個子線程實傳入參數3
t1.start()                     #到此處執行子線程1
t2.start()                     #到此處執行子線程2
t1.join()             #在t1運行完以前,主線程一直被阻塞       
t2.join()             #在t2運行完以前,主線程一直被阻塞 print('用時:%s秒' % (time.time() - start))    #只有在t1和t2共同 運行完以後才繼續運行主線程  

running on number: 3
running on number: 2
執行結束
執行結束
用時:3.002174139022827秒   
循環多個子線程並在最慢的運行完成後結束主線程的方法:
import time
import threading     # 開多線程的模塊


def c(a):
    print('%s開始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s運行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s運行完成' % (a))
    else:
        time.sleep(a)
        print('%s運行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循環開新的子進程  
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)            #將每一個子線程的t加入到列表
    t.start()                           #啓動子進程
for i in threadd_list:                  
    i.join()                        #循環等待全部子進程結束在繼續運行

print('共用時%s' % (time.time() - d))   #共耗時 5秒,最慢的線程5秒
  

1開始
2開始
3開始
4開始
5開始
5運行完成
2運行完成
3運行完成
4運行完成
1運行完成
共用時5.00162410736084dom

錯誤方法1:異步

import time
import threading  # 開多線程的模塊


def c(a):
    print('%s開始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s運行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s運行完成' % (a))
    else:
        time.sleep(a)
        print('%s運行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循環開新的子進程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.start()                           #啓動子進程
    t.join()                            #這樣在循環裏join()會在每次循環時都要等待,結果和串行同樣 print('共用時%s' % (time.time() - d))   #共耗時

1開始
1運行完成
2開始
2運行完成
3開始
3運行完成
4開始
4運行完成
5開始
5運行完成
共用時15.006914854049683

錯誤方法2:socket

import time
import threading  # 開多線程的模塊


def c(a):
    print('%s開始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s運行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s運行完成' % (a))
    else:
        time.sleep(a)
        print('%s運行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循環開新的子進程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.start()                           #啓動子進程
t.join()                #放在循環外,當循環結束時,會執行最後循環的t ,最後的是5對應是1秒 print('共用時%s' % (time.time() - d))   #共耗時

1開始
2開始
3開始
4開始
5開始
5運行完成
共用時1.0022823810577393
2運行完成
3運行完成
4運行完成
1運行完成

setDaemon(True)

將線程聲明爲守護線程,必須在start()方法以前設置,做用是當主線程運行完成後,無論子線程是否運行完,都隨主線程一塊兒退出,與join的做用相反,join是主線程等待子線程結束在繼續往下執行

import time
import threading  # 開多線程的模塊


def c(a):
    print('%s開始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s運行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s運行完成' % (a))
    else:
        time.sleep(a)
        print('%s運行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循環開新的子進程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    t.setDaemon(True)            #主程序結束子程序結束
    t.start()                           #啓動子進程

print('共用時%s' % (time.time() - d))   

1開始 2開始 3開始 4開始 5開始 共用時0.0019898414611816406      #主程序共耗時0秒,主程序結束子程序也結束

當有多個子線程而只設置一個守護線程則沒有意義,

由於其餘子線程並無運行完,因此主線程並無真正意義上的運行完,只有全部的子線程都是守護線程才能在主線程運行完直接所有結束,

不然只要有一個子線程在運行都要繼續等待.

import time
import threading  # 開多線程的模塊


def c(a):
    print('%s開始' %(a))
    if a == 5:                  #若是是5等待1秒
        time.sleep(1)
        print('%s運行完成'% (a))
    elif a == 1:                 #若是是1等待5秒
        time.sleep(5)
        print('%s運行完成' % (a))
    else:
        time.sleep(a)
        print('%s運行完成' % (a))

threadd_list = []
d = time.time()
for i in range(1, 6):                  #循環開新的子進程
    t = threading.Thread(target=c,args=(i,))
    threadd_list.append(t)
    if i == 1:                        #只開啓1的守護線程,
        t.setDaemon(True)            
    t.start()                           #啓動子進程

print('共用時%s' % (time.time() - d))   #共耗時

1開始
2開始
3開始
4開始
5開始
共用時0.0009970664978027344
5運行完成
2運行完成
3運行完成
4運行完成                         #此處缺乏1運行完成,由於當1運行完時以前其餘全部子線程都運行結束了,也意味着主線程結束了,所以1與主線程一塊兒結束了

GIL鎖

因爲GIL鎖的存在,python不能並行,只能併發, 所以python不能算是真正意義上的多線程並行運算.,只能是佔用一個cpu的一個線程的併發運算

所以,對於密集型運算,python不適用 threading多線程模塊

密集型計算串行:

import time
import threading  # 開多線程的模塊

def aa():
    a = 1
    for i in range(1,100000):
        a *= i
def bb():
    a = 1
    for i in range(1,100000):
        a *= i
c = time.time()
aa()
bb()
print('耗時%s' %(time.time() - c)) 

耗時5.802525758743286                #密集型並行計算總耗時5.8秒

密集型threading多線程計算 併發:

import time
import threading  # 開多線程的模塊

def aa():
    a = 1
    for i in range(1,100000):
        a *= i
def bb():
    a = 1
    for i in range(1,100000):
        a *= i
c = time.time()
t1 = threading.Thread(target=aa)
t2 = threading.Thread(target=bb)
t1.start()
t2.start()
t1.join()
t2.join()
print('耗時%s' %(time.time() - c))
 
耗時6.302755832672119     #密集型threading多線程計算併發  耗時6.3秒 , 計算量越大差距越大 

對於密集型計算的開啓multiprocessing多進程會快 ,可是咱們不可能無限量開進程

multiprocessing多進程模塊

def aa():
    a = 1
    for i in range(1, 100000):
        a *= i


def bb():
    a = 1
    for i in range(1, 100000):
        a *= i
if __name__ == '__main__':    #多進程模塊必須這樣寫if判斷,不然報錯 import time
    import multiprocessing  # 引用多進程模塊
    c = time.time()
    t1 = multiprocessing.Process(target=aa)   #和多線程用法基本一致
    t2 = multiprocessing.Process(target=bb)
    t1.start()
    t2.start()
    t1.join()                       #也有join和setDaemon()方法
t2.join()
print('耗時%s' %(time.time() - c))
    耗時4.506261587142944 

   同步鎖Lock

同步鎖保證數據的安全,當一個線程獲取這把鎖後,只有他釋放了其餘線程才能獲取繼續執行,以此類推

不加同步鎖:

import threading
import time
def jnum():
    global num               
    xx = num               #每次代碼走到這裏都把num的值賦值給xx,由於是多線程,每一個線程在循環時,在0.1秒內就循環完成了 所以每一個線程的值都是10
    time.sleep(0.1)        
    num = xx - 1           #每一個線程都是執行10-1

num = 10
thread_list = []
for i in range(10):
    f1 = threading.Thread(target=jnum,)
    thread_list.append(f1)
    f1.start()                       
for i in thread_list:
    i.join()
print(num)

9

加同步鎖:

import threading
import time
R=threading.Lock()      #同步鎖
def jnum():
    R.acquire()          #加鎖,每一個鎖只能給一個線程, 當以一個線程運行到這裏獲取到鎖時,日後的線程會處在等待狀態,等待第一個鎖釋放時在獲取鎖,以此類推. global num
    xx = num
    time.sleep(0.1)
    num = xx - 1
    R.release()          #解鎖

num = 10
thread_list = []
for i in range(10):
    f1 = threading.Thread(target=jnum,)
    thread_list.append(f1)
    f1.start()
for i in thread_list:
    i.join()
print(num)
    
0

隊列queue

import queue       #隊列
q = queue.Queue()        #建立隊列,默認先進先出模式:FIFO 
q.put(111)        #put存
q.put(222)
q.put(333)
print(q.get())    #get取
print(q.get())   
print(q.get())    
print(q.get())    #無值阻塞,等待新的值進入

maxsize

import queue       #隊列
q = queue.Queue(maxsize=3)        #建立隊列,默認先進先出模式:FIFO maxsize=3 只能存3個值,若是要在往裏存,須要取出來才能存

q.put(111)        #put存
q.put(222)
q.put(333)
print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get())    #無值阻塞,等待新的值進入
import queue       #隊列
q = queue.Queue(maxsize=3)        #建立隊列,默認先進先出模式:FIFO

q.put(111)        #put存
q.put(222)
q.put(333)
q.put(444,False)          #加False當存滿後又有值進來會報錯
print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get())    #無值阻塞,等待新的值進入
Traceback (most recent call last): File
"F:/python24期/L010-老男孩教育-Python24期VIP視頻-mp4/練習/lianxi.py", line 8, in <module> q.put(444,False) #加False當存滿後又有值進來會報錯 File "C:\Python36\lib\queue.py", line 130, in put raise Full queue.Full
import queue       #隊列
q = queue.Queue(maxsize=3)        #建立隊列,默認先進先出模式:FIFO

q.put(111)        #put存
q.put(222)
q.put(333)

print(q.get())    #get取
print(q.get())
print(q.get())
print(q.get(block=False))    #加block=False 當取完時 再取會報錯

111
Traceback (most recent call last):
222
  File "F:/python24期/L010-老男孩教育-Python24期VIP視頻-mp4/練習/lianxi.py", line 12, in <module>
333
    print(q.get(block=False))    #無值阻塞,等待新的值進入
  File "C:\Python36\lib\queue.py", line 161, in get raise Empty
queue.Empty
此包中的經常使用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小 q.empty() 若是隊列爲空,返回True,反之False q.full() 若是隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 至關q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 至關q.put(item, False) q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列爲空,再執行別的操做

LifoQueue建立隊列後進先出

import queue       #隊列
q = queue.LifoQueue()            #後進先出隊列
q.put(111)        #put存
q.put(222)
q.put(333)

print(q.get())    #get取
print(q.get())
print(q.get())

333
222
111

queue.PriorityQueue建立優先級隊裏

import queue          #隊列
q = queue.PriorityQueue()       #建立優先級隊裏 根據優先級取
q.put([5,111])        #put存
q.put([7,222])
q.put([1,333])

print(q.get())    #get取
print(q.get())
print(q.get())

[1, 333]
[5, 111]
[7, 222]

生產者消費者模型

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

import random, time
import queue, threading

q = queue.Queue()

 
def chushi(name):                           #廚師作包子
    count = 0
    while count < 10:                         #作十次
        print('making.........') 
        s = random.randrange(3)             #隨機生成3之內的隨機數
        time.sleep(s)
        q.put(count)                         #把生成的包子放到消息隊列,等待客人來取
        print('%s製做了%s個包子' % (name, count))
        count += 1
        print('OK........')


def keren(name):
    count = 0
    while count < 10:
        print('%s來了' %(name))
        s = random.randrange(4)
        time.sleep(s)
        print('%s等待了%s秒' %(name,s))
        if not q.empty():
            data = q.get()
            print('%s吃了%s個包子'%(name,data))
        else:
            print('沒有包子')
        count += 1
p = threading.Thread(target=chushi,args=('庖丁',))
c = threading.Thread(target=keren,args=('小張',))
c1 = threading.Thread(target=keren,args=('小王',))
c2 = threading.Thread(target=keren,args=('小李',))
p.start()
c.start()
c1.start()
c2.start()

IO模型

linux下的 network IO.

 1.阻塞模型    blocking IO

全程阻塞

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

咱們常寫的都拿基本都是阻塞模型

 2.非阻塞模型   nonblocking IO

當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。

在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否準備好,與阻塞IO不同,」非阻塞將大的整片時間的阻塞分紅N多的小的阻塞, 因此進程不斷地有機會 ‘被’ CPU光顧」。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是能夠作其餘事情的,

      也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

服務端

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)    #非阻塞            
while True:
    try:
        print ('waiting client connection .......')
        connection,address = sk.accept()   # 進程主動輪詢
        print("+++",address)
        client_messge = connection.recv(1024)
        print(str(client_messge,'utf8'))
        connection.close()
    except Exception as e:
        print (e)
        time.sleep(4)

客戶端

import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

while True:
    sk.connect(('127.0.0.1',6667))
    print("hello")
    sk.sendall(bytes("hello","utf8"))
    time.sleep(2)
    break

優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在同時執行)。

缺點:任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。

 3 IO多復路   IO multiplexing(select,poll,epoll)  ****

服務端

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8800))
sk.listen(5)
sk.setblocking(False)
inputs=[sk,]         #建立一個列表存放套接字,第一個列表放socket對象sk. while True:
    r,w,e=select.select(inputs,[],[],5)        #監聽事件驅動 一但inputs列表裏有變更,就將變更的值賦給r,若是有連接進來就是sk有變更,r就等於sk. for obj in r:
        if obj==sk:                      #若是r等於sk .
            conn,add=obj.accept()        #取sk的conn
            inputs.append(conn)  #將conn添加至列表 此時因sk變更發生的執行結束,而且將conn加入到inputs列表,所以列表有變更,變更爲conn,conn賦值給r繼續執行. else:   #此時r爲conn不是sk 執行else

            data_byte=obj.recv(1024)             #如下爲正常socket的tcp鏈接
            print(str(data_byte,'utf8'))
            if not data_byte:          #在linux系統時,若是強行關掉了客戶端 客戶端的返回值是空
                inputs.remove(obj)     #刪除此時的客戶端的conn鏈接 continue
            inp=input('回答%s: >>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

客戶端

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8800))

while True:
    inp=input(">>>>")   # how much one night?
    sk.sendall(bytes(inp,"utf8"))
    data=sk.recv(1024)
    print(str(data,'utf8'))

 4. 異步    asynchronous IO

相關文章
相關標籤/搜索