python基礎之線程、進程、協程

線程

線程基礎知識

一個應用程序,能夠多進程、也能夠多線程.
一個python腳本,默認是單進程,單線程的。
I/O操做(音頻、視頻、顯卡操做),不佔用CPU,因此:java

  • 對於I/O密集型操做,不會佔用CPU,使用多線程操做,能提升效率
  • 對於計算密集型操做,因爲佔用CPU,使用多進程操做,能提升效率

python中有個全局解釋器鎖,叫GIL(全稱Global Interpreter Lock),致使一個進程只能由一個線程讓CPU去調度,但在java c#可使用多個線程。
多線程,多進程的目的,是爲了提升併發,I/O密集型用多線程,計算密集型,用多進程。python

咱們來看看怎麼建立多線程:git

def f1(args):
     print(args)
import threading
t=threading.Thread(target=f1,args=(123,))    #建立一個線程,target表示線程執行的目標,args表示參數
t.start()     #並不表明當前當即被執行,系統來決定
f1(111)

以上代碼結果print順序會隨機!程序員

更多的方法:github

  • start 不表明當前線程並不會當即被執行,而是等待CPU調度,(準備就緒,等待調度)
  • setName 爲線程設置名稱
  • setDaemon(True) True表示主線程不等待子線程,執行完本身的任務後,自動關閉,子線程有可能未執行完畢。(默認狀況下,主線程要等待子線程執行完畢後再關閉主線程),(True:後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止;False:前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止)
  • join(2) 若是不想讓線程併發的操做,表示主線程到此等待,等待直到子線程執行完畢。若是加上參數,表示主線程在此最多等幾秒。該方法使得多線程變得無心義
  • run 線程被cpu調度後自動執行線程對象的run方法
import time

def f1(args):
    time.sleep(5)
    print(args)

import threading
t1=threading.Thread(target=f1,args=(123,))
t1.setDaemon(True)  #表示主線程不等待子線程
t.start()     #並不表明當前被當即被執行,系統來決定
f1(111)

t.join(2) #表示主程序執行到此,等待...直到子線程執行完畢
print(222222)
print(333333)

下面看下run方法:c#

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):#定義每一個線程要運行的函數

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

線程的調用方式:

線程有兩種調用方式:數組

  1. 簡單的調用方式
  2. 自定義類的調用方式

簡單調用方式:自定義一個繼承threading.thread的子類,經過自定義類的對象調用網絡

import threading

def f1(arg):
    print(arg)

t = threading.Thread(target=f1,args=(123,))
t.run()

自定義類的調用方式多線程

import threading

def f2(arg):
    print(arg)

class MyThread(threading.Thread):
    def __init__(self,func,args):
        self.func=func
        self.args=args
        super(MyThread,self).__init__()

    def run(self):
        self.func(self.args)


obj=MyThread(f2,123)
obj.run()

線程鎖

因爲線程之間進行隨機調度,而且每一個線程可能只執行n條操做後,當多個線程同時修改同一條數據時,可能會出現髒數據:同一時刻只能容許指定的線程數執行操做.
python中的線程鎖有Lock, RLock兩種,其中RLock用的較多,由於支持多層嵌套的方式,Lock用的較少,不支持多層嵌套鎖.併發

def func(l):
    global NUM
    #上鎖
    l.acquire()
    NUM-=1
    time.sleep(2)
    print(NUM)
    #開鎖
    l.release()

lock=threading.RLock() #放行幾個線程出去執行

for i in range(30):
    t=threading.Thread(target=func,args=(lock,))
    t.start()

若是不使用線程鎖,上面程序會有30個線程同時執行,結果爲30個-20

信號量(互斥鎖)

semaphore,同時容許指定數量的線程更改數據,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading,time

NUM=10

def func(l):
    global NUM
    #上鎖
    l.acquire()
    NUM-=1
    time.sleep(2)
    print(NUM)
    #開鎖
    l.release()

lock=threading.BoundedSemaphore(3)  #放行幾個線程出去執行

for i in range(30):
    t=threading.Thread(target=func,args=(lock,))
    t.start()

從上面兩個代碼對比,咱們會發現,semaphore若是設置爲1時,也可實現信號鎖的功能.

事件(event)

python線程中的event主要用於讓主線程控制其子線程的執行方式(有點相似交警控制紅綠燈),event主要提供三個方法:set wait clear
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將'Flag'設置爲False
  • set:將'Flag'設置爲True
import threading

def func(i,e):
    print(i)
    e.wait()        #檢測是什麼燈
    print(i+100)

event=threading.Event()
for i in range(10):
    t = threading.Thread(target=func,args=(i,event))
    t.start()

event.clear() #默認是紅燈
inp = input('>>>')
if inp == '1':
    event.set() #設置成綠燈

條件(condition)

使線程等待,當條件成立時,釋放線程執行.

import threading

def func(i,con):
    print(i)
    con.acquire()   #配合,固定格式,線程hold住
    con.wait()
    print(i+100)
    con.release()

c=threading.Condition()
for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()

while True:
    inp = input('>>>|')
    if inp == 'q':
        break
    c.acquire()     #如下都是固定格式
    c.notify(int(inp))
    c.release()

例子1中,寫到函數func中的:c.acquire(),c.notify(args),c.release()是固定格式.
例子2:wait_for

import threading

def condition():
    ret=False
    r = input('>>|')
    if r == 'true':
        ret= True
    else:
        ret=False
    return ret

def func(i,con):
    print(i)
    con.acquire()   #配合,固定格式,線程hold住
    con.wait_for(condition)
    print(i+100)
    con.release()

c=threading.Condition()
for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()

其中例子2中,con.acquire(),con.wait_for(condition)是固定格式配合使用,攔截線程,con.release()釋放線程.

定時器(timer):

定時器,延遲多長時間(單位:秒)執行

import threading

def hello():
    print('hello,world!!')

t=threading.Timer(1,hello)
t.start()

線程池

python的線程池有兩種實現方式,咱們先來看一個比較簡單的實現方式.
實現思路:

  1. 經過隊列(先進先出隊列,隊列都是在內存中操做,進程退出,隊列清空)來實現,線程池中沒有線程時爲阻塞狀態.
  2. 自定義一個線程池類,構造方法時,建立一個指定元素數量的隊列,隊列中的元素爲線程類
  3. 使用線程時,使用隊列的get方法獲得一個線程類,使用__call__方法建立一個線程,使用線程執行指定的程序
  4. 程序執行完成後,在隊列中添加一個新的線程類
import threading,time,queue

class ThreadPool:
    def __init__(self,maxsize):
        self.maxsize=maxsize
        self._q=queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)

    def get_thread(self):
        return self._q.get()
    def add_thread(self):
        self._q.put(threading.Thread)

pool=ThreadPool(5)

def task(arg,p):
    print(arg)
    time.sleep(1)
    p.add_thread()

for i in range(100):
    t = pool.get_thread()   #線程池中沒有線程爲阻塞狀態
    obj=t(target=task,args=(i,pool))
    obj.start()

此方式的缺點:沒有將線程重複利用,要直到建立一個線程的耗時多是一個線程執行的好幾倍,因此有了第二種方式.
第二種方式是也是使用隊列,但隊列中的元素爲爲一個個(函數名,函數參數,)的元組,建立一個線程組成的列表,線程輪流去隊列中取到元組,分解後執行函數,而後取下一個函數.

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()

進程(multiprocessing)

進程與線程的使用方式基本雷同.好比start,daemon(用法略不一樣,意義相同),join,各類鎖等等.

進程之間的數據共享

默認進程之間是沒法進行共享的,看例子:

from multiprocessing import Process

li = []

def foo(i):
    li.append(i)
    print('say hi',li)

for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

print('ending',li)

out:

say hi [0]
say hi [1]
say hi [2]
say hi [3]
say hi [4]
say hi [5]
say hi [6]
say hi [7]
ending []
say hi [8]
say hi [9]

那麼如何讓進程之間可以共享呢?
基本可分爲三種方式:

  1. from multiprocessing import queues 特殊的queues
  2. from multiprocessing import Array 數組方式
  3. from multiprocessing import Manager manager.dict
    須要注意的是,他們都是multiprocessing中的模塊

queues方式:

from multiprocessing import queues
from multiprocessing import Process
import multiprocessing
def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())

if __name__ == '__main__':
    li=queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
        p.start()

Array方式,數組有個特性,必須初始化的時候指定數組的長度和元素類型:

from multiprocessing import Process
from multiprocessing import Array
def foo(i,arg):
    arg[i]=i+100
    for item in arg:
        print(item)
    print('======')

if __name__ == '__main__':
    li=Array('i',10)
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
        p.start()

Array的類型對應表:

'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double

manager.dict 可實現數據共享
進程和進程之間若是想通信,須要鏈接p.join()

from multiprocessing import Process
from multiprocessing import Manager
def foo(i,arg):
    arg[i]=i+100
    print(arg.values())
if __name__ == '__main__':
    obj=Manager()
    li=obj.dict()
    for i in range(10):
        p=Process(target=foo,args=(i,li,))
        p.start()
        p.join()

當建立進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢後,再賦值給原值。
進程鎖例子:

from multiprocessing import Process, Array, RLock

def Foo(lock,temp,i):
    """
    將第0個數加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print(i,'----->',item)
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。
進程池有兩種方式:

  • apply 串行操做
  • 異步操做apply_async
    串行操做:
from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == '__main__':
    pool=Pool(5)
    for i in range(30):
        pool.apply(func=f1,args=(i,))

異步操做:

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == '__main__':
    pool=Pool(5)
    for i in range(30):
        pool.apply_async(func=f1,args=(i,))
    pool.close() #全部任務執行完畢          #1
    time.sleep(1)
    #pool.terminate() #當即終止,不論是否有任務正在執行或者待執行     #2
    pool.join()

其中 #1 #2 二選一操做
pool.terminate 當即終止,不論是否有任務正在執行或者等待執行
pool.close 全部任務執行完畢後關閉

協程

原理:利用一個線程,分解一個線程成爲多個微線程==>程序級別作的,與操做系統沒有關係.
與線程進程的區別:線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。
協程的適用場景:涉及到http的I/O請求,協程是高性能的代名詞.因此,網絡爬蟲不少是使用協程方式.
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的執行方式:打個比方,1個很牛逼的足球隊員,前面一排並列的足球,從第一個足球踢出去,而後提出第二個第三個,等足球彈回起始位置時,足球員對此足球接住後再次剃出或者停住球,這個足球員就是協程

協程使用前提

使用前須要安裝gevent第三方模塊
pip3 install gevent
看下代碼吧,自動切換,關鍵詞gevent.spawn():

from gevent import monkey;monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])
相關文章
相關標籤/搜索