線程、進程與協程

衆所周知, 計算機是由軟件和硬件組成. 硬件中的CPU主要用於解釋指令和處理數據, 軟件中的操做系統負責資源的管理和分配以及任務的調度. 而程序則是運行在操做系統上具備特定功能的軟件. 每當程序執行完成特定功能的時候, 爲了保證程序的獨立運行不受影響每每須要進程控制塊(專門管理和控制程序執行的數據結構)的做用.
說了以上這麼多基本理論知識, 接下來咱們談談進程. 進程本質上就是一個程序在一個數據集上的動態執行過程. 進程一般由程序, 數據集和進程控制塊三部分組成.python

  • 程序: 描述進程須要完成的功能以及如何去完成
  • 數據集: 程序執行過程當中須要使用的資源(包括IO資源和基本數據)
  • 進程控制塊: 記錄進程的外部特徵以及描述其執行過程. 操做系統正是經過它來控制和管理進程

而線程在如今的多處理器電子設備中是最小的處理單元. 一個進程能夠有多個線程, 這些線程之間彼此共享該進程的資源. 可是進程之間默認是相互獨立的, 若數據共享則須要另外特定的操做. 這裏作一個比喻. 如今有一個大型工廠, 該工廠負責生產汽車. 同時這個工廠又有多個車間, 每一個車間負責不一樣的功能, 有的生產輪胎, 有的生產方向盤等等. 每一個車間又有多個車間工人, 這些工人相互合做, 彼此共享資源來共同生產輪胎方向盤等等. 這裏的工廠就至關於一個應用程序, 而每一個車間至關於一個進程, 每一個車間工人就至關於線程.數據結構

普通多線程建立使用多線程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def showThreading(arg):
    time.sleep(1)
    print("current thread is : ",arg)

if __name__ == '__main__':
    for tmp in range(10):
        t=threading.Thread(target=showThreading,args=(tmp,))
        t.start()
    print('main thread has been stopped')複製代碼

執行結果以下:併發

簡單多線程運行結果

  • 由輸出結果可知, 子線程之間是併發執行的, 並且在阻塞1秒的時間內主線程也執行完畢
  • 當主線程執行完畢, 子線程還能繼續執行是由於當前的t.setDaemon(False)默認爲false. 爲false代表當前線程爲前臺線程, 主線程執行完畢後仍需等待前臺線程執行完畢以後方能結束當前進程; 爲true代表當前線程爲後臺線程, 主線程執行完畢後則當前進程結束, 不關注後臺線程是否執行完畢

Daemon爲True時的執行結果

  • t=threading.Thread(target=showThreading,args=(tmp,)) 這一句建立一個線程, target=代表線程所執行的函數, args= 代表函數的參數
  • t.start() 線程準備完畢等待cpu調度處理, 當線程被cpu調度後會自動執行線程對象的run方法(自定義線程類時候可用)
  • t.setName(string) 爲當前線程設置名字
  • t.getName() 獲取當前線程的名字
  • t.join() 該方法表示主線程必須在此位置等待子線程執行完畢後才能繼續執行主線程後面的代碼, 當且僅當setDaemon爲false時有效

自定義線程類app

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time

class MyThread(threading.Thread):
    def __init__(self,target,arg=()):
        super(MyThread, self).__init__()
        self.target=target
        self.arg=arg

    def run(self):
        self.target(self.arg)

def test(arg):
    time.sleep(1)
    print("current thread is : ",arg)

if __name__ == '__main__':
    for tmp in range(10):
        mt=MyThread(target=test,arg=(tmp,))
        mt.start()
    print("main thread has been stopped")複製代碼
  • class MyThread(threading.Thread): 自定義線程類須要繼承threading.Thread
  • super(MyThread, self).__init__() 自定義線程類初始化時候需將當前對象傳遞給父類並執行父類的初始化方法
  • run(self) 線程啓動以後會執行該方法

因爲CPU對線程是隨機調度執行, 而且每每會在當前線程執行一小段代碼以後便直接換爲其餘線程執行, 如此往復循環直到全部的線程執行結束. 所以在一個共享資源和數據的進程中, 多個線程對同一資源操或者同一數據操做容易形成資源搶奪和產生髒數據. 此時咱們引入鎖的概念, 對這種資源和數據進行加鎖, 直到當前線程操做完畢再釋放鎖讓其餘線程操做.async

咱們先看看不加鎖時候對數據的操做狀況:函數

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time

NUM = 0


def add():
    global NUM
    NUM += 1
    name=t.getName()
    time.sleep(1)
    print('current thread is: ',name ,' current NUM is: ',NUM )


if __name__ == '__main__':
    for tmp in range(10):
        t=threading.Thread(target=add)
        t.start()
    print("main thread has been stopped !")複製代碼

不加鎖執行結果

  • 從圖中可知數據已經不是咱們指望的結果, 此時輸出的是10個線程對該數據操做完的結果, 咱們指望的是輸出每一個線程對該數據操做後的結果. 顯然代碼的執行順序並非一個線程一個線程依次執行, 而是彼此穿插交錯執行
  • 注意time.sleep(1) 這一句讓線程阻塞的位置會影響線程的執行順序

咱們再來看看加鎖的狀況:fetch

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time

NUM = 0


def add():
    global NUM
    lock.acquire()
    NUM += 1
    name=t.getName()
    lock.release()
    time.sleep(1)
    print('current thread is: ',name ,' current NUM is: ',NUM )

if __name__ == '__main__':
    lock=threading.Lock()
    for tmp in range(10):
        t=threading.Thread(target=add)
        t.start()
    print("main thread has been stopped !")複製代碼

加鎖後的執行結果

  • lock=threading.Lock() 實例化鎖對象
  • lock.acquire() 從該句開始加鎖
  • lock.release() 釋放鎖

python中在threading模塊中定義了一下幾種鎖:ui

  • Lock(不可嵌套), RLock(可嵌套), 兩個都是普通鎖, 同一時刻只容許一個線程被執行, 是互斥鎖
  • Semaphore 信號量鎖, 該鎖容許必定數量的線程同時操做數據
  • event 事件機制鎖, 根據Flag的真假來控制線程
  • condition 條件鎖, 只有知足某條件時候才能釋放線程

Semaphore 信號量鎖使用:this

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def test():
    semaphore.acquire()
    print("current thread is: ", t.getName())
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)
    for tmp in range(20):
        t = threading.Thread(target=test)
        t.start()複製代碼
  • semaphore = threading.BoundedSemaphore(5) 得到信號量鎖對象
  • semaphore.acquire() 加鎖
  • semaphore.release() 釋放鎖

event 事件機制鎖使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time

def test():
    print(t.getName())
    event.wait()


if __name__ == '__main__':
    event=threading.Event()
    for tmp in range(10):
        t=threading.Thread(target=test)
        t.start()
    print("zhe middle of main thread")
    if input("input your flag: ")=='1':
        event.set()
    print("main thread has been stopped")複製代碼
  • event=threading.Event() 獲取事件鎖對象
  • event.wait() 檢測標誌位flag, 爲true則放行該線程, 爲false則阻塞該線程
  • event.set() 將標誌位flag設置爲true
  • event.clear() 將標誌位flag設置爲false

condition 條件鎖使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading


def condition():
    inp = input("input your condition: ")
    print(inp)
    if inp == "yes":
        return True
    return False


def test():
    cd.acquire()
    # cd.wait(1)
    cd.wait_for(condition)
    # cd.notify(2)
    print(t.getName())
    cd.release()


if __name__ == '__main__':
    cd = threading.Condition()
    for tmp in range(10):
        t = threading.Thread(target=test)
        t.start()
        t.join()
    print("\nmain thread has been stopped")複製代碼

運行結果

  • 由圖可得每次輸入yes 則放行一個線程
  • cd = threading.Condition() 獲取條件鎖對象
  • cd.wait(1) 設置線程最多等待時間
  • cd.wait_for(condition) 設置放行的條件, 該方法接受condition函數的返回值

在python的queue模塊中內置了一種特殊的數據結構, 即隊列. 這裏咱們能夠把隊列簡單的看做是規定順序執行的一組線程.

Queue 先進先出隊列的使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue

q=queue.Queue(10)

for tmp in range(10):
    q.put(tmp)

for tmp in range(10):
    print(q.get(),q.qsize())複製代碼
  • q=queue.Queue(10) 生成隊列對象, 設置隊列最多存放的數據爲10個
  • q.put(tmp) 往隊列中存入數據
  • q.get() 獲取隊列數據
  • q.qsize() 獲取當前隊列的大小

利用Queue實現生產者消費者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time, threading, queue


def productor(i):
    while True:
        q.put(i)
        time.sleep(1)


def consumer(i):
    while True:
        print("consumer-%s ate %s" % (i, q.get()))


if __name__ == '__main__':
    q = queue.Queue(10)
    for tmp in range(8):
        t = threading.Thread(target=productor, args=(tmp,))
        t.start()

    for tmp in range(5):
        t = threading.Thread(target=consumer, args=(tmp,))
        t.start()

    print("main has been stopped")複製代碼

運行結果

不斷的建立和銷燬線程是很是消耗CPU的, 所以咱們會採起維護一個線程池來實現多線程. 可是python中並未提供線程池的模塊, 這裏就須要咱們本身來寫.

簡單版本的線程池實現:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading


class ThreadPool(object):
    def __init__(self, max_num=5):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)


def test(pool, i):
    tm = __import__("time")
    tm.sleep(1)
    print("current thread is: ", i)
    pool.add_thread()


if __name__ == '__main__':
    p = ThreadPool()
    for tmp in range(20):
        thread = p.get_thread()
        t = thread(target=test, args=(p, tmp))
        t.start()
    print("main thread has been stopped")複製代碼

運行結果

  • 這裏實現線程池的主要思想是維護一個指定大小的隊列, 隊列中的每個元素就是threading.Thread類. 每當須要線程時候, 直接獲取該類並建立線程, 使用完畢則返回線程池中
  • 缺點就是沒有回調函數, 不能重複使用線程, 每當本身使用完線程須要本身將線程放回線程池, 且須要手動啓動線程

健壯版本的線程池:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue, threading, contextlib

stopFlag = object()


class ThreadPool(object):
    def __init__(self, max_num):
        self.queue = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.queue_real_list_list = []
        self.queue_free_list = []

    def run(self, target, args, callback):
        task_tuple = (target, args, callback)
        self.queue.put(task_tuple)
        if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
            self.add_thread()

    def add_thread(self):
        t = threading.Thread(target=self.fetch)
        t.start()

    def fetch(self):
        current_thread = threading.currentThread
        self.queue_real_list_list.append(current_thread)
        task_tuple = self.queue.get()
        while task_tuple != stopFlag:
            func, args, callback = task_tuple
            result_status = True
            try:
                result = func(*args)
            except Exception as e:
                result_status = False
                result = e
            if callback is not None:
                try:
                    callback(result_status, result)
                except Exception as e:
                    pass
            if not self.terminal:
                # self.queue_free_list.append(current_thread)
                # task_tuple = self.queue.get()
                # self.queue_free_list.remove(current_thread)
                with ThreadPool.queue_operate(self.queue_free_list,current_thread):
                    task_tuple = self.queue.get()
            else:
                task_tuple = stopFlag

        else:
            self.queue_real_list_list.remove(current_thread)

    def close(self):
        num = len(self.queue_real_list_list)
        while num:
            self.queue.put(stopFlag)
            num -= 1

    def terminate(self):
        self.terminal = True
        max_num = len(self.queue_real_list_list)
        while max_num:
            self.queue.put(stopFlag)
            max_num -= 1

    def terminate_clean_queue(self):
        self.terminal = True
        while self.queue_real_list_list:
            self.queue.put(stopFlag)
        self.queue.empty()

 @staticmethod
 @contextlib.contextmanager
    def queue_operate(ls, ct):
        ls.append(ct)
        try:
            yield
        finally:
            ls.remove(ct)


def callback_func(result_status, result):
    print(result_status, result)


def test(i):
    tm = __import__("time")
    tm.sleep(1)
    return "current thread is: {}".format(i)


if __name__ == '__main__':
    pool = ThreadPool(5)
    for tmp in range(20):
        pool.run(target=test, args=(tmp,), callback=callback_func)
    # pool.close()
    pool.terminate()複製代碼
  • pool = ThreadPool(5) 生成線程池對象, 指定線程池最多線程數爲5
    • __init__(self, max_num)被執行
    • self.queue = queue.Queue() 任務隊列
    • self.max_num = max_num 最多線程數
    • self.terminal = False 是否當即終止標誌
    • self.queue_real_list_list = [] 當前已經建立的線程對象列表
    • self.queue_free_list = [] 空閒的線程對象列表
  • pool.run(target=test, args=(tmp,), callback=callback_func) 運行線程池對象, target=test 線程運行的功能函數, args=(tmp,) 功能函數的參數, callback=callback_func 功能函數執行完畢以後調用的函數(即 回調函數)
    • task_tuple = (target, args, callback) 將線程要執行的功能函數和回調函數打包成任務元組
    • self.queue.put(task_tuple) 將任務元組加入到隊列中
    • if len(self.queue_free_list) == 0 and len(self.queue_real_list_list) < self.max_num:
              self.add_thread()複製代碼
      判斷空閒列表是否爲空且真實的線程列表數目是否小於最大線程數目, 如果則執行add_thread()函數添加線程
    • add_thread(self) 添加並啓動線程, 並將線程要執行的功能交給fetch(self)函數
    • current_thread = threading.currentThread 獲取當前線程, self.queue_real_list_list.append(current_thread) 將當前線程加入到真實線程列表中
    • task_tuple = self.queue.get() 從任務隊列中獲取任務元組
    • while task_tuple != stopFlag 該循環語句內容表示任務元組對象不是stopFlag結束標誌的時候執行其具體的功能和回調函數
    • if not self.terminal 判斷是否當即終止當前線程(等待當前線程執行完任何當即結束)
  • pool.close() 根據當前真實線程列表添加對應的stopFlag終止符
  • pool.terminate() 此爲不清空任務隊列的當即終止線程方法
  • terminate_clean_queue(self) 清空任務隊列的當即終止線程方法

在python中由multiprocess模塊提供的Process類來實現進程相關功能(process與Process是不一樣的)

Process的使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process


def test(pro):
    print("current process is: ",pro)


if __name__ == '__main__':
    for tmp in range(10):
        p = Process(target=test,args=(tmp,))
        p.start()複製代碼

運行結果

  • args=(tmp,) 這裏傳入的是元組, 不加逗號則表示整型數據
  • p = Process(target=test,args=(tmp,)) 建立進程對象

普通的數據共享在進程中的實現:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process

ls = []


def test(i):
    ls.append(i)
    print("current process is: ", i, " and list is: ", ls)


if __name__ == '__main__':
    for tmp in range(10):
        p = Process(target=test, args=(tmp,))
        p.start()
        p.join()
    print("The final list is: ", ls)複製代碼

運行結果

  • 由圖可知, 進程之間默認是不能共享數據. 咱們須要藉助python的multiprocess模塊提供的類來實現數據共享

用Array共享數據

# -*- coding:utf-8 -*-
from multiprocessing import Process, Array


def test(i, ay):
    ay[i] += 10
    print('current process is: ', i)
    for tmp in ay:
        print(tmp)


if __name__ == '__main__':
    ay = Array('i', [1, 2, 3, 4, 5, 6])
    for tmp in range(5):
        p = Process(target=test, args=(tmp, ay))
        p.start()複製代碼

運行結果

  • ay = Array('i', [1, 2, 3, 4, 5, 6]) 建立整型的Array共享數據對象
  • p = Process(target=test, args=(tmp, ay)) 進程直接不能像線程之間共享數據, 故須要傳入ay對象

使用Manager共享數據:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Manager, Process


def test(i, dic):
    dic[i] = i + 10
    print('current process is: ', i)
    for k, v in dic.items():
        print(k, v)


if __name__ == '__main__':
    mg = Manager()
    dic = mg.dict()
    for tmp in range(10):
        p = Process(target=test, args=(tmp, dic))
        p.start()
        p.join()複製代碼

運行結果

  • mg = Manager() 初始化Manager對象
  • dic = mg.dict() 生成共享字典數據類型
  • p.join() 這裏須要保證每一個進程執行完畢以後才能進行接下來的操做, 不然會報錯

使用queue共享數據:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,queues

import multiprocessing


def test(i,qu):
    qu.put(i+10)
    print("current process is: ",i," and zhe size of zhe queue is: ",qu.qsize())

if __name__ == '__main__':
    qu=queues.Queue(10,ctx=multiprocessing)
    for tmp in range(10):
        p=Process(target=test,args=(tmp,qu))
        p.start()複製代碼

運行結果

在進程中共享數據也會出現髒數據的問題, 好比用multiprocessing模塊中的queue或者Queue共享數據時候就會出現髒數據. 此時咱們每每須要設置進程鎖. 進程鎖的使用和線程鎖使用徹底相同(Rlock, Lock, Semaphore, Event, Condition, 這些鎖均在multiprocess中)

在實際開發中咱們並不會採起直接建立多進程來實現某些功能, 而是主動維護一個指定進程數的進程池來實現多進程. 由於不斷的建立進程和銷燬進程對CPU的開銷太大. python中內置了了進程池Pool 模塊

進程池Pool的使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time


def test(arg):
    time.sleep(1)
    return arg + 10


def call_end(arg):
    print(arg)


if __name__ == '__main__':
    p = Pool(5)
    for tmp in range(10):
        p.apply_async(func=test, args=(tmp,), callback=call_end)
    p.close()
    # p.terminate()
    p.join()複製代碼

運行結果

  • p.apply() 從進程池中取出一個進程執行其對應的功能
  • p.apply_async(func=test, args=(tmp,), callback=call_end)p.apply() 做用相同, p.apply_async() 能夠調用回調函數. callback=call_end 代表call_end是回調函數, 當test執行完畢以後會將其返回值做爲參數傳遞給該回調函數
  • p.close() 等到全部進程結束後關閉進程池
  • p.join() 代表主進程必須等待全部子進程執行結束後方能結束(須要放在p.close()或者p.terminate()後面)

協成是python中特有的一個概念, 它是人爲的利用單線程在操做某任務等待空閒的時間內, 經過yield來保存當時狀態, 進而用該線程作其餘的操做. 由此實現的併發操做, 本質上跟IO多路複用相似.

基礎版本協成的使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import greenlet


def f1():
    print('1111')
    gr2.switch()
    print('2222')
    gr2.switch()


def f2():
    print('3333')
    gr1.switch()
    print('4444')


if __name__ == '__main__':
    gr1 = greenlet.greenlet(f1)
    gr2 = greenlet.greenlet(f2)
    gr1.switch()複製代碼
  • gr1 = greenlet.greenlet(f1) 建立f1函數的協成對象
  • gr1.switch() 由當前線程轉到到執行f1函數

封裝後的協成模塊使用:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import gevent


def f1():
    print('this is f1 !!!')
    gevent.sleep(0)
    print('f1 after sleep')


def f2():
    print("this is f2 !!!")
    gevent.sleep(0)
    print('f2 after sleep')


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(f1),
        gevent.spawn(f2),
    ])複製代碼
  • gevent.joinall([
          gevent.spawn(f1),
          gevent.spawn(f2),
      ])複製代碼
  • 等待f1f2執行完成再結束當前線程, 相似線程中的join()方法
  • gevent.sleep(0) 設置等待時間
  • 每每實際開發中並不須要設置從哪裏須要切換代碼執行或者等待的

用協成訪問網頁簡單例子:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey

monkey.patch_all()
import gevent, requests


def fetch(url):
    print('current url %s' % url)
    rp = requests.get(url)
    data = rp.text
    print(url, len(data))


if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(fetch, 'https://www.baidu.com'),
        gevent.spawn(fetch, 'https://www.sogou.com/'),
        gevent.spawn(fetch, 'http://www.jianshu.com'),
    ])複製代碼

運行結果

  • 由圖中可見, 執行第一個print('current url %s' % url)以後, 當前線程會處於等待請求狀態, 此時該線程會發送第二個url, 依次類推. 直到最後請求數據獲取後, 才返回到第一次執行的函數中執行後續操做
相關文章
相關標籤/搜索