Python:線程、進程和協程

首先,推薦一篇講解進程與線程關係的漫畫:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.htmlhtml

 

 

線程

  在平時,咱們若是要執行一個任務,須要排隊執行,可是咱們有了線程和進程就不同了。好比,公司只有我和老闆,有一天,老闆給我派任務了,只有我一我的來作任務,用了一天完成,這就叫單線程;又有一天,老闆又派了一樣的任務,而且招了好幾個技術人員,讓我和他們一塊兒完成任務,結果只用了一個小時,這就叫多線程python

  咱們寫一段代碼,編譯器從上到下讀的過程叫主線程,遇到threading.Thread就叫子線程。程序員

threading模塊

threading用於提供線程相關的操做,線程是應用程序中工做的最小單元算法

import threading
import time

def worker(num):
    time.sleep(1)
    print("Thread %d" % num)

for i in range(10):
    #args裏面的參數必須是元組
    t = threading.Thread(target=worker,args=(i,),name = "t.%d" % i)
    t.start()
    print(t.name)#線程名

上述代碼中建立了10個線程,而後控制器就交給了CPU,CPU根據指定算法進行調度,分片執行命令。多線程

  • start            線程準備就緒,等待CPU調度
  • setName      爲線程設置名稱
  • getName      獲取線程名稱
  • setDaemon   設置爲後臺線程或前臺線程(默認)

                   若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
                   若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止併發

  • join              逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
  • run              線程被cpu調度後自動執行線程對象的run方法

 

setdaemon:不等待子線程執行完就關閉app

import threading
import time
def f0():
    pass
def f1(a1,a2):
    time.sleep(10)
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=f1,args=(123,111))
t2.setDaemon(True)
t2.start()

t3 = threading.Thread(target=f1,args=(123,111))
t3.setDaemon(True)
t3.start()

等待子線程執行完畢才關閉async

import threading
import time
def f0():
    print("f0")
def f1(a1,a2):
    time.sleep(10)
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.start()
t2 = threading.Thread(target=f1,args=(123,111))
t2.start()
t3 = threading.Thread(target=f1,args=(123,111))
t3.start()

 

join:等當前線程執行結束後再執行下一個線程,可傳參數,參數表示最多等的秒數ide

import threading
import time
def f0():
    print("f0")
def f1(a1,a2):
    print("f1")
    f0()

t1 = threading.Thread(target=f1,args=(123,111))
t1.start()
t1.join()
t2 = threading.Thread(target=f1,args=(123,111))
t2.start()
t2.join()

 

 

線程鎖

咱們使用線程對數據進行操做的時候,若是多個線程同時修改某個數據,可能會出現髒數據(不許確的數據),爲了保證數據的準確性,就須要加把鎖。函數

  鎖有兩種:RLock和Lock,這裏咱們使用RLock。

import threading
import time

globals_num = 0
lock = threading.RLock()

def func():
    lock.acquire() # 得到鎖
    global globals_num
    globals_num +=1
    time.sleep(1)
    print(globals_num)
    lock.release() #釋放鎖

for i in range(10):
    r = threading.Thread(target=func)#建立線程鎖
    r.start()

 

RLock和Lock的區別:

Lock:得到鎖以後必須釋放鎖才能再次得到鎖,否則會產生死鎖,就會一直等待釋放鎖。

import threading
lock = threading.Lock()
lock.acquire()
lock.acquire() #產生死鎖
lock.release()
lock.release()

import threading
lock = threading.Lock()
lock.acquire()
print("1")
lock.release()
print("ok")
lock.acquire()
print("2")
lock.release()

RLock:得到幾把鎖就要釋放幾把鎖。

import threading
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
rlock.release()
rlock.release()
print("end")

import threading
rlock = threading.RLock()
rlock.acquire()
print("1")
rlock.acquire()
print("2")
rlock.release()
print("3")
rlock.release()
print("4")

 

event

Python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法:set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

  • clear:將「Flag」設置爲False
  • set:將「Flag」設置爲True
  • wait:阻塞線程,直到event對象內部標識位被設爲True時。
import threading

def do(event):
    print('start')
    event.wait()
    print('execute')

event_obj = threading.Event()
for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()
inp = input('input:')
if inp == 'true':
    event_obj.set()

 

queue模塊

Queue就是隊列,規則是先進先出。這個模型也叫生產者-消費者模型。

  生產者-消費者:建立一個爲10的隊列,生產12個,只消費10個。

import queue
import threading

message = queue.Queue(10) # 建立隊列

def producer(i):
    print("put:",i)
    message.put(i)

def consumer(i):
    print("get:",i)
    msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

join:等到隊列爲空時,再繼續往下執行

put(item, block=True, timeout=None):放入隊列尾部

get(block=True, timeout=None):獲取隊列第一個值

put_nowait:等效於 put(item,block=False)

get_nowait:等效於 get(item,block=False)

 

 

 

線程池

  先說一下線程池是什麼鬼!之因此這麼說,是由於我本覺得很簡單的東東,結果好複雜...可是!懂了以後,又以爲還好還好,沒有特別難。其實就是用最少的勞動力,創造出最多的利益!線程池裏面有不少線程,同時還有一個任務隊列。執行任務過程就是,也就是說重複利用線程來執行任務,減小系統資源的開銷。

  簡單版線程池是線程執行完任務以後銷燬,若是還有任務,就再建立線程去執行。

  高級版線程池就是線程執行完任務以後回來告訴回調函數執行完畢,回調函數不銷燬它,而是把它做爲空閒線程,不用建立新線程,若是任務隊列中還有任務,就讓它再去執行任務。

  so easy!媽媽不再用擔憂我會掉進各類池啦......

簡易版線程池(武大神說這是low版本):

import queue
import threading
import time

class ThreadPool(object):
    def __init__(self,max_num=20):
        self.queue = queue.Queue(max_num) # 長度爲20的隊列
        for i in range(max_num):
            self.queue.put(threading.Thread) # threading.Thread是類對象,放入隊列中

    def get_thread(self):
        return self.queue.get() #獲取隊列中的類對象

    def add_thread(self):
        self.queue.put(threading.Thread)

def func(pool,a1):
    time.sleep(1)
    print(a1)
    pool.add_thread()

p = ThreadPool(10)

for i in range(50):
    thread = p.get_thread() #threading.Thread #獲得一個線程
    t = thread(target=func,args=(p,i)) # 執行func函數
    t.start()

 

高級版線程池:

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue() # 建立隊列
        self.max_num = max_num # 最多建立線程數量

        self.terminal = False # 默認爲False
        self.generate_list = []  # 實際建立的線程
        self.free_list = [] # 空閒的線程

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 若是空閒列表爲0而且實際建立的線程列表長度小於最多建立的線程數
            self.generate_thread() # 建立線程
        w = (func, args, callback,) # 將任務元組賦值給w變量
        self.q.put(w) # 將任務放入隊列中

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread # 建立當前線程
        self.generate_list.append(current_thread) # 將線程添加到實際建立線程列表中

        event = self.q.get() # 等待着去隊列中獲取線程
        while event != StopEvent: # 隊列中沒有中止符

            func, arguments, callback = event # 得到任務
            try:
                result = func(*arguments) #函數
                status = True #賦值True
            except Exception as e:
                status = False # 發生錯誤賦值False
                result = e # 錯誤類型

            if callback is not None: # 回調函數不爲空
                try:
                    callback(status, result) # 執行回調函數
                except Exception as e:
                    pass

            if self.terminal: # False
                event = StopEvent
            else:
                with self.worker_state(self.free_list,current_thread): #with:上下文管理
                    event = self.q.get() #before:append;after:remove

        else:
            self.generate_list.remove(current_thread) #沒有任務時,就把實際建立線程列表裏的線程刪掉

    @contextlib.contextmanager # 能夠實現上下文管理的裝飾器
    def worker_state(self, lis, val):
        lis.append(val)
        try:
            yield
        finally:
            lis.remove(val)


    def close(self):
        num = len(self.generate_list) #時間建立線程列表長度
        while num:
            self.q.put(StopEvent) #中止符放入已建立的線程中
            num -= 1

    # 終止線程(清空隊列)
    def terminate(self):
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.empty()

def work(i):
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))

pool.terminate() # 執行一下就關閉

 

ps:這裏要補充一個知識點。

實現上下文管理(高級版本線程池中有出現)。須要導入模塊,而後加上裝飾器就ok。

示例:

import contextlib

@contextlib.contextmanager
def myopen(file_path,mode):
    f = open(file_path,mode,encoding="utf-8")
    try:
        yield f
    finally:
        f.close()
with myopen(
"index.html","r") as file_obj: print(file_obj.readline())

 

 

 

進程

注:因爲進程之間的數據須要各自持有一份,因此建立進程須要很是大的開銷,也就是佔有很大的內存。同一個進程中,線程跟線程之間的內存是共享的。

建立進程:multiprocessing模塊

from  multiprocessing import Process

def f(name):
    print("hello",name)

if __name__ == "__main__":
    p = Process(target=f,args=("bob",))
    p.start()
    print("start")
    p.join() # 等待
    print("end")

 

實現進程數據共享

#默認數據沒有共享
from multiprocessing import Process,Manager

def Foo(i,dic):
    dic[i] = 100 + i
    print(dic)
    for k,v in dic.items():
        print(k,v)
if __name__ == "__main__":
    manage = Manager()
    dic = {}
    for i in range(2):
        p = Process(target=Foo,args=(i,dic,))
        p.start()
        p.join()
#打印結果:
{0: 100}
0 100
{1: 101}
1 101

 

#實現數據共享
from multiprocessing import Process,Manager
def foo(i,dic):
    dic[i] = 100 + i
    print(dic)
    print(len(dic))

if __name__ == "__main__":
    manage = Manager()
    dic = manage.dict()
    for i in range(2):
        p = Process(target=foo,args=(i,dic,))
        p.start()
        p.join()
#打印結果:
{0: 100}
1
{0: 100, 1: 101}
2

 

 'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
類型對應表

 

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池程序中沒有可供使用的進程,那麼進程就會等待,直到進程池中有可用進程爲止。

 進程池中有兩個方法:apply_async、apply

apply_async:

一次建立多個進程,併發執行,執行完一個將返回值給回調函數而後執行回調函數
from multiprocessing import Pool
import time

def f1(a):
    time.sleep(1)
    print(a)
    return 1000

def f2(arg):
    print(arg)


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=f1,args=(i,),callback=f2)
        print("111111")
    pool.close()
    pool.join()
join:進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
apply
:
#排隊執行,apply裏面有join
from multiprocessing import Pool
import time
def f1(a):
    time.sleep(1)
    print(a)

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply(func=f1,args=(i,))
        print("111111")

 

 

 

進程與線程之間的關係

  線程是屬於進程的,線程運行在進程空間內,而且同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。不論是進程仍是線程,都是爲了實現一個併發操做。

  io密集型:使用線程;計算密集型:使用進程。根本的緣由是Python的線程裏面有個鎖(只有Python裏有這個鎖:GIL):全局解釋器鎖,規定進程裏面只有一個線程能出來被CPU調用。

 

 

 

協程

線程在執行任務時,發現要等一段時間才能獲得結果,就不等結果,再去執行其餘任務。

線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做則是程序員。

協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;

 

gevent

手動版協程:爲了看出執行任務時須要等待,因此咱們就睡一會咯(gevent.sleep)

import gevent
def foo():
    print("1")
    gevent.sleep() #表示暫停,去執行下一個
    print("3")

def bar():
    print("2")
    gevent.sleep()
    print("4")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
#打印結果:
1
2
3
4

 

終極版協程:終極版就是不用手動讓它睡一會就能看出效果的版本!

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print(url,len(data))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://baidu.com/'),
])
#執行結果:須要你們本身去執行才能感同身受呢!!!
相關文章
相關標籤/搜索