Python進程線程協程

線程
  • 線程(線程鎖、threading.Event、queue 隊列、生產者消費者模型、自定義線程池)
  • 進程(數據共享、進程池)
  • 協程

 

線程

Threading用於提供線程相關的操做。線程是應用程序中工做的最小單元,它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python

threading 模塊創建在 _thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。git

1程序員

2github

3編程

4api

5安全

6網絡

7數據結構

8多線程

9

10

11

import threading

import time

 

def worker(num):

    time.sleep(1)

    print(num)

    return

 

for in range(10):

    = threading.Thread(target=worker, args=(i,), name="t.%d" % i)

    t.start()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

# 繼承式調用

 

import threading

import time

  

  

class MyThread(threading.Thread):

    def __init__(self,num):

        threading.Thread.__init__(self)

        self.num = num

  

    def run(self):    #定義每一個線程要運行的函數

  

        print("running on number:%s" %self.num)

  

        time.sleep(2)

  

if __name__ == '__main__':

  

    t1 = MyThread(1)

    t2 = MyThread(2)

    t1.start()

    t2.start()

 

thread方法:

  • t.start() : 激活線程
  • t.getName() : 獲取線程的名稱
  • t.setName() : 設置線程的名稱 
  • t.name : 獲取或設置線程的名稱
  • t.is_alive() : 判斷線程是否爲激活狀態
  • t.isAlive() :判斷線程是否爲激活狀態
  • t.setDaemon() 設置爲後臺線程或前臺線程(默認:False);經過一個布爾值設置線程是否爲守護線程,必須在執行start()方法以前纔可使用。若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止;若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
  • t.isDaemon() : 判斷是否爲守護線程
  • t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法以後該屬性纔有效,不然它只返回None
  • t.join() :逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
  • t.run() :線程被cpu調度後自動執行線程對象的run方法

 

 線程鎖

threading.RLock & threading.Lock

咱們使用線程對數據進行操做的時候,若是多個線程同時修改某個數據,可能會出現不可預料的結果,爲了保證數據的準確性,引入了鎖的概念。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import threading

import time

 

num = 0

 

lock = threading.RLock()    # 實例化鎖類

 

def work():

    lock.acquire()  # 加鎖

    global num

    num += 1

    time.sleep(1)

    print(num)

    lock.release()  # 解鎖

 

for in range(10):

    = threading.Thread(target=work)

    t.start()

  

threading.RLock和threading.Lock 的區別

RLock容許在同一線程中被屢次acquire。而Lock卻不容許這種狀況。 若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的鎖。

1

2

3

4

5

6

7

import threading

 

lock = threading.Lock()

lock.acquire()

lock.acquire()  # 產生死鎖

lock.release()

lock.release()

1

2

3

4

5

6

7

8

import threading

 

rlock = threading.RLock()

rlock.acquire()

rlock.acquire()      # 在同一線程內,程序不會堵塞。

rlock.release()

rlock.release()

print("end.")

  

threading.Event

Event是線程間通訊最間的機制之一:一個線程發送一個event信號,其餘的線程則等待這個信號。用於主線程控制其餘線程的執行。 Events 管理一個flag,這個flag可使用set()設置成True或者使用clear()重置爲False,wait()則用於阻塞,在flag爲True以前。flag默認爲False。

  • Event.wait([timeout]) : 堵塞線程,直到Event對象內部標識位被設爲True或超時(若是提供了參數timeout)
  • Event.set() :將標識位設爲Ture
  • Event.clear() : 將標識伴設爲False
  • Event.isSet() :判斷標識位是否爲Ture

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

import threading

  

def do(event):

    print('start')

    event.wait()

    print('execute')

  

event_obj = threading.Event()

for in range(10):

    = threading.Thread(target=do, args=(event_obj,))

    t.start()

  

event_obj.clear()

inp = input('input:')

if inp == 'true':

    event_obj.set()

  當線程執行的時候,若是flag爲False,則線程會阻塞,當flag爲True的時候,線程不會阻塞。它提供了本地和遠程的併發性。

 

threading.Condition

Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。若是條件不知足則wait;若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

在典型的設計風格里,利用condition變量用鎖去通許訪問一些共享狀態,線程在獲取到它想獲得的狀態前,會反覆調用wait()。修改狀態的線程在他們狀態改變時調用 notify() or notify_all(),用這種方式,線程會盡量的獲取到想要的一個等待者狀態。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

import threading

import time<br>

def consumer(cond):

    with cond:

        print("consumer before wait")

        cond.wait()

        print("consumer after wait")

  

def producer(cond):

    with cond:

        print("producer before notifyAll")

        cond.notifyAll()

        print("producer after notifyAll")

  

condition = threading.Condition()

c1 = threading.Thread(name="c1", target=consumer, args=(condition,))

c2 = threading.Thread(name="c2", target=consumer, args=(condition,))

  

= threading.Thread(name="p", target=producer, args=(condition,))

  

c1.start()

time.sleep(2)

c2.start()

time.sleep(2)

p.start()

 

# consumer()線程要等待producer()設置了Condition以後才能繼續。

  

queue 隊列

適用於多線程編程的先進先出數據結構,能夠用來安全的傳遞多線程信息。

queue 方法:

  • q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。
  • q.join()   # 等到隊列爲kong的時候,在執行別的操做
  • q.qsize()   # 返回隊列的大小 (不可靠)
  • q.empty()    # 當隊列爲空的時候,返回True 不然返回False (不可靠)
  • q.full()     # 當隊列滿的時候,返回True,不然返回False (不可靠)
  • q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置,爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後,若是隊列沒法給出放入item的位置,則引起 queue.Full 異常
  • q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。
  • q.put_nowait(item) # 等效於 put(item,block=False)
  • q.get_nowait()     # 等效於 get(item,block=False)

生產者消費者模型

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

import queue

import threading

 

que = queue.Queue(10)

 

def s(i):

    que.put(i)

    # print("size:", que.qsize())

 

def x(i):

    = que.get(i)

    print("get:", g)

 

for in range(113):

    = threading.Thread(target=s, args=(i,))

    t.start()

 

for in range(111):

    = threading.Thread(target=x, args=(i,))

    t.start()

     

print("size:", que.qsize())

 

# 輸出結果:

get: 1

get: 2

get: 3

get: 4

get: 5

get: 6

get: 7

get: 8

get: 9

get: 10

size: 2

 

自定義線程池:

# 自定義線程池(一)
import queue
import threading
import time

class TreadPool:

    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)

def func(pool, n):
    time.sleep(1)
    print(n)
    pool.add_thread()

p = TreadPool(10)
for i in range(1, 100):
    thread = p.get_thread()
    t = thread(target=func, args=(p, i,))
    t.start()
# 線程池(二)
import queue
import threading
import contextlib
import time

StopEvent = object()

class Threadpool:

    def __init__(self, max_num=10):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []     # 以建立線程列表
        self.free_list = []         # 以建立的線程空閒列表

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread    # 當前線程
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                status = True
            except Exception as e:
                status = False
                result = e

            if callback is not None:
                try:
                    callback(status, result)
                except Exception as e:
                    pass

            if self.terminal:
                event = StopEvent
            else:
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()
                # self.free_list.append(current_thread)
                # event = self.q.get()
                # self.free_list.remove(current_thread)

        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        num = len(self.generate_list)
        while num:
            self.q.put(StopEvent)
            num -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()  # 清空隊列

    @contextlib.contextmanager      # with上下文管理
    def worker_state(self, frelist, val):
        """
        用於記錄線程中正在等待的線程數
        """
        frelist.append(val)
        try:
            yield
        finally:
            frelist.remove(val)


def work(i):
    time.sleep(1)
    print(i)

pool = Threadpool()
for item in range(50):
    pool.run(func=work, args=(item,))
pool.close()
# pool.terminate()

自定義線程池(二)

 

進程

1

2

3

4

5

6

7

8

9

10

# 進程

from multiprocessing import Process

 

def work(name):

    print("Hello, %s" % name)

 

if __name__ == "__main__":

    = Process(target=work, args=("nick",))

    p.start()

    p.join()

  注意:因爲進程之間的數據須要各自持有一份,因此建立進程須要的很是大的開銷。

 

數據共享

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:

Shared memory

數據能夠用Value或Array存儲在一個共享內存地圖裏,以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

from multiprocessing import Process, Value, Array

  

def f(n, a):

    n.value = 3.1415927

    for in range(len(a)):

        a[i] = -a[i]

  

if __name__ == '__main__':

    num = Value('d'0.0)

    arr = Array('i'range(10))

  

    = Process(target=f, args=(num, arr))

    p.start()

    p.join()

  

    print(num.value)

    print(arr[:])

 

 

# 輸出:

3.1415927

[0-1-2-3-4-5-6-7-8-9]

建立num和arr時,「d」和「i」參數由Array模塊使用的typecodes建立:「d」表示一個雙精度的浮點數,「i」表示一個有符號的整數,這些共享對象將被線程安全的處理。

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

1

2

3

4

5

6

7

8

9

10

11

from multiprocessing import Process,Array

temp = Array('i', [11,22,33,44])

  

def Foo(i):

    temp[i] = 100+i

    for item in temp:

        print i,'----->',item

  

for in range(2):

    = Process(target=Foo,args=(i,))

    p.start()

Server process

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

from multiprocessing import Process, Manager

  

def f(d, l):

    d[1= '1'

    d['2'= 2

    d[0.25= None

    l.reverse()

  

if __name__ == '__main__':

    with Manager() as manager:

        = manager.dict()

        = manager.list(range(10))

  

        = Process(target=f, args=(d, l))

        p.start()

        p.join()

  

        print(d)

        print(l)

 

# 輸出結果:

{0.25None1'1''2'2}

[9876543210]

Server process manager比 shared memory 更靈活,由於它能夠支持任意的對象類型。另外,一個單獨的manager能夠經過進程在網絡上不一樣的計算機之間共享,不過他比shared memory要慢。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

# manage.dict()共享數據

from multiprocessing import Process,Manager

  

manage = Manager()

dic = manage.dict()

  

def Foo(i):

    dic[i] = 100+i

    print dic.values()

  

for in range(2):

    = Process(target=Foo,args=(i,))

    p.start()

    p.join()

當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Array, RLock

def Foo(lock,temp,i):
    """
    將第0個數加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print i,'----->',item
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()

進程鎖實例

 

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

方法:

  • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。

  • close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。

  • terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。

  • join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程

進程池中有兩個方法:

  • apply
  • apply_async

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

from multiprocessing import Pool

import time

def myFun(i):

    time.sleep(2)

    return i+100

 

def end_call(arg):

    print("end_call",arg)

 

= Pool(5)

 

# print(p.map(myFun,range(10)))

 

for in range(10):

    p.apply_async(func=myFun,args=(i,),callback=end_call)

 

print("end")

p.close()

p.join()

 官方示例

from multiprocessing import Pool, TimeoutError
import time
import os
 
def f(x):
    return x*x
 
if __name__ == '__main__':
    # 建立4個進程 
    with Pool(processes=4) as pool:
 
        # 打印 "[0, 1, 4,..., 81]" 
        print(pool.map(f, range(10)))
 
        # 使用任意順序輸出相同的數字, 
        for i in pool.imap_unordered(f, range(10)):
            print(i)
 
        # 異步執行"f(20)" 
        res = pool.apply_async(f, (20,))      # 只運行一個進程 
        print(res.get(timeout=1))             # 輸出 "400" 
 
        # 異步執行 "os.getpid()" 
        res = pool.apply_async(os.getpid, ()) # 只運行一個進程 
        print(res.get(timeout=1))             # 輸出進程的 PID 
 
        # 運行多個異步執行可能會使用多個進程 
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
 
        # 是一個進程睡10秒 
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("發現一個 multiprocessing.TimeoutError異常")
 
        print("目前,池中還有其餘的工做")
 
    # 退出with塊中已經中止的池 
    print("Now the pool is closed and no longer available")

官方示例

 

協程

  協程又叫微線程,從技術的角度來講,「協程就是你能夠暫停執行的函數」。若是你把它理解成「就像生成器同樣」,那麼你就想對了。 線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。

  協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

  協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程。

1

2

3

4

5

# 安裝

pip install gevent

 

# 導入模塊

import gevent

 

greenlet

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

# greenlet

from greenlet import greenlet

 

def test1():

    print(11)

    gr2.switch()

    print(22)

    gr2.switch()

 

def test2():

    print(33)

    gr1.switch()

    print(44)

 

gr1 = greenlet(test1)

gr2 = greenlet(test2)

gr1.switch()

 

# 輸出結果:

11

33

22

44

  

gevent

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

# gevent

import gevent

 

def foo():

    print("Running in foo")

    gevent.sleep(0)

    print("Explicit context switch to foo angin")

 

def bar():

    print("Explicit context to bar")

    gevent.sleep(0)

    print("Implicit context swich back to bar")

 

gevent.joinall([

    gevent.spawn(foo),

    gevent.spawn(bar),

])

 

# 輸出結果:

Running in foo

Explicit context to bar

Explicit context switch to foo angin

Implicit context swich back to bar

 

 遇到IO操做自動切換

# 遇到IO自動切換
from gevent import monkey
monkey.patch_all()
import gevent
import requests

def f(url):
    print("FET: %s" % url)
    resp = requests.get(url)
    data = len(resp.text)
    print(url, data)

gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])

遇到IO操做自動切換
相關文章
相關標籤/搜索