Python 7 多線程及進程

進程與線程:

進程的概念:

一、程序的執行實例稱爲進程。
二、每一個進程都提供執行程序所需的資源。一個進程有一個虛擬地址空間、可執行代碼、對系統對象的開放句柄、一個安全上下文、一個獨特的進程標識符、環境變量、一個優先級類、最小和最大工做集大小,
以及至少一個執行線程。每一個進程以一個線程開始,一般稱爲主線程,但能夠從它的任何線程建立額外的線程。 三、程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;
進程是程序的一次執行活動,屬於動態概念。 四、在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,
所以,進程就是爲了在CPU上實現多道編程而提出的。

有了進程爲何還要線程?python

   進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?
其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上: 一、進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。 二、進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。

 線程的概念:

一、線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務
二、線程是執行上下文,它是CPU執行指令流所需的所有信息。
三、假設你正在讀一本書,如今你想休息一下,但你但願可以回來,從你停下來的確切地點恢復閱讀。一個實現的方法就是記下頁碼,行數和字數。因此你閱讀書的執行上下文是這3個數字。
若是你有一個室友,並且她使用相同的技巧,她能夠在你不使用的時候拿書,而後從她中止閱讀的地方繼續閱讀。而後你能夠收回它,從你原來的地方恢復它。
四、線程以一樣的方式工做。CPU給了你一個錯覺,那就是它在同時進行多個運算。它經過在每一個計算上花費一點時間來實現這一點。它能夠這樣作,由於它爲每一個計算都有一個執行上下文。
就像你能夠和朋友共享一本書同樣,許多任務能夠共享一個CPU。 五、在更高的技術層面上,執行上下文(所以線程)由CPU寄存器的值組成。
最後:線程不一樣於進程。線程是執行的上下文,而進程是與計算相關聯的一堆資源。一個進程能夠有一個或多個線程。 說明:與進程相關的資源包括內存頁(進程中的全部線程對內存具備相同的視圖)、文件描述符(例如,打開套接字)和安全憑據(例如啓動進程的用戶ID)。

進程與線程的區別:

1、線程共享建立它的進程的地址空間;進程有本身的地址空間。
2、線程直接訪問進程的數據段;進程擁有父進程的數據段的自身副本。
3、線程能夠直接與其餘線程的過程;過程必須使用進程間通訊與兄弟姐妹的過程。
4、很容易建立新線程;新進程須要重複父進程。
5、線程能夠對相同進程的線程進行至關的控制;進程只能對子進程進行控制。
六、對主線程的更改(取消、優先級更改等)可能影響進程的其餘線程的行爲;對父進程的更改不會影響子進程。

 Python threading模塊:

直接調用:
import
threading import time def sayhi(num): print("is number %s"%num) time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=sayhi,args=(1,)) t2 = threading.Thread(target=sayhi,args=(1,)) t1.start() t2.start() print(t1.getName()) print(t2.getName())
輸出:

     is number 1
     is number 1
     Thread-1
     Thread-2程序員

繼承調用:
import
threading,time class Thread_class(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): print("is run on %d"%self.num) time.sleep(2) if __name__ == '__main__': t1 = Thread_class(1) t2 = Thread_class(2) t1.start() t2.start()
輸出:
is run on 1
is run on 2

Join 和 Daemon數據庫

一些線程作後臺任務,如發送實時數據包,或進行按期的垃圾收集,或什麼的。只有當主程序運行時,這些纔有用,一旦另外一個非守護進程線程退出,就能夠把它們殺掉。
沒有守護線程,您必須跟蹤它們,並告訴它們在程序徹底退出以前退出。經過將它們設置爲守護線程,可讓它們運行並忘記它們,而且當程序退出時,任何守護線程都會自動終止。編程

thread.join()  #等待線程執行完再向後執行數組

thread.setDaemon(True)  #將設爲守護線程安全

import threading,time
def run(num):
    print("is run %d" %num)
    time.sleep(2)
    print("is ok")
def main():
    for i in range(5):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        t.join()
        print("thread is done %s"%str(t.getName()))
m = threading.Thread(target=main,args=[])
m.setDaemon(True) #將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務
m.start()
m.join(timeout=5)  
輸出:
    is run 0
    is ok
    thread is done Thread-2
    is run 1
    is ok
    thread is done Thread-3
    s run 2

注意:守護線程在關閉時忽然中止。它們的資源(如打開的文件、數據庫事務等)可能沒法正常釋放。服務器

線程鎖(互斥鎖Mutex)併發

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?app

import time
import threading
def addNum():
    global num  # 在每一個線程中都獲取這個全局變量
    time.sleep(1)
    num -= 1  # 對此公共變量進行-1操做
    print('--get num:', num)
num = 100  # 設定一個共享變量
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:  # 等待全部線程執行完畢
    t.join()
print('final num:', num)

正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 dom

*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖

加鎖:
import
time import threading def addNum(): global num #在每一個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改數據前加鎖 num -=1 #對此公共變量進行-1操做 lock.release() #修改後釋放 num = 100 #設定一個共享變量 thread_list = [] lock = threading.Lock() #生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待全部線程執行完畢 t.join() print('final num:', num )

GIL 與 Lock

Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體關係以下圖:

RLock(遞歸鎖)

import threading,time
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global  num2
    num2+=1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res,res2)
 
if __name__ == '__main__':
    num,num2 = 0,0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num,num2)

Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 。

import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread is %s"%n)
    semaphore.release()
if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() != 1:
    pass
else:
    print("___all thread is done___")
    print(num)

Timer  

此類表示僅在通過必定數量的時間以後才能運行的操做。

def hello():
    print("hello, world")
t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

Event

事件是一個簡單的同步對象,該事件表示內部標誌和線程。能夠等待標誌設置,或設置或清除標誌自己。event = threading.Event()

客戶端線程能夠等待標誌被設置   event.wait()

服務器線程能夠設置或重置   event.set()    event.clear()

一、若是設置了標誌,則等待方法不執行任何操做   二、若是標誌已清除,等待將阻塞,直到它再次設置。三、任意數量的線程均可能等待相同的事件。

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #綠燈狀態
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() #打開綠燈
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #綠燈
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

 Queue

queue用於創建和操做隊列,常和threading類一塊兒用來創建一個簡單的線程隊列。

首先,隊列有不少種,根據進出順序來分類,能夠分紅

 queue.Queue(maxsize)  (先進先出隊列)

 queue.LifoQueue(maxsize)  (先進後出隊列)

 queue.PriorityQueue(maxsize)  爲優先度越低的越先出來

 注:若是設置的maxsize小於1,則表示隊列的長度無限長

經常使用的方法有:

 queue.qsize()  返回隊列大小

 queue.empty()  判斷隊列是否爲空

 queue.full()  判斷隊列是否滿了

 queue.get([block[,timeout]])  從隊列頭刪除並返回一個item,block默認爲True,表示當隊列爲空卻去get的時候會阻塞線程,等待直到有有item出現爲止來get出這個item。若是是False的話代表當隊列爲空你卻去get的時候,會引起異常。在block爲True的狀況下能夠再設置timeout參數。表示當隊列爲空,get阻塞timeout指定的秒數以後尚未get到的話就引起Full異常。

 queue.put(...[,block[,timeout]])  向隊尾插入一個item,一樣若block=True的話隊列滿時就阻塞等待有空位出來再put,block=False時引起異常。同get的timeout,put的timeout是在block爲True的時候進行超時設置的參數。

 queue.task_done()  從場景上來講,處理完一個get出來的item以後,調用task_done將向隊列發出一個信號,表示本任務已經完成

 queue.join()  監視全部item並阻塞主線程,直到全部item都調用了task_done以後主線程才繼續向下執行。這麼作的好處在於,假如一個線程開始處理最後一個任務,它從任務隊列中拿走最後一個任務,此時任務隊列就空了但最後那個線程還沒處理完。當調用了join以後,主線程就不會由於隊列空了而擅自結束,而是等待最後那個線程處理完成了。

優先隊列:
import
queue q = queue.PriorityQueue() for i in range(10): i = 100 - i q.put([i,"sdsds%s"%i]) for i in range(10): print(q.get()[1]) 輸出: sdsds91 sdsds92 sdsds93 sdsds94 sdsds95 sdsds96 sdsds97 sdsds98 sdsds99 sdsds100

 多進程multiprocessing:

   多進程是一個支持使用相似於線程模塊的API來支持生成進程的包。多處理包提供本地和遠程併發性,經過使用子進程代替線程,有效地避免了全局解釋器鎖。因爲這個緣由,多處理模塊容許程序員在給定的機器上充分利用多個處理器.

from  multiprocessing import Process
import time,os
def pr(name):
    time.sleep(2)
    print("hello %s"%name)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=pr,args=( i,) )
        p.start()
輸出:
  等兩秒後輸出:
  hello 1
  hello 0
  hello 4
  hello 2
  hello 3

進程間通信:

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:

Queues:

from  multiprocessing import Process,Queue
def put_q(s):
    s.put({"name":"cheng","age":"24"})
if __name__ == "__main__":
    q = Queue()
    p = Process(target=put_q,args=(q,))
    p.start()
    print(q.get()["name"])
    p.join()
輸出:
  cheng

Pipe:

Pipe函數返回由管道鏈接的一對鏈接對象,該管道默認是雙向的。

from multiprocessing import Process,Pipe
def call(con):
    con.send("hello !!!!")
if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=call,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
輸出:
hello !!!!

Managers:

manager()返回的manager對象控制一個保存Python對象的服務器進程,並容許其餘進程使用代理來操做它們。

manager返回的管理器()將支持類型列表、名稱空間、名稱空間、鎖、RLock、信號量、信號量、條件、事件、障礙、隊列、值和數組。例如:

from multiprocessing import Process,Manager
import os
def s(d,l):
    d['name'] = "cheng"
    d['age'] = 24
    l.append(os.getpid())
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        pid = []
        for i in range(5):
            p = Process(target=s,args=(d,l,))
            p.start()
            pid.append(p)
        for i in pid:
            i.join()
        print(d)
        print(l)
輸出:
{'name': 'cheng', 'age': 24}
[7272, 7108, 7824, 4884, 6936]

進程同步:

from multiprocessing import Process, Lock
import time
def f(l, i):
    l.acquire()                   #拿到鎖以後,得等鎖釋放才向下走
    try:
        print('hello world', i)
        time.sleep(2)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
輸出:
每隔兩秒輸出

 進程池:

from multiprocessing import Process,Pool
import time
def F(i):
    time.sleep(2)
    return i+100
def B(j):
    print(j)
if __name__ == '__main__':
    pool =Pool(5)    #只容許5個進程在運行
    for i in range(10):
        pool.apply_async(func=F,args=(i,),callback=B)
    print('end')
    pool.close()
    pool.join()
相關文章
相關標籤/搜索