python多線程,多進程

threading.active_count()html

返回當前存活的線程類 Thread 對象。返回的計數等於 enumerate() 返回的列表長度。python

threading.current_thread()程序員

返回當前對應調用者的控制線程的 Thread 對象。若是調用者的控制線程不是利用 threading 建立,會返回一個功能受限的虛擬線程對象。編程

threading.get_ident()安全

 

返回當前線程的 「線程標識符」。它是一個非零的整數。它的值沒有直接含義,主要是用做 magic cookie,好比做爲含有線程相關數據的字典的索引。線程標識符可能會在線程退出,新線程建立時被複用。服務器

threading.enumerate()cookie

以列表形式返回當前全部存活的 Thread 對象。 該列表包含守護線程,current_thread() 建立的虛擬線程對象和主線程。它不包含已終結的線程和還沒有開始的線程。網絡

threading.main_thread()併發

 

返回主 Thread 對象。通常狀況下,主線程是Python解釋器開始時建立的線程。app

threading.setprofile(func)

爲全部 threading 模塊開始的線程設置性能測試函數。在每一個線程的 run() 方法被調用前,func 會被傳遞給 sys.setprofile() 。

threading.stack_size([size])

返回建立線程時用的堆棧大小。可選參數 size 指定以後新建的線程的堆棧大小,並且必定要是0(根據平臺或者默認配置)或者最小是32,768(32KiB)的一個正整數。若是 size 沒有指定,默認是0。若是不支持改變線程堆棧大小,會拋出 RuntimeError 錯誤。若是指定的堆棧大小不合法,會拋出 ValueError 錯誤而且不會修改堆棧大小。32KiB是當前最小的能保證解釋器有足夠堆棧空間的堆棧大小。須要注意的是部分平臺對於堆棧大小會有特定的限制,例如要求大於32KiB的堆棧大小或者須要根據系統內存頁面的整數倍進行分配 - 應當查閱平臺文檔有關詳細信息(4KiB頁面比較廣泛,在沒有更具體信息的狀況下,建議的方法是使用4096的倍數做爲堆棧大小)。

threading.TIMEOUT_MAX

阻塞函數( Lock.acquire()RLock.acquire()Condition.wait(), ...)中形參 timeout 容許的最大值。傳入超過這個值的 timeout 會拋出 OverflowError 異常。

有個 "主線程" 對象;這對應Python程序裏面初始的控制線程。它不是一個守護線程。

"虛擬線程對象" 是能夠被建立的。這些是對應於「外部線程」的線程對象,它們是在線程模塊外部啓動的控制線程,例如直接來自C代碼。虛擬線程對象功能受限;他們老是被認爲是存活的和守護模式,不能被 join() 。由於沒法檢測外來線程的終結,它們永遠不會被刪除。

class  threading. Thread (group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None)

調用這個構造函數時,必需帶有關鍵字參數。參數以下:

group 應該爲 None;爲了往後擴展 ThreadGroup 類實現而保留。

target 是用於 run() 方法調用的可調用對象。默認是 None,表示不須要調用任何方法。

name 是線程名稱。默認狀況下,由 "Thread-N" 格式構成一個惟一的名稱,其中 N 是小的十進制數。

args 是用於調用目標函數的參數元組。默認是 ()

kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}

若是 daemon 不是 None,線程將被顯式的設置爲 守護模式,無論該線程是不是守護模式。若是是 None (默認值),線程將繼承當前線程的守護模式屬性。

若是子類型重載了構造函數,它必定要確保在作任何事前,先發起調用基類構造器(Thread.__init__())。

在 3.3 版更改: 加入 daemon 參數。

start ()

開始線程活動。

它在一個線程裏最多隻能被調用一次。它安排對象的 run() 方法在一個獨立的控制進程中調用。

若是同一個線程對象中調用這個方法的次數大於一次,會拋出 RuntimeError 。

run ()

表明線程活動的方法。

你能夠在子類型裏重載這個方法。 標準的 run() 方法會對做爲 target 參數傳遞給該對象構造器的可調用對象(若是存在)發起調用,並附帶從 args 和 kwargs 參數分別獲取的位置和關鍵字參數。

join (timeout=None)

等待,直到線程終結。這會阻塞調用這個方法的線程,直到被調用 join() 的線程終結 -- 不論是正常終結仍是拋出未處理異常 -- 或者直到發生超時,超時選項是可選的。

當 timeout 參數存在並且不是 None 時,它應該是一個用於指定操做超時的以秒爲單位的浮點數(或者分數)。由於 join() 老是返回 None ,因此你必定要在 join() 後調用 is_alive() 才能判斷是否發生超時 -- 若是線程仍然存貨,則 join() 超時。

當 timeout 參數不存在或者是 None ,這個操做會阻塞直到線程終結。

一個線程能夠被 join() 不少次。

若是嘗試加入當前線程會致使死鎖, join() 會引發 RuntimeError 異常。若是嘗試 join() 一個還沒有開始的線程,也會拋出相同的異常。

name

只用於識別的字符串。它沒有語義。多個線程能夠賦予相同的名稱。 初始名稱由構造函數設置。

getName ()
setName ()

舊的 name 取值/設值 API;直接當作特徵屬性使用它。

ident

這個線程的 '線程標識符',若是線程還沒有開始則爲 None 。這是個非零整數。參見 get_ident() 函數。當一個線程退出而另一個線程被建立,線程標識符會被複用。即便線程退出後,仍可獲得標識符。

is_alive ()

返回線程是否存活。

當 run() 方法剛開始直到 run() 方法剛結束,這個方法返回 True 。模塊函數 enumerate() 返回包含全部存活線程的列表。

daemon

一個表示這個線程是(True)否(False)守護線程的布爾值。必定要在調用 start() 前設置好,否則會拋出 RuntimeError 。初始值繼承於建立線程;主線程不是守護線程,所以主線程建立的全部線程默認都是 daemon = False

當沒有存活的非守護線程時,整個Python程序纔會退出。

isDaemon ()
setDaemon ()

舊的 name 取值/設值 API;建議直接當作特徵屬性使用它。

#線程是cpu最小的單元

from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock
import time
import random

# print(active_count()) #返回當前的線程數
# print(current_thread()) #返回當前的線程Thread對象
# print(enumerate()) #反回當前活動的線程
# print(main_thread()) #返回當前的主線程
# print(stack_size()) #返回建立線程時使用的棧的大小,若是size參數,則用來指定後續建立的線程使用的棧的大小,size必須是0,(表示使用系統默認的值)大於32k的正整數

#
arr = []
def task(num):
    time.sleep(3)
    print(num)
#
start = time.time()

for i in range(0,5):
    t =Thread(target=task,args=(i,))
    t.start()
    arr.append(t)
    # print(enumerate())
    # t.join()

for j in arr:
    j.join()

end = time.time()
print('時間:%s'%(end-start))

'''
Thread對象經常使用方法:
start()運行線程
join()阻塞主線程 time阻塞時間
name 線程的名字
is_alive() 判斷線程是否存活
ident 線程的標識
daemon 是否爲守護進程
'''

# class MyI(Thread):
#     def __init__(self):
#         super(MyI,self).__init__()
#     def run(self):
#         time.sleep(3)
#         print(self.name,self.ident,self.daemon)
#
# t1 = MyI()
# t1.name = "任務一"
# # t1.daemon=True
# t1.start()

#
# h2=[]
# l = Lock()
# class xm(Thread):
#
#     def __init__(self):
#         super(xm,self).__init__()
#         self.h = True
#     def run(self):
#
#         if len(h2)<5:
#             l.acquire()
#             while self.h:
#                 h2.append(1)
#                 print('小明往鍋里加了%s'%len(h2)+'個丸子','鍋裏有%s'%len(h2)+'個丸子')
#                 if len(h2)==5:
#                     l.release()
#                     self.h = False
#
# class xh(Thread):
#     def __init__(self):
#         super(xh,self).__init__()
#         self.h = True
#
#     def run(self):
#         i = 0
#         if len(h2)==5:
#             l.acquire()
#             while self.h:
#                 i+=1
#                 h2.pop(0)
#                 print('小紅吃了鍋裏的%s'%(i)+'個丸子','鍋裏有%s'%len(h2)+'個丸子')
#                 if len(h2)==0:
#                     l.release()
#                     self.h = False
#
#
# x1 = xm()
# x1.start()
#
# x2 = xh()
# x2.start()
#
# '''
# def m(*args):
#     print(random.randint(1,args))
# Thread(group=None,target=m(),args=(1,2,34,6)) #構造函數
# target 線程啓動執行函數
# args 是元組 是線程執行函數的參數
# kwargs 是線程執行函數的參數
# '''

multiprocessing 是一個用與 threading 模塊類似API的支持產生進程的包。 multiprocessing 包同時提供本地和遠程併發,使用子進程代替線程,有效避免 Global Interpreter Lock 帶來的影響。所以, multiprocessing 模塊容許程序員充分利用機器上的多個核心。Unix 和 Windows 上均可以運行。

multiprocessing 模塊還引入了在 threading 模塊中沒有相似物的API。這方面的一個主要例子是 Pool 對象,它提供了一種方便的方法,能夠跨多個輸入值並行化函數的執行,跨進程分配輸入數據(數據並行)。如下示例演示了在模塊中定義此類函數的常見作法,以便子進程能夠成功導入該模塊。這個數據並行的基本例子使用 Pool ,

from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3])) 

將打印到標準輸出

[1, 4, 9]

Process 類

在 multiprocessing 中,經過建立一個 Process 對象而後調用它的 start() 方法來生成進程。 Process 和 threading.Thread API 相同。 一個簡單的多進程程序示例是:

from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join() 

要顯示所涉及的各個進程ID,這是一個擴展現例:

from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join()

POOL進程池
# from multiprocessing import Pool
# def fun():
# sum = 0
# for i in range(0,100000):
# sum+=i
# print(sum)
#
# if __name__ == '__main__':
# pool = Pool(5) #5個進程的進程池
# start = time.time()
# for i in range(100):
# # pool.apply(fun)
# obj = pool.apply_async(fun)
# print(obj)
#
# end =time.time()
# print(end-start)

 
      

隊列

Queue 類是一個近似 queue.Queue 的克隆。 例如:

from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join() 

隊列是線程和進程安全的。

管道

Pipe() 函數返回一個由管道鏈接的鏈接對象,默認狀況下是雙工(雙向)。例如:

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join() 

返回的兩個鏈接對象 Pipe() 表示管道的兩端。每一個鏈接對象都有 send() 和 recv() 方法(相互之間的)。請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的 同一 端,則管道中的數據可能會損壞。固然,同時使用管道的不一樣端的進程不存在損壞的風險。

 
      

進程之間的同步

multiprocessing 包含來自 threading 的全部同步基本體的等價物。例如,可使用鎖來確保一次只有一個進程打印到標準輸出:

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() 

不使用來自不一樣進程的鎖輸出容易產生混淆。

 
      

在進程之間共享狀態

如上所述,在進行併發編程時,一般最好儘可能避免使用共享狀態。使用多個進程時尤爲如此。

可是,若是你真的須要使用一些共享數據,那麼 multiprocessing 提供了兩種方法。

共享內存

可使用 Value 或 Array 將數據存儲在共享內存映射中。例如,如下代碼:

from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 

將打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9] 

建立 num 和 arr 時使用的 'd' 和 'i' 參數是 array 模塊使用的類型的 typecode : 'd' 表示雙精度浮點數, 'i' 表示有符號整數。這些共享對象將是進程和線程安全的。

爲了更靈活地使用共享內存,可使用 multiprocessing.sharedctypes 模塊,該模塊支持建立從共享內存分配的任意ctypes對象。

服務器進程

由 Manager() 返回的管理器對象控制一個服務器進程,該進程保存Python對象並容許其餘進程使用代理操做它們。

Manager() 返回的管理器支持類型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。例如

from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) 

將打印

{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 

服務器進程管理器比使用共享內存對象更靈活,由於它們能夠支持任意對象類型。此外,單個管理器能夠經過網絡由不一樣計算機上的進程共享。可是,它們比使用共享內存慢。

服務端的建立
from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
queue.put('lolololol')
class QueueManager(BaseManager): pass
QueueManager.register('get_queue', callable=lambda:queue)
m = QueueManager(address=('127.0.0.1', 5000), authkey=b'2ab')
s = m.get_server()
s.serve_forever() #建立服務器

客戶端的建立
from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register('get_queue')m = QueueManager(address=('127.0.0.1', 5000), authkey=b'2ab')m.connect()queue = m.get_queue()print(queue.get())
相關文章
相關標籤/搜索