線程、進程、協程

線程:python

線程能夠理解成輕量的進程,實際在linux二者幾乎沒有區別,惟一的區別是線程並不產生新的地址空間和資源。linux

Threading用於提供線程的相關操做,線程是應用程序中工做的最小單元。程序員

import threading 
import time 
   
def show(arg): 
    time.sleep(1) 
    print('thread'+str(arg))
   
for i in range(10): 
    t = threading.Thread(target=show, args=(i,)) 
    t.start() 

上述代碼建立了10個前臺線程,而後控制器就交給了cpu,CPU根據內部算法進行調度,算法

更多的方法:shell

  • start 線程準備就緒,等待CPU調度
  • setName  爲線程設置名字
  • getName  獲取線程名稱
  • setDaemon  設置後臺線程或者前臺線程(默認False) 

    若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止編程

    若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止設計模式

  •  join   逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
  • run  線程被cpu調度後自動執行線程對象的run方法

線程鎖:緩存

因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,CPU接着執行其餘線程。因此,可能出現以下問題:安全

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'
未使用鎖
#!/usr/bin/env python 
#coding:utf-8 
    
import threading 
import time 
    
gl_num = 0
#生成鎖  
lock = threading.RLock() 
    
def Func(): 
    lock.acquire() #給線程上鎖
    global gl_num 
    gl_num +=1
    time.sleep(1) 
    print gl_num 
    lock.release() #釋放線程鎖
        
for i in range(10): #建立10個線程
    t = threading.Thread(target=Func) 
    t.start() 
加上線程鎖

event網絡

python線程的事件用於主線程控制其餘線程的執行,線程主要提供了三個方法:set、wait、clear

事件處理的機制:全局定義了一個"flag",值爲False,,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將"flag"設置爲False
  • set:將"flag"設置爲True
#!/usr/bin/env python 
# -*- coding:utf-8 -*-  
import threading 
  
def do(event): 
    print 'start'
    event.wait() 
    print 'execute'
  
event_obj = threading.Event() 
for i in range(10): 
    t = threading.Thread(target=do, args=(event_obj,)) 
    t.start() 
  
event_obj.clear() 
inp = input('input:') 
if inp == 'true': 
    event_obj.set() 

使用線程隊列

當多個線程須要共享數據的或者資源的時候,可能會使線程的使用變得複雜,線程的模塊提供了許多同步原語,包括信號量、條件變量、事件和鎖。當這些選項存在時,最佳實踐是轉而關注於使用隊列,相比較而言,隊列更容易處理,而且更容易處理,而且可使線程編程更加安全,由於他們可以有效地傳遞單個線程資源的全部訪問,並支持更加清晰的、可讀性更高的設計模式。

url獲取序列:

import urllib2
import time
        
hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]
        
start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
     url = urllib2.urlopen(host)
     print url.read(1024)
        
print("Elapsed Time: %s" % (time.time() - start))
 

簡單版本的線程池:

#!/usr/bin/env python 
# -*- coding:utf-8 -*- 
import Queue 
import threading 
  
  
class ThreadPool(object): 
  
    def __init__(self, max_num=20): 
        self.queue = queue.Queue(max_num) 
        for i in xrange(max_num): 
            self.queue.put(threading.Thread) 
  
    def get_thread(self): 
        return self.queue.get() 
  
    def add_thread(self): 
        self.queue.put(threading.Thread) 
  
pool = ThreadPool(10) 
  
def func(arg, p): 
    print arg 
    import time 
    time.sleep(2) 
    p.add_thread() 
  
  
for i in xrange(30): 
    thread = pool.get_thread() 
    t = thread(target=func, args=(i, pool)) 
    t.start() 
簡單版的線程池
import queue 
import threading 
import contextlib 
  
StopEvent = object() 
  
class ThreadPool(object): 
  
    def __init__(self, max_num): 
        self.q = queue.Queue(max_num) 
        self.max_num = max_num 
        self.cancel = False
        self.generate_list = [] 
        self.free_list = [] 
  
    def run(self, func, args, callback=None): 
        """ 
        線程池執行一個任務 
        :param func: 任務函數 
        :param args: 任務函數所需參數 
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) 
        :return: 若是線程池已經終止,則返回True不然None 
        """
        if self.cancel: 
            return True
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 
            self.generate_thread() 
        w = (func, args, callback,) 
        self.q.put(w) 
  
    def generate_thread(self): 
        """ 
        建立一個線程 
        """
        t = threading.Thread(target=self.call) 
        t.start() 
  
    def call(self): 
        """ 
        循環去獲取任務函數並執行任務函數 
        """
        current_thread = threading.currentThread 
        self.generate_list.append(current_thread) 
  
        event = self.q.get() 
        while event != StopEvent: 
            func, arguments, callback = event 
            try: 
                result = func(*arguments) 
                success = True
            except Exception, e: 
                success = False
                result = None
  
            if callback is not None: 
                try: 
                    callback(success, result) 
                except Exception, e: 
                    pass
            with self.worker_state(self.free_list, current_thread): 
                event = self.q.get() 
        else: 
            self.generate_list.remove(current_thread) 
  
    def terminal(self): 
        """ 
        終止線程池中的全部線程 
        """
        self.cancel = True
        full_size = len(self.generate_list) 
        while full_size: 
            self.q.put(StopEvent) 
            full_size -= 1
  
    @contextlib.contextmanager 
    def worker_state(self, state_list, worker_thread): 
        """ 
        用於記錄線程中正在等待的線程數 
        """
        state_list.append(worker_thread) 
        try: 
            yield
        finally: 
            state_list.remove(worker_thread) 
進階版的線程池
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    time.sleep(1)
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

# pool.close()
# pool.terminate()
線程池更新一
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):
    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
線程池更新二

queue模塊:

queue就是對隊列,它是線程安全的

舉例來講,咱們去肯德基吃飯。廚房是給咱們作飯的地方,前臺負責把廚房作好的飯賣給顧客,顧客則去前臺領取作好的飯。這裏的前臺就至關於咱們的隊列。

這個模型也叫生產者-消費者模型

import queue

q = queue.Queue(maxsize=0)  # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。

q.join()    # 等到隊列爲kong的時候,在執行別的操做
q.qsize()   # 返回隊列的大小 (不可靠)
q.empty()   # 當隊列爲空的時候,返回True 不然返回False (不可靠)
q.full()    # 當隊列滿的時候,返回True,不然返回False (不可靠)
q.put(item, block=True, timeout=None) #  將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置,                         爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後,                          若是隊列沒法給出放入item的位置,則引起 queue.Full 異常q.get(block=True, timeout=None) #   移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,                      若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。q.put_nowait(item) #   等效於 put(item,block=False)q.get_nowait() #    等效於 get(item,block=False)

生產者--消費者:

#!/usr/bin/env python
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

multiprocessing模塊

multiprocessing是python的多進程管理包,和threading.Thread相似。直接從側面用subprocesses替換線程使用GIL的方式,因爲這一點,multiprocessing模塊可讓程序員在給定的機器上充分的利用CPU。

在multiprocessing中,經過建立Process對象生成進程,而後調用它的start()方法,

from multiprocessing import Process
 
def f(name):
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

 

進程:

進程間的數據共享

在使用併發設計的時候最好儘量的避免共享數據,尤爲是在使用多進程的時候。 若是你真有須要 要共享數據, multiprocessing提供了兩種方式。

Shared memory

數據能夠用Value或Array存儲在一個共享內存地圖裏,以下:

from multiprocessing import Process, Value, Array
 
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
 if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
 
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
 
    print(num.value)
    print(arr[:])

輸出:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

建立num和arr時,「d」和「i」參數由Array模塊使用的typecodes建立:「d」表示一個雙精度的浮點數,「i」表示一個有符號的整數,這些共享對象將被線程安全的處理。Array(‘i’, range(10))中的‘i’參數:

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

Server process

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持

from multiprocessing import Process, Manager
 def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
 
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
 
        print(d)
        print(l)

輸出:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更靈活,由於它能夠支持任意的對象類型。另外,一個單獨的manager能夠經過進程在網絡上不一樣的計算機之間共享,不過他比shared memory要慢。

進程池:

Pool類描述了一個工做進程池,他有幾種不一樣的方法讓任務卸載工做進程。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止咱們能夠用Pool類建立一個進程池, 展開提交的任務給進程池。 例:

from multiprocessing import Pool
import time
def myFun(i):
    time.sleep(2)
    return i+100

def end_call(arg):
    print("end_call",arg)
p = Pool(5)

# print(p.map(myFun,range(10)))
for i in range(10):
    p.apply_async(func=myFun,args=(i,),callback=end_call)
print("end")p.close()
p.join()
from multiprocessing import Pool, TimeoutError
import time
import os
 
def f(x):
    return x*x
 
if __name__ == '__main__':
    # 建立4個進程 
    with Pool(processes=4) as pool:
 
        # 打印 "[0, 1, 4,..., 81]" 
        print(pool.map(f, range(10)))
 
        # 使用任意順序輸出相同的數字, 
        for i in pool.imap_unordered(f, range(10)):
            print(i)
 
        # 異步執行"f(20)" 
        res = pool.apply_async(f, (20,))      # 只運行一個進程 
        print(res.get(timeout=1))             # 輸出 "400" 
 
        # 異步執行 "os.getpid()" 
        res = pool.apply_async(os.getpid, ()) # 只運行一個進程 
        print(res.get(timeout=1))             # 輸出進程的 PID 
 
        # 運行多個異步執行可能會使用多個進程 
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
 
        # 是一個進程睡10秒 
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("發現一個 multiprocessing.TimeoutError異常")
 
        print("目前,池中還有其餘的工做")
 
    # 退出with塊中已經中止的池 
    print("Now the pool is closed and no longer available")
進程池官方版

 

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes :使用的工做進程的數量,若是processes是None那麼使用 os.cpu_count()返回的數量。
  • initializer: 若是initializer是None,那麼每個工做進程在開始的時候會調用initializer(*initargs)。
  • maxtasksperchild:工做進程退出以前能夠完成的任務數,完成後用一個心的工做進程來替代原進程,來讓閒置的資源被釋放。maxtasksperchild默認是None,意味着只要Pool存在工做進程就會一直存活。
  • context: 用在制定工做進程啓動時的上下文,通常使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來建立一個池,兩種方法都適當的設置了context注意:Pool對象的方法只能夠被建立pool的進程所調用。進程池的方法
  • apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。

  • close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。

  • terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。

  • join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程。

  • map(func, iterable[, chunksize])¶

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶

  • imap(func, iterable[, chunksize])¶

  • imap_unordered(func, iterable[, chunksize])

  • starmap(func, iterable[, chunksize])¶

  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

協程

協程又叫微線程,從技術的角度來講,「協程就是你能夠暫停執行的函數」。若是你把它理解成「就像生成器同樣」,那麼你就想對了。 線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。

協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;

Event Loop

Event Loop是一種等待程序分配時間或消息的編程架構。簡單的說就是 當事件A發生的時候,咱們就去執行事件B。 最簡單的例子就是:當咱們瀏覽網頁的時候,咱們點擊頁面的某個元素,這個點擊事件會被 JavaScript 捕捉到,而後 JavaScript 就會檢查這個事件是否綁定了onclick()回調函數來處理這個事件,只要綁定了,onclick()回調函數就會被執行。

event loop是協程執行的控制點, 若是你但願執行協程, 就須要用到它們。

event loop提供了以下的特性:

  • 註冊、執行、取消延時調用(異步函數)
  • 建立用於通訊的client和server協議(工具)
  • 建立和別的程序通訊的子進程和協議(工具)
  • 把函數調用送入線程池中

協程示例:

import asyncio
 
async def cor1():
    print("COR1 start")
    await cor2()
    print("COR1 end")
 
async def cor2():
    print("COR2")
 
loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()

最後三行是重點:

  • asyncio.get_event_loop()  : asyncio啓動默認的event loop 
  • run_until_complete()  :  這個函數是阻塞執行的,知道全部的異步函數執行完成,
  • close()  :  關閉event loop。

subprocess模塊

經過使用subprocess模塊能夠建立新的進程,鏈接到他們的輸入/輸出/錯誤管道,並獲取他們的返回值。 該模塊計劃替代及一箇舊的模塊的方法:

os.system 
os.spawn* 

使用subprocess模塊

在全部用例調用subprocess時推薦使用run()方法,更高級的用例,能夠直接使用subprocess.Popen接口。

run()方法

在Python3.5增長的。

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False) 

run()默認不會捕捉到標準輸出和標準錯誤輸出,要捕捉的話,能夠爲標準輸出和標準錯誤輸出指定subprocess.PIPE(一個特殊值,可被用於Popen的stdin, stdout或 stderr參數,表示一個標準流的管道應該被打開, Popen.communicate()用的最多)。

  • args :args應該是一個字符串或者一個序列。
  • timeout:設置超時時間,會傳遞給subprocess.Popen.communicate()。若是超時,子進程會被殺死並等待。子進程被終止後會報告一個 TimeoutExpired異常。
  • input參數會傳遞給subprocess.Popen.communicate(),從而做爲subprocess的標準輸入。當咱們使用的時候,內部的Popen對象會自動建立stdin=PIPE,stdin參數可能不會被使用。
  • check:若是check參數爲True,且進程退出的時候獲得退出碼一個非0的值,會報告一個 CalledProcessError異常
  • shell:shell參數默認爲False,此時arg參數應該是一個列表。subprocess.run(["ls","-l"]) ; 當shell=True時,args能夠是一個字符串。subprocess.run("ls -l",shell=True)
>>> ret = subprocess.run(["ls", "-l"])  # doesn't capture output 
CompletedProcess(args=['ls', '-l'], returncode=0)
>>> print(ret.stdout)
None
 
>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1
 
>>> ret1 = subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n')
 
>>> print(ret.stdout)
b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88   8 06:50 /dev/null\n'

call方法

subprocess.call(args, *, stdin=Nonestdout=Nonestderr=Noneshell=Falsetimeout=None

call()方法等價於:run(..., check=True)和run()方法類因此,只是不支持input參數和check參數; 

注意: 不要在這個方法裏使用stdout=PIPE 或 stderr=PIPE,當到一個管道的輸出填滿系統的管道緩存時,子進程會被阻塞。  

check_call方法

subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)

check_call()方法等價於: run(..., check=True, stdout=PIPE).stdout

3.1新增,3.3時增長了timeout參數,3.4時增長了對關鍵字參數的支持

check_output()方法

內部調用的是run()方法,可是會捕捉到stdout

>>> ret = subprocess.check_output(["ls", "-l", "/dev/null"])
>>> print(ret)
b'crw-rw-rw- 1 root root 1, 3 6\xe6\x9c\x88   8 06:50 /dev/null\n'

Popen類

上面的四個方法本質上調用的都是subprocess中的Popen類。

Popen對象都有如下方法:

poll() : 檢查子進程是否已經終止,返回一個returncode,至關於exit code。

wait() : 等待子進程終止,返回一個returncode

communicate(input=None) :和進程進行交互:發送數據到stdin;從stdout和stderr讀取數據,直到讀取完。等待進程終止。可選參數input會傳遞數據給子進程,當input=None事,沒有數據傳遞給子進程。 communicate() 返回一個元組 (stdout, stderr). 注意: 讀取的數據在內存的buffer中,因此當數據大小大於buffer或者沒有限制時,不要使用這個方法。

相關文章
相關標籤/搜索