An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
In multiprogramming, multiple programs are loaded into memory at the same time and, under the operating system's scheduling, execute concurrently. This design greatly improves CPU utilization. Processes give each user the illusion of having the CPU entirely to themselves: the process abstraction was introduced precisely to support multiprogramming on the CPU.
Processes have many advantages. They enable multiprogramming, give every user the feeling of owning a dedicated CPU and other resources, and raise overall machine utilization. So why do we need threads at all? Look closely and you will find that processes still have shortcomings, mainly two:
A process can only do one thing at a time. If you want to do two or more things simultaneously, a single process cannot help you.
If a process blocks during execution, for example while waiting for input, the entire process is suspended; even work that does not depend on the awaited data cannot proceed.
For example, consider the QQ chat client. If QQ, as a single process, could only do one thing at a time, how could it listen for keyboard input, receive messages from other people, and display those messages on screen all at the same moment? You might object: doesn't the operating system do time-sharing? But time-sharing happens between processes: the OS runs your QQ task for a while, then switches over to your Word document. Within each CPU time slice granted to QQ, the program can still only do one thing.
To put it even more plainly: an operating system is like a factory with many workshops, each producing a different product; each workshop is a process. The factory is poor and short on power, so it can supply electricity to only one workshop at a time. To keep all the workshops in business, the factory's electrician powers the workshops in turns. But when it is the QQ workshop's turn, there is only one worker inside, so productivity is terrible. How do you fix that? Exactly what you are thinking: hire a few more workers and let them work in parallel. Each of those workers is a thread!
A thread is the smallest unit of execution that the operating system can schedule. It is contained within a process and is the actual working unit of the process. A thread is a single sequential flow of control within a process; one process can contain multiple concurrent threads, each carrying out a different task.
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
The core point above is that no matter how many threads you start or how many CPUs you have, Python calmly allows only one thread to run at any given moment. Then how can that still be called multithreading? Don't draw conclusions so early.
The first thing to be clear about is that the GIL is not a feature of the Python language; it is a concept introduced by one implementation of the Python interpreter, CPython. An analogy: C++ is a language (syntax) standard, but it can be compiled into executable code by different compilers, such as GCC, Intel C++, and Visual C++. Python is the same: one piece of code can be executed by different runtimes such as CPython, PyPy, or Psyco, and Jython, for instance, has no GIL. But because CPython is the default Python runtime in most environments, many people equate CPython with Python and naturally blame the GIL on the Python language. So let this be clear: the GIL is not part of the Python language, and Python does not have to depend on a GIL.
This article dissects the GIL's impact on Python multithreading thoroughly and is strongly recommended: http://www.dabeaz.com/python/UnderstandingGIL.pdf
There are two ways to invoke a thread:
Direct invocation
import threading
import time

def sayhi(num):  # the function each thread will run
    print("running on number:%s" % num)
    time.sleep(3)

if __name__ == '__main__':
    t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
    t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance
    t1.start()  # start the thread
    t2.start()  # start the other thread
    print(t1.getName())  # get the thread's name
    print(t2.getName())

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# running on number:1
# running on number:2
# Thread-1
# Thread-2
Inheritance-based invocation
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the function each thread will run
        print("running on number:%s" % self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# running on number:1
# running on number:2
Join & Daemon
Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.
Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
1. With default arguments, after threads are created the program waits for every child thread to finish before exiting, whether or not you call join; with or without join, the end result is the same.
2. If a thread is created with daemon set to True (thread.setDaemon(True)), the main thread exits as soon as it finishes its own work, without waiting for the child thread's result, and the child thread dies when the main thread exits.
3. join blocks the caller, waiting for the child thread to finish. join takes a timeout parameter: if the child thread has not finished after the main thread has waited timeout seconds, join returns; note that it does not kill the child thread.
4. If the thread's daemon attribute is False, the program still cannot exit until the child thread ends, so even after join(timeout) returns, the interpreter keeps waiting for the thread.
5. If the thread's daemon attribute is True, the timeout in join is fully effective: the main thread waits at most timeout seconds, and when the main thread exits, the daemon child thread is terminated with it.
There is a pitfall here: if N child threads are each joined with join(timeout), the main thread may end up waiting as long as N * timeout in total, because each child's timeout only starts counting at the moment the previous join returns.
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        time.sleep(10)
        print(self.id)

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.start()
    for i in range(5):
        print(i)

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# 0
# 1
# 2
# 3
# 4
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        time.sleep(10)
        print(self.id)

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.start()
    t1.join()
    for i in range(5):
        print(i)

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# 999
# 0
# 1
# 2
# 3
# 4
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)  # note: the id argument is unused here

    def run(self):
        time.sleep(5)
        print("This is " + self.getName())

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.setDaemon(True)
    t1.start()
    print("I am the father thread.")

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# I am the father thread.
Thread Lock (Mutex)
A process can start multiple threads, and they all share the parent process's memory space, which means every thread can access the same data. So what happens if two threads try to modify the same piece of data at the same time?
# -*- coding:UTF-8 -*-
import time
import threading

def addNum():
    global num  # fetch the shared global variable in each thread
    print('--get num:', num)
    time.sleep(1)
    num -= 1    # decrement the shared variable

num = 100           # the shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# --get num: 100
# --get num: 100
# --get num: 100
# --get num: 100
# ...
# --get num: 100
# --get num: 100
# final num: 0
# D:\Python\python\python-2.7.13\Python27\python2.exe
# ('--get num:', 100)
# ('--get num:', 100)
# ...   (output from the 100 threads interleaves and is partly garbled)
# ('--get num:', 100)
# ('final num:', 1)
Normally this num should come out as 0, but run the code a few times on Python 2.7 and you will find that the final value is not always 0. Why does the result differ between runs? Simple: suppose two threads, A and B, both want to decrement num. Because the threads run concurrently, both may fetch the initial value num = 100 and hand it to the CPU. Thread A computes 99, but thread B also computes 99; after both write their results back to num, the value is 99 instead of 98. What to do? Equally simple: before modifying the shared data, a thread takes a lock on it, so that no other thread can touch the data until the modification is finished and the lock is released; any other thread that wants to modify the data must first wait for the lock.
*Note: on Python 3.x this particular example tends to come out correct every time. That is not because 3.x automatically adds a lock for you; num -= 1 is still not atomic. It is a side effect of how the newer GIL schedules thread switches, so the mutex below is still required for correctness.
# -*- coding:UTF-8 -*-
import time
import threading

def addNum():
    global num  # fetch the shared global variable in each thread
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # take the lock before modifying the data
    num -= 1        # decrement the shared variable
    lock.release()  # release the lock after modifying

num = 100           # the shared variable
thread_list = []
lock = threading.Lock()  # create a global lock
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
# D:\Python\python\python-2.7.13\Python27\python2.exe
# ('--get num:', 100)
# ('--get num:', 100)
# ...
# ('--get num:', 100)
# ('--get num:', 100)
# ('final num:', 0)
GIL VS Lock
You may then ask: if user code already takes its own locks, why does CPython still need a GIL?
The GIL was added mainly to reduce the complexity of developing the interpreter itself. For example, when you write Python you do not have to worry about reclaiming memory, because the interpreter reclaims it for you automatically and periodically. You can picture the interpreter as having its own housekeeping thread that wakes up every so often and does a global sweep to see which memory can be freed. That interpreter thread runs concurrently with the threads of your own program. Now suppose one of your threads deletes a variable, and while the interpreter's garbage-collection thread is in the middle of clearing that memory, another of your threads assigns a new value into the not-yet-cleared space; the newly assigned data could end up deleted. To avoid this class of problems, the interpreter took the blunt approach of a global lock: while one thread runs, nobody else may touch anything. That solves the problem above, and it can be regarded as a legacy of Python's early versions.
RLock (recursive lock)
import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()   # outer acquire; run1/run2 re-acquire the same lock
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()  # a plain Lock here would deadlock inside run3
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)
# D:\Python\python\python-2.7.13\Python27\python2.exe
# grab the first part data
# --------between run1 and run2-----
# grab the second part data
# (1, 1)
# ...   (output from the 10 threads interleaves and is partly garbled)
# grab the first part data
# --------between run1 and run2-----
# grab the second part data
# (10, 10)
# 1
# ----all threads done---
# (10, 10)
Semaphore
import threading, time

def run(n):
    global num
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    num += n
    semaphore.release()

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at once
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while threading.active_count() != 1:
        pass
    else:
        print('----all threads done---')
        print(num)
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# run the thread: 2
# run the thread: 0
# run the thread: 4
# run the thread: 1
# run the thread: 3
# ...   (the threads finish in batches of five; the order varies between runs)
# run the thread: 15
# run the thread: 18
# ----all threads done---
# 190
Timer
A Timer represents an action that should be run only after a certain amount of time has passed. Timer is a subclass of Thread, so timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

from threading import Timer

def hello():
    print("hello, world")

t = Timer(3.0, hello)
t.start()
# after 3 seconds, "hello, world" will be printed
Events
An event is a simple synchronization mechanism: one thread signals an event and other threads wait for it. An Event object manages an internal flag:

event = threading.Event()  # the internal flag starts out false
event.wait()   # block until the flag becomes true
event.set()    # set the flag to true, waking every waiter
event.clear()  # reset the flag to false
Using an Event to coordinate two or more threads:
import random
import threading
import time

def light():
    if not event.isSet():
        event.set()  # green light: wait() will not block
    count = 0
    while True:
        print("count:", count)
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.isSet():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # turn the green light back on
        time.sleep(1)
        count += 1

def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if event.isSet():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)

if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import threading
import time
import random

def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter += 1
        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0  # reset the timer
            door_swiping_event.wait()
        if door_open_time_counter > 3:  # the door has been open for 3 s; time to close it
            door_swiping_event.clear()
        time.sleep(0.5)

def staff(n):
    print("staff [%s] is coming..." % n)
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swiping the card....." % n)
            door_swiping_event.set()
        time.sleep(0.5)

door_swiping_event = threading.Event()  # the card-swipe event
door_thread = threading.Thread(target=door)
door_thread.start()

for i in range(5):
    p = threading.Thread(target=staff, args=(i,))
    time.sleep(random.randrange(3))
    p.start()
Queue
Queue is the thread-safe FIFO queue implementation in the Python standard library. It provides a first-in, first-out data structure suitable for multithreaded programming, used to pass information between producer and consumer threads.
class Queue.Queue(maxsize=0)
FIFO means First In, First Out. Queue provides a basic FIFO container and is simple to use. maxsize is an integer giving the upper bound on the number of items the queue can hold; once that bound is reached, insertion blocks until items in the queue are consumed. If maxsize is less than or equal to 0, the queue size is unlimited.
An example:
import queue

q = queue.Queue()
for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get())
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# 0
# 1
# 2
# 3
# 4
class Queue.LifoQueue(maxsize=0)
LIFO means Last In, First Out, like a stack. Usage is just as simple; maxsize works as above.
Another example:
import queue

q = queue.LifoQueue()
for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get())
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# 4
# 3
# 2
# 1
# 0
As you can see, the only change is replacing the queue.Queue class with the queue.LifoQueue class.
class Queue.PriorityQueue(maxsize=0)
Constructs a priority queue. maxsize works as above.
import Queue
import threading

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'Job:', description

    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()
q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job'))

def process_job(q):
    while True:
        next_job = q.get()
        print 'for:', next_job.description
        q.task_done()

workers = [threading.Thread(target=process_job, args=(q,)),
           threading.Thread(target=process_job, args=(q,))]

for w in workers:
    w.setDaemon(True)
    w.start()

q.join()
# D:\Python\python\python-2.7.13\Python27\python2.exe
# Job: level 3 job
# Job: level 10 job
# Job: level 1 job
# for: level 1 job
# for: level 3 job
# for: level 10 job
Queue.task_done()
Indicates that a formerly enqueued task is complete; called by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on that task is finished. If a join() is currently blocking, it will resume when every task in the queue has been processed (that is, a task_done() call was received for every item that had been put() into the queue).
Queue.join()
Blocks the calling thread until all items in the queue have been processed. The count of unfinished tasks goes up whenever an item is added to the queue and goes down whenever a consumer thread calls task_done() (meaning a consumer fetched the task and completed the work on it). When the count of unfinished tasks drops to zero, join() unblocks.
Queue.put(item, block=True, timeout=None)
Puts item into the queue. If the optional argument block is True and timeout is None (the default), this is a blocking call with no timeout. If timeout is a positive number, the call blocks at most timeout seconds and raises the Full exception if no free slot became available within that time. If block is False, the item is put on the queue only if a free slot is immediately available; otherwise the Full exception is raised at once. The non-blocking variant put_nowait(item) is equivalent to put(item, False).
Queue.get(block=True, timeout=None)
Removes and returns one item from the queue. The block and timeout arguments work as in put. The non-blocking variant get_nowait() is equivalent to get(False).
Queue.empty()
Returns True if the queue is empty, False otherwise.
The producer-consumer pattern solves the vast majority of concurrency problems in concurrent programming. It raises a program's overall data-processing speed by balancing the working capacity of producer threads against that of consumer threads.
Why use the producer-consumer pattern
In the threading world, a producer is a thread that generates data and a consumer is a thread that consumes it. In multithreaded development, if the producer is fast and the consumer slow, the producer must wait for the consumer to finish before producing more data; likewise, if the consumer's capacity exceeds the producer's, the consumer must wait for the producer. The producer-consumer pattern was introduced to solve this problem.
What the producer-consumer pattern is
The producer-consumer pattern uses a container to break the tight coupling between producers and consumers. The two sides never communicate directly; they communicate through a blocking queue. After producing a piece of data, a producer does not wait for a consumer but drops the data straight into the queue, and a consumer does not ask the producer for data but takes it straight from the queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.
Here is a most basic producer-consumer example:
import threading
import queue

def producer():
    for i in range(10):
        q.put("bone %s" % i)
    print("waiting for all the bones to be taken...")
    q.join()
    print("all the bones have been taken...")

def consumer(n):
    while q.qsize() > 0:
        print("%s got" % n, q.get())
        q.task_done()  # report that this task is done

q = queue.Queue()
p = threading.Thread(target=producer,)
p.start()
c1 = consumer("李闖")  # the consumer runs in the main thread

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# waiting for all the bones to be taken...
# 李闖 got bone 0
# 李闖 got bone 1
# 李闖 got bone 2
# 李闖 got bone 3
# 李闖 got bone 4
# 李闖 got bone 5
# 李闖 got bone 6
# 李闖 got bone 7
# 李闖 got bone 8
# 李闖 got bone 9
# all the bones have been taken...
multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
from multiprocessing import Process
import time

def f(name):
    time.sleep(2)
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# hello bob
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# main process line
# module name: __main__
# parent process: 7652
# process id: 5576
#
#
#
# function f
# module name: __mp_main__
# parent process: 5576
# process id: 7888
#
#
#
# hello bob
#
Queues
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# [42, None, 'hello']
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# [42, None, 'hello']
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others).
Manager
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:

from multiprocessing import Process, Manager
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append("A")
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# [0, 1, 2, 3, 4, 'A']
# [0, 1, 2, 3, 4, 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
# {1: '1', '2': 2, 0.25: None}
# [0, 1, 2, 3, 4, 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
Process synchronization
Without using the lock, output from the different processes is liable to get all mixed up:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# hello world 9
# hello world 7
# hello world 8
# hello world 6
# hello world 3
# hello world 1
# hello world 2
# hello world 5
# hello world 4
# hello world 0
import multiprocessing
import os
import time
from datetime import datetime

def subprocess(number):
    # child process
    print('this is child process %d' % number)
    pid = os.getpid()  # current process id
    print('pid: %s, start: %s' % (pid, datetime.now().isoformat()))
    time.sleep(30)     # sleep for 30 seconds
    print('pid: %s, end: %s' % (pid, datetime.now().isoformat()))

def Bar(arg):
    print('-->exec done:', arg)

def mainprocess():
    # main process
    print('this is the main process, pid: %d' % os.getpid())
    t_start = datetime.now()
    pool = multiprocessing.Pool()
    for i in range(8):
        pool.apply_async(subprocess, args=(i,), callback=Bar)
    pool.close()
    pool.join()
    t_end = datetime.now()
    # note: timedelta.microseconds is only the sub-second component;
    # use total_seconds() for the full elapsed time
    print('main process took: %.3f seconds' % (t_end - t_start).total_seconds())

if __name__ == '__main__':
    mainprocess()

# D:\Python\python\python-3.6.1\Python36-64\python.exe
# this is the main process, pid: 11224
# this is child process 0
# pid: 10640, start: 2017-08-10T08:34:36.821712
# this is child process 1
# pid: 10076, start: 2017-08-10T08:34:36.850713
# this is child process 2
# pid: 10996, start: 2017-08-10T08:34:36.859714
# this is child process 3
# pid: 10720, start: 2017-08-10T08:34:36.904716
# pid: 10640, end: 2017-08-10T08:35:06.822428
# this is child process 4
# pid: 10640, start: 2017-08-10T08:35:06.822428
# -->exec done: None
# pid: 10076, end: 2017-08-10T08:35:06.851429
# this is child process 5
# pid: 10076, start: 2017-08-10T08:35:06.851429
# -->exec done: None
# pid: 10996, end: 2017-08-10T08:35:06.860430
# this is child process 6
# pid: 10996, start: 2017-08-10T08:35:06.860430
# -->exec done: None
# pid: 10720, end: 2017-08-10T08:35:06.905432
# -->exec done: None
# this is child process 7
# pid: 10720, start: 2017-08-10T08:35:06.905432
# pid: 10640, end: 2017-08-10T08:35:36.823144
# -->exec done: None
# pid: 10076, end: 2017-08-10T08:35:36.852145
# -->exec done: None
# pid: 10996, end: 2017-08-10T08:35:36.861146
# -->exec done: None
# pid: 10720, end: 2017-08-10T08:35:36.906148
# -->exec done: None
# main process took: 60.417 seconds
1. Creating a single process
If we only need a small number of processes, we can create them directly:
import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, args=("hello",))
    p.start()
    p.join()
    print("Sub-process done.")
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# hello
# hello
# hello
# Sub-process done.
2. Using a process pool
Yes, you read that right: a process pool, not a thread pool. It lets you saturate multiple CPU cores, and it is very simple to use.
import multiprocessing
import time

def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)
    print("++++++++++")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    for i in range(10):
        msg = "hello %d" % (i)
        pool.apply_async(func, (msg,))
    pool.close()
    pool.join()
    print("Sub-process(es) done.")
# D:\Python\python\python-3.6.1\Python36-64\python.exe
# hello 0
# hello 1