python 併發和線程

併發和線程

基本概念 - 並行、併發

並行, parallelpython

    互不干擾的在同一時刻作多件事;windows

    如,同一時刻,同時有多輛車在多條車道上跑,即同時發生的概念.緩存

 

併發, concurrency安全

    同時作某些事,可是強調同一時段作多件事.服務器

    如,同一路口,發生了車輛要同時經過路面的事件.網絡

 

隊列, 緩衝區 數據結構

    相似排隊,是一種自然解決併發的辦法.排隊區域就是緩衝區.多線程

 

解決併發:併發

   【 "食堂打飯模型", 中午12點,你們都涌向食堂,就是併發.人不少就是高併發.】ide

 

    一、隊列, 緩衝區:

        隊列: 即排隊.

        緩衝區: 排成的隊列.

        優先隊列: 若是有男生隊伍和女生隊伍,女生隊伍優先打飯,就是優先隊列.

 

    二、爭搶:

        鎖機制: 爭搶打飯,有人搶到,該窗口在某一時刻就只能爲這我的服務,鎖定窗口,即鎖機制.

        爭搶也是一種高併發解決方案,可是有可能有人很長時間搶不到,因此不推薦.

 

    三、預處理:

        統計你們愛吃的菜品,最愛吃的80%熱門菜提早作好,20%冷門菜現作,這樣即便有人鎖定窗口,也能很快釋放.

        這是一種提早加載用戶須要的數據的思路,預處理思想,緩存經常使用.

 

    四、並行:

        開多個打飯窗口,同時提供服務.

        IT平常能夠經過購買更多服務器,或多開線程,進程實現並行處理,解決併發問題.

        這是一種水平擴展的思路.

        注: 若是線程在單CPU上處理,就不是並行了.

 

    五、提速:

        經過提升單個窗口的打飯速度,也是解決併發的方式.

        IT方面提升單個CPU性能,或單個服務器安裝更多的CPU.   

        這是一種垂直擴展的思想.

 

    六、消息中間件:

        如上地地鐵站的九曲迴腸的走廊,緩衝人流.

        常見消息中間件: RabbitMQ, ActiveMQ(Apache), RocketMQ(阿里Apache), kafka(Apache)等.

 

【以上例子說明: 技術源於生活! 】

 

進程和線程

a)   在實現了線程的操做系統中,線程是操做系統可以運算調度的最小單位.

b)   線程被包含在進程中,是進程的實際運做單位.

c)   一個程序的執行實例就是一個進程.

 

  進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎.

    進程和程序的關係: 進程是線程的容器.

  Linux進程有父進程和子進程之分,windows的進程是平等關係.

          線程有時稱爲輕量級進程,一個標準的線程由線程ID,當前指令指針,寄存器集合和堆棧組成.

 

當運行一個程序時,OS會建立一個進程。它會使用系統資源(CPU、內存和磁盤空間)和OS內核中的數據結構(文件、網絡鏈接、用量統計等)。

進程之間是互相隔離的,即一個進程既沒法訪問其餘進程的內容,也沒法操做其餘進程。

操做系統會跟蹤全部正在運行的進程,給每一個進程一小段運行時間,而後切換到其餘進程,這樣既能夠作到公平又能夠響應用戶操做。

能夠在圖形界面中查看進程狀態,在在Windows上可使用任務管理器。也能夠本身編寫程序來獲取進程信息。

 

# 獲取正在運行的python解釋器的進程號和當前工做目錄,及用戶ID、用戶組ID。

In [1]: import os

In [2]: os.getpid()

Out[2]: 2550


In [3]: os.getuid()

Out[3]: 0

 
In [4]: os.getcwd()

Out[4]: '/root'

 
In [5]: os.getgid()

Out[5]: 0

 
In [6]:

  

    對線程、線程的理解:

  • 進程是獨立的王國,進程間不能隨便共享數據.
  • 線程是省份,同一進程內的線程能夠共享進程的資源,每個線程有本身獨立的堆棧.       

    線程的狀態: 

  • 就緒(Ready): 線程一旦運行,就在等待被調度.
  • 運行(Running): 線程正在運行.
  • 阻塞(Blocked): 線程等待外部事件發生而沒法運行,如I/O操做.
  • 終止(Terminated): 線程完成或退出,或被取消.

   

python中的進程和線程: 進程會啓動一個解釋器進程,線程共享一個解釋器進程. 

python的線程開發

python線程開發使用標準庫threading.

thread類

# 簽名

def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

 

  • target: 線程調用的對象,就是目標函數.
  • name: 爲線程起個名字.
  • args: 爲目標函數傳遞實參, 元組.
  • kwargs: 爲目標函數關鍵字傳參, 字典.

線程啓動

import threading

 

# 最簡單的線程程序

def worker():

    print("I'm working")

    print("Finished")

 

t = threading.Thread(target=worker, name='worker')  # 線程對象.

t.start()

經過threading.Thread建立一個線程對象,target是目標函數,name能夠指定名稱.

可是線程沒有啓動,須要調用start方法.

線程會執行函數,是由於線程中就是執行代碼的,而最簡單的封裝就是函數,因此仍是函數調用.

函數執行完,線程也會隨之退出.

若是不讓線程退出,或者讓線程一直工做: 函數內部使用while循環.

import threading

import time

def worker():

    while True:

        time.sleep(1)

        print("I'm work")

    print('Finished')

 

t = threading.Thread(target=worker, name='worker')  # 線程對象.

t.start()   # 啓動.

  

線程退出

python沒有提供線程退出的方法,在下面狀況時會退出:

  • 線程函數內語句執行完畢.
  • 線程函數中拋出未處理的異常.
import threading

import time  

 

def worker():

    count = 0

    while True:

        if (count > 5):

            raise RuntimeError()

            # return

        time.sleep(1)

        print("I'm working")

        count += 1

t = threading.Thread(target=worker, name='worker')  # 線程對象.

t.start()  # 啓動.

 

print("==End==")

 

python的線程沒有優先級,沒有線程組的概念,也不能被銷燬、中止、掛起,天然也沒有恢復、中斷.

 線程的傳參

import threading

import time

 

def add(x, y):

    print('{} + {} = {}'.format(x, y, x + y, threading.current_thread()))

 

thread1 = threading.Thread(target=add, name='add', args=(4, 5))  # 線程對象.

thread1.start()  # 啓動.

time.sleep(2)

 

thread2 = threading.Thread(target=add, name='add',args=(5, ), kwargs={'y': 4})  # 線程對象.

thread2.start()  # 啓動.

time.sleep(2)

 

thread3 = threading.Thread(target=add, name='add', kwargs={'x': 4, 'y': 5})  # 線程對象.

thread3.start()  # 啓動.

  

線程傳參和函數傳參沒什麼區別,本質上就是函數傳參.

 

threading的屬性和方法

current_thread()  # 返回當前線程對象.

main_thread()  # 返回主線程對象.

active_count()  # 當前處於alive狀態的線程個數.

enumerate()  # 返回全部活着的線程的列表,不包括已經終止的線程和未開始的線程.

get_ident()  # 返回當前線程ID,非0整數.

 

active_count、enumerate方法返回的值還包括主線程。

import threading

import time

 

def showthreadinfo():

    print('currentthread = {}'.format(threading.current_thread()))

    print('main thread = {}'.format(threading.main_thread()), '"主線程對象"')

    print('active count = {}'.format(threading.active_count()), '"alive"')

 

def worker():

    count = 1

    showthreadinfo()

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print("I'm working")

 

t = threading.Thread(target=worker, name='worker')  # 線程對象.

showthreadinfo()

t.start()  # 啓動.

 

print('==END==')

  

thread實例的屬性和方法

name: 只是一個名稱標識,能夠重名, getName()、setName()來獲取、設置這個名詞。

ident: 線程ID, 它是非0整數。線程啓動後纔會有ID,不然爲None。線程退出,此ID依舊能夠訪問。此ID能夠重複使用。

is_alive(): 返回線程是否活着。

 

注: 線程的name是一個名稱,能夠重複; ID必須惟一,但能夠在線程退出後再利用。

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print(threading.current_thread().name, '~~~~~~~~~~~~~~~~~~~~~~~')

 

t = threading.Thread(name='worker', target=worker)

print(t.ident)

t.start()

 

while True:

    time.sleep(1)

    if t.is_alive():

        print('{} {} alive'.format(t.name, t.ident))

    else:

        print('{} {} dead'.format(t.name, t.ident))

t.start()

  

start(): 啓動線程。每個線程必須且只能執行該方法一次。

run(): 運行線程函數。

爲了演示,派生一個Thread子類

# start方法.

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count >= 5):

            break

        time.sleep(1)

        count += 1

        print('worker running')

 

class MyThread(threading.Thread):

    def start(self):

        print('start~~~~~~~~~~~~~')

        super().start()

 

    def run(self):

        print('run~~~~~~~~~~~~~~~~~')

        super().run()

 

t = MyThread(name='worker', target=worker)

t.start()

 

run方法

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print('worker running')

 

class MyThread(threading.Thread):

    def start(self):

        print('start~~~~~~~~~~~~~~~')

        super().start()

 

    def run(self):

        print('run~~~~~~~~~~~~~~~~~')

        super().run()

 

t = MyThread(name='worker', target=worker)

  

# t.start()

t.run()

 start()方法會調用run()方法,而run()方法能夠運行函數。

 這兩個方法看似功能重複,但不能只留其一,以下:

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print("worker running")

        print(threading.current_thread().name)

 

class MyThread(threading.Thread):

    def start(self):

        print('start~~~~~~~~~~~~~')

        super().start()

 

    def run(self):

        print('run~~~~~~~~~~~~~~~')

        super().run()

 

t = MyThread(name='worker', target=worker)

# t.start()

  

t.run()  # 分別執行start或者run方法。

 

使用start方法啓動線程,啓動了一個新的線程,名字叫作worker running,可是使用run方法啓動的線程,並無啓動新的線程,只是在主線程中調用了一個普通的函數而已。

所以,啓動線程要使用start方法,才能啓動多個線程。

多線程

顧名思義,多個線程,一個進程中若是有多個線程,就是多線程,實現一種併發。

 

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(2)

        count += 1

        print('worker running')

        print(threading.current_thread().name, threading.current_thread().ident)

 

class MyThread(threading.Thread):

    def start(self):

        print('start~~~~~~~~~~~~~~')

        super().start()

 

    def run(self):

        print('run~~~~~~~~~~~~~~~~~~')

        super().run()  # 查看父類在作什麼?

 

t1 = MyThread(name='worker1', target=worker)

t2 = MyThread(name='worker2', target=worker)

 

t1.start()

t2.start()

能夠看到worker1和worker2交替執行。

 

換成run方法試試:

import threading

import time

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print('worker running')

        print(threading.current_thread().name, threading.current_thread().ident)

 

class MyThread(threading.Thread):

    def start(self):

        print('start~~~~~~')

        super().start()

 

    def run(self):

        print('run~~~~~~~~~~~~')

        super().run()

 

t1 = MyThread(name='worker1', target=worker)

t2 = MyThread(name='worker2', target=worker)

 

# t1.start()

# t2.start()

t1.run()

t2.run()

 

沒有開新的線程,這就是普通函數調用,因此執行完t1.run(),而後執行t2.run(),這裏就不是多線程。

當使用start方法啓動線程後,進程內有多個活動的線程並行的工做,就是多線程。

一個進程中至少有一個線程,並做爲程序的入口,這個線程就是主線程。一個進程至少有一個主線程。

其餘線程稱爲工做線程。

線程安全

須要在ipython中演示:

 

In [1]: import threading

   ...:  def worker():

   ...:     for x in range(5):

   ...:         print("{} is running".format(threading.current_thread().name))

   ...:

   ...:  for x in range(1, 5):

   ...:     name = 'worker{}'.format(x)

   ...:     t = threading.Thread(name=name, target=worker)

   ...:     t.start()

   ...:

  

能夠看到運行結果中,本應該是一行行打印,但不少字符串打印在了一塊兒,這說明print函數被打斷了,被線程切換打斷了。

print函數分兩步,第一步打印字符串,第二部換行,就在這之間,發生了線程的切換。

說明print函數不是線程安全函數。

 

線程安全: 線程執行一段代碼,不會產生不肯定的結果,那這段代碼就是線程安全的。

 

一、不讓print打印換行:

import threading

def worker():

    for x in range(100):

        print('{} is running\n'.format(threading.current_thread().name), end='')

 

for x in range(1, 5):

    name = 'worker{}'.format(x)

    t = threading.Thread(name=name, target=worker)

    t.start()

  

字符串是不可變類型,它能夠做爲一個總體不可分割輸出。end=''的做用就是不讓print輸出換行。

 

二、使用logging.

標準庫裏面的logging模塊、日誌處理模塊、線程安全、生成環境代碼都使用logging。

import threading

import logging

 

def worker():

    for x in range(100):

        # print("{} is running.\n".format(threading.current_thread().name), end='')

        logging.warning('{} is running'.format(threading.current_thread().name))

 

for x in range(1, 5):

    name = 'work{}'.format(x)

    t = threading.Thread(name=name, target=worker)

    t.start()

  

daemon線程和non-daemon線程

注:這裏的daemon不是Linux中的守護進程。

 

進程靠線程執行代碼,至少有一個主線程,其餘線程是工做線程。

主線程是第一個啓動的線程。

父線程:若是線程A中啓動了一個線程B,A就是B的父線程。

子線程:B就是A的子線程。

 

python中構造線程的時候能夠設置daemon屬性,這個屬性必須在start方法以前設置好。

 

源碼Thread的__init__方法中:

if daemon is not None:

    self._daemonic = daemon  # 用戶設定bool值。

else:

    self._daemonic = current_thread().daemon

self._ident = None

  

線程daemon屬性,若是設定就是用戶的設置,不然就取當前線程的daemon值。

主線程是non-daemon,即daemon=False。

 

import time

import threading

 

def foo():

    time.sleep(5)

    for i in range(20):

        print(i)

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=False)

t.start()

 

print('Main Thread Exiting')

 

運行發現線程t依然執行,主線程已經執行完,可是一直等着線程t.

修改成 t = threading.Threading(target=foo, daemon=True),運行發現主線程執行完程序當即結束了,根本沒有等線程t.

 

import threading

import logging

logging.basicConfig(level=logging.INFO) #警告級別

import time

 

def worker():

    for x in range(10):

        time.sleep(1)

        msg = ("{} is running".format(threading.current_thread()))

        logging.info(msg)

        t = threading.Thread(target=worker1,name="worker1-{}".format(x),daemon=False)

        t.start()

        # t.join()

 

def worker1():

    for x in range(10):

        time.sleep(0.3)

        msg = ("¥¥¥¥¥{} is running".format(threading.current_thread()))

        logging.info(msg)

 

t = threading.Thread(target=worker,name='worker-{}'.format(0),daemon=True)

t.start()

# t.join()

time.sleep(0.3)

print('ending')

print(threading.enumerate())

  

結論:

daemon=False 運行發現子線程依然執行,主線程已經執行完,可是主線程會一直等着子線程執行完.

daemon=True 運行發現主線程執行完程序當即結束了。

 

 daemon屬性:表示線程是不是daemon線程,這個值必須在start()以前設置,不然引起RuntimeError異常。

isDaemon():是不是daemon線程。

setDaemon:設置爲daemon線程,必須在start方法以前設置。

 

總結:

線程具備一個daemon屬性,能夠顯式設置爲True或False,也能夠不設置,不設置則取默認值None。

若是不設置daemon,就取當前線程的daemon來設置它。子子線程繼承子線程的daemon值,做用和設置None同樣。

主線程是non-daemon線程,即daemon=False。

從主線程建立的全部線程不設置daemon屬性,則默認都是daemon=False,也就是non-daemon線程。

python程序在沒有活着的non-daemon線程運行時退出,也就是剩下的只能是daemon線程,主線程才能退出,不然主線程就只能等待。

 

以下程序輸出:

import time

import threading

def bar():

    time.sleep(10)

    print('bar')

 

def foo():

    for i in range(20):

        print(i)

    t = threading.Thread(target=bar, daemon=False)

    t.start()

 

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=True)

t.start()

print('Main Threading Exiting')

上例中,沒有輸出bar這個字符串,如何修改纔會打印出來bar?

 import time

import threading

 

def bar():

    time.sleep(1)

    print('bar')

 

def foo():

    for i in range(5):

        print(i)

    t = threading.Thread(target=bar, daemon=False)

    t.start()

 

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=True)

t.start()

time.sleep(1)

print('Main Threading Exiting')

 

再看一個例子,看看主線程合適結束daemon線程。

 import time

import threading

 

def foo(n):

    for i in range(n):

        print(i)

        time.sleep(1)

 

t1 = threading.Thread(target=foo, args = (10, ), daemon=True)  # 調換10和20,看看效果。

t1.start()

t2 = threading.Thread(target=foo, args = (20, ), daemon=False)

t2.start()

 

time.sleep(2)

print('Main Threading Exiting')

 

上例說明,若是有non-daemon線程的時候,主線程退出時,也不會殺掉全部daemon線程,直到全部non-daemon線程所有結束,

若是還有daemon線程,主線程須要退出,會結束全部 daemon線程,退出。

join方法

import time

import threading

 

def foo(n):

    for i in range(n):

        print(i)

        time.sleep(1)

t1 = threading.Thread(target=foo, args=(10, ), daemon=False)

t1.start()

t1.join()  # 設置join.

print('Main Thread Exiting')

 

使用了join方法後,daemon線程執行完了,主線程才退出。

join(timeout=None),是線程的標準方法之一。

一個線程中調用另外一個線程的join方法,調用者將被阻塞,直到被調用線程終止。

一個線程能夠被join屢次。

timeout參數指定調用者等待多久,沒有設置超時,就一直等待被調用線程結束。

調用誰的join方法,就是join誰,就要等誰。

相關文章
相關標籤/搜索