[Python] 09 - Multi-processing

前言


資源

Ref: Python3 多線程html

Ref: Python3之多進程         # python中的多線程沒法利用多核優點python

更多的提升效率的策略,請參見:[Pandas] 01 - A guy based on NumPygit

 

 

 

多線程


1、認識線程

與進程的區別

線程在執行過程當中與進程仍是有區別的。

1. 每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。

2. 可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。

3. 每一個線程都有他本身的一組CPU寄存器,稱爲線程的上下文,該上下文反映了線程上次運行該線程的CPU寄存器的狀態。

4. 指令指針 和 堆棧指針寄存器 是線程上下文中兩個最重要的寄存器,線程老是在進程獲得上下文中運行的,這些地址都用於標誌擁有線程的進程地址空間中的內存。
View Code

 

獲取CPU信息

Ref: https://github.com/giampaolo/psutilgithub

from multiprocessing import cpu_count
print(cpu_count())

 

 

2、建立線程 

Python3 經過兩個標準庫 _thread 和 threading 提供對線程的支持。安全

_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能仍是比較有限的。多線程

 

低級別:建立 _thread

提供了低級別,原始的線程以及一個簡單的鎖。app

#!/usr/bin/python3

import _thread
import time

# 爲線程定義一個函數
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

----------------------------------------------------------------
# 建立兩個線程,參數是:函數名 以及對應的參數 try: _thread.start_new_thread( print_time, ("Thread-1", 2, ) ) _thread.start_new_thread( print_time, ("Thread-2", 4, ) ) except: print ("Error: 沒法啓動線程")
# 讓主線程不要提早結束
while 1: pass

 

高級別:建立 threading

採用了線程類的手法,該方法比較 engineering。dom

#!/usr/bin/python3

import threading
import time

exitFlag = 0

# 線程類
class myThread (threading.Thread):
def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter
def run(self): print ("開始線程:" + self.name) print_time(self.name, self.counter, 5) print ("退出線程:" + self.name)
----------------------------------------------------------------
def print_time(threadName, delay, counter): while counter: if exitFlag: threadName.exit() time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1
----------------------------------------------------------------
# (1) 建立新 線程'類‘ thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # (2) 啓動新線程 thread1.start() thread2.start()

# (3) 等待全部線程結束 thread1.join() thread2.join()
print ("退出主線程")

 

 

3、線程同步(鎖)

使用 Thread 對象的 Lock 和 Rlock 能夠實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法;ide

對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到 acquire 和 release 方法之間函數

#!/usr/bin/python3

import threading
import time

class myThread (threading.Thread):
def __init__(self, threadID, name, counter): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.counter = counter
def run(self): print ("開啓線程: " + self.name)
------------------------------------------------------- threadLock.acquire()   # <---- print_time(self.name, self.counter, 3)
threadLock.release() # <---- -------------------------------------------------------


# 做爲線程共享資源
def print_time(threadName, delay, counter): while counter: time.sleep(delay) print ("%s: %s" % (threadName, time.ctime(time.time()))) counter -= 1

threadLock= threading.Lock() threads = []
# (1) 建立新線程 thread1 = myThread(1, "Thread-1", 1) thread2 = myThread(2, "Thread-2", 2) # (2) 開啓新線程 thread1.start() thread2.start() # (3) 等待線程 threads.append(thread1) threads.append(thread2) for t in threads: t.join() print ("退出主線程")

 

 

4、守護線程

setDaemon 設置

不添加setDaemon時,主線程和子線程分別在執行,約在主線程執行完5秒後子線程也執行完畢。

添加setDaemon的話,主進程執行完後不會等待 「做爲守護線程」 的子進程,以下代碼中,不會給child thread留有運行的機會。  

import threading
import time
from datetime import datetime


class MyThread(threading.Thread):
    def __init__(self, id):
        threading.Thread.__init__(self)
        self.id = id

    def run(self):
        time.sleep(5)
        print "子線程動做",threading.current_thread().name, datetime.now()


if __name__ == "__main__":
    t1 = MyThread(999)
    t1.setDaemon(True)        # 添加守護線程!
    t1.start()
    for i in range(5):
        print "主線程動做",threading.current_thread().name, datetime.now()

 

join() 方法

只是添加了join函數一行代碼,咱們發現主線程和子線程執行的順序就改變了。

主線程會等待子線程。

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.start()
    t1.join()  # 添加join函數!
    for i in range(5):
        print "主線程動做",threading.current_thread().name, datetime.now()

Output: 等待child執行完,再執行join()以後main thread的內容。

child thread Thread-4 2019-09-26 17:50:16.049128
main thread MainThread 2019-09-26 17:50:16.050622
main thread MainThread 2019-09-26 17:50:16.050930
main thread MainThread 2019-09-26 17:50:16.051079
main thread MainThread 2019-09-26 17:50:16.051915
main thread MainThread 2019-09-26 17:50:16.05206

 

守護線程 + join函數

主線程一直等待所有的子線程結束以後,主線程自身才結束,程序退出。(其實守護線程的設置就沒用了)

if __name__ == "__main__":
    t1 = MyThread(999)
    t1.setDaemon(True)  # 添加守護線程!
    t1.start()
    t1.join()           # 添加join函數!
    for i in range(5):
        print "主線程動做",threading.current_thread().name, datetime.now()

 

 

 

多進程


1、僞並行 - GIL

Ref: 爲何老說python是僞多線程,怎麼解決?

GIL 的全名是 the Global Interpreter Lock (全局解釋鎖),是常規 python 解釋器(固然,有些解釋器沒有)的核心部件。

GIL 是 Python 解釋器正確運行的保證,Python 語言自己沒有提供任何機制訪問它。但在特定場合,咱們仍有辦法下降它對效率的影響。

使用多進程

經過cpython啓動多進程,能 "繞過" GIL。

from multiprocessing import Process def spawn_n_processes(n, target):

    threads = []

    for _ in range(n):
        thread = Process(target=target)
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

經過 cpython 執行以上程序。

def test(target, number=10, spawner=spawn_n_threads):
    """
    分別啓動 1, 2, 3, 4 個控制流,重複 number 次,計算運行耗時
    """

    for n in (1, 2, 3, 4, ):

        start_time = time()
        for _ in range(number):
 spawner(n, target)
        end_time = time()

        print('Time elapsed with {} branch(es): {:.6f} sec(s)'.format(n, end_time - start_time))


test(fib, spawner=spawn_n_processes)

 

 

 

線程優先級隊列


1、Queue模塊

寫在前面

操做性質

Python 的 「Queue 模塊」 中提供了同步的、線程安全的隊列類,包括 

    1. FIFO(先入先出)隊列Queue
    2. LIFO(後入先出)隊列LifoQueue
    3. 優先級隊列 PriorityQueue

 

操做方法

三種隊列均提供以下方法:

這些隊列都實現了鎖原語,可以在多線程中直接使用,可使用隊列來實現線程間的同步。

Queue 模塊中的經常使用方法:

import Queue

      • Queue.qsize()                                     返回隊列的大小
      • Queue.empty()                                    若是隊列爲空,返回True,反之False
      • Queue.full()                                         若是隊列滿了,返回True,反之False
      • Queue.full                                           與 maxsize 大小對應
      • Queue.get([block[, timeout]])              獲取隊列,timeout等待時間
      • Queue.get_nowait()                            至關Queue.get(False)
      • Queue.put(item)                                  寫入隊列,timeout等待時間
      • Queue.put_nowait(item)                      至關Queue.put(item, False)
      • Queue.task_done()                             在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
      • Queue.join()                                        實際上意味着等到隊列爲空,再執行別的操做

 

(1) FIFO隊列先進先出

From: python多線程-queue隊列類型優先級隊列,FIFO,LIFO

默認隊列:Queue.Queue()

#coding=utf8
import Queue

queuelist = Queue.Queue() for i in range(5):
    if not queuelist.full():
       queuelist.put(i)        print "put list : %s ,now queue size is %s "%(i,queuelist.qsize())

while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())

Output:

put list : 0 ,now queue size is 1 
put list : 1 ,now queue size is 2 
put list : 2 ,now queue size is 3 
put list : 3 ,now queue size is 4 
put list : 4 ,now queue size is 5 
get list : 0 , now queue size is 4
get list : 1 , now queue size is 3
get list : 2 , now queue size is 2
get list : 3 , now queue size is 1
get list : 4 , now queue size is 0

 

(2) LIFO隊列先進後出

原本是個stack,非要叫成是LIFO隊列,汗~

#coding=utf8
import Queue

queuelist = Queue.LifoQueue() for i in range(5):
    if not queuelist.full():
       queuelist.put(i)        print "put list : %s ,now queue size is %s "%(i,queuelist.qsize())

while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())

Output:

put list : 0 ,now queue size is 1 
put list : 1 ,now queue size is 2 
put list : 2 ,now queue size is 3 
put list : 3 ,now queue size is 4 
put list : 4 ,now queue size is 5 
get list : 4 , now queue size is 4
get list : 3 , now queue size is 3
get list : 2 , now queue size is 2
get list : 1 , now queue size is 1
get list : 0 , now queue size is 0

 

(3) 優先隊列

put方法的參數是個元組 (<優先級> ,<value>)。

#coding=utf8
import queue as Queue
import random

queuelist = Queue.PriorityQueue() for i in range(5):
    if not queuelist.full():
        x=random.randint(1,20)
        y=random.randint(1,20)
        print x
        queuelist.put((x,y)) while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())

Output:

 

11
5
10
7
10
get list : (5, 10) , now queue size is 4
get list : (7, 10) , now queue size is 3
get list : (10, 10) , now queue size is 2
get list : (10, 10) , now queue size is 1
get list : (11, 10) , now queue size is 0

 

 

 

2、綜合例子

栗子:模擬檢票過程

內容:一個隊,三個檢票口 (三個線程)

鎖機制:不能同時「取」,因此取的過程須要加「鎖」。

#coding=utf8

import Queue
import threading
import time

exitsingle = 0

class myThread(threading.Thread):
def __init__(self, threadname, queuelist): threading.Thread.__init__(self) self.threadname = threadname self.queuelist = queuelist def run(self): print "Starting queue %s"%self.threadname queue_enter(self.threadname, self.queuelist)  # 每個線程從管道中」取數據「 time.sleep(1) print "close " + self.threadname
def queue_enter(threadname, queuelist): while not exitsingle: queueLock.acquire() if not workQueue.empty(): data = queuelist.get() queueLock.release()    # 取完就能夠釋放「鎖」
print "%s check ticket %s" % (threadname, data) else: queueLock.release() time.sleep(1)
####################################################
# 初始化
#################################################### threadList
= ["list-1", "list-2", "list-3"] queueLock = threading.Lock() workQueue = Queue.Queue() threads = [] queueLock.acquire() for num in range(100001,100020): workQueue.put(num)        # 計入「票的編號」 queueLock.release()
print "start .."
# 三個線程從一個管道里取數據,但不能同時取 for name in threadList: thread = myThread( name, workQueue) thread.start() threads.append(thread) while not workQueue.empty(): pass exitsingle = 1 for t in threads: t.join() print "stop enter.."

 

 

栗子:生產者消費者問題

但這裏貌似少了lock相關,具體可參考以上兩個栗子。

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author      : 
# @File        : text.py
# @Software    : PyCharm
# @description : XXX
 
 
from queue import Queue
import random
import threading
import time
 
 
# Producer thread
class Producer(threading.Thread):
def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue
def run(self): for i in range(5): print("%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), i)) self.data.put(i) time.sleep(random.randrange(10) / 5) print("%s: %s finished!" % (time.ctime(), self.getName())) # Consumer thread class Consumer(threading.Thread):
def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue
def run(self): for i in range(5): val = self.data.get() print("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val)) time.sleep(random.randrange(10)) print("%s: %s finished!" % (time.ctime(), self.getName()))
# Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer = Consumer('Con.', queue)
producer.start() consumer.start() producer.join() consumer.join()
print('All threads terminate!') if __name__ == '__main__': main()

 

End.

相關文章
相關標籤/搜索