八.進程和線程

[TOC]python

一.進程

1.相關概念

  • 什麼是程序?

程序:例如XXXX.py這是程序,處於靜態的。數據庫

  • 什麼是進程

進程:一個程序運行起來後,代碼+用到的資源稱之爲進程,它是操做系統分配資源的基本單元。緩存

在早期面向進程設計的計算機結構中,進程是程序的基本執行實體;
在當代面向線程設計的計算機結構中,進程是線程的容器。bash

  • 同步/異步

所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。
所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列。多線程

  • 阻塞/非阻塞

阻塞和非阻塞跟同步和異步無關,主要與程序等待消息通知時的狀態有關。也就是說阻塞與非阻塞主要是從程序(線程)等待消息通知時的狀態角度來說的。併發

  • 併發/並行

1)並行,parallel 強調同一時刻同時執行
2)併發 concurrency 則指的一個時間段內去一塊兒執行app

2.進程的狀態

image
就緒態:運行的條件都已經慢去,正在等在cpu執行
執行態:cpu正在執行其功能
等待態:等待某些條件知足,例如一個程序sleep了,此時就處於等待態dom

3.Python中使用多進程

multiprocessing模塊就是跨平臺版本的多進程模塊,提供了一個Process類來表明一個進程對象,這個對象能夠理解爲是一個獨立的進程,能夠執行另外的事情異步

  • 示例1
from multiprocessing import Process
import time


def run_process():
    while True:
        print("子進程----2----")
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_process) # target指定目標函數
    p.start()
    while True:
        print("主進程----1----")
        time.sleep(1)

Process語法:
Process([group [, target [, name [, args [, kwargs]]]]])async

參數--------------------------
target:若是傳遞了函數的引用,能夠任務這個子進程就執行這裏的代碼
args:給target指定的函數傳遞的參數,以元組的方式傳遞
kwargs:給target指定的函數傳遞命名參數
name:給進程設定一個名字,能夠不設定
group:指定進程組,大多數狀況下用不到
Process建立的實例對象的經常使用方法:

方法--------------------------
start():啓動子進程實例(建立子進程)
is_alive():判斷進程子進程是否還在活着
join([timeout]):是否等待子進程執行結束,或等待多少秒
terminate():無論任務是否完成,當即終止子進程
Process建立的實例對象的經常使用屬性:

屬性-------------------------
name:當前進程的別名,默認爲Process-N,N爲從1開始遞增的整數
pid:當前進程的pid(進程號)

  • 示例2 進程pid
from multiprocessing import Process
import time
import os


def run_process():
    while True:
        print("子進程----pid:{}----".format(os.getpid()))
        print()
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_process)
    p.start()
    while True:
        print("主進程----pid:{}----".format(os.getpid()))
        time.sleep(1)
  • 示例3 子進程目標方法傳參
from multiprocessing import Process
import time
import os


def run_process(course, teacher, *args, **kwargs):
    while True:
        print("子進程----pid:{}----{}上{}課".format(os.getpid(), teacher, course))
        print()
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_process, args=('語文',), kwargs={'teacher':'張三'})
    p.start()
    while True:
        print("主進程----pid:{}----{}上{}課".format(os.getpid(),'李四','數學'))
        time.sleep(1)
  • 示例4 進程間不會共享全局變量
from multiprocessing import Process
import time
import os

num_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10]
i = 3

def run_process1():
    global  i
    while i:
        print("子進程----pid:{}----".format(os.getpid()))
        num_list.pop()
        print(num_list)
        i = i - 1
        time.sleep(1)


def run_process2():
    global i
    while i:
        print("子進程----pid:{}----".format(os.getpid()))
        num_list.append(i+1)
        print(num_list)
        i = i - 1
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_process1)
    p.start()

    p = Process(target=run_process2)
    p.start()

輸出

子進程----pid:10187----
[0, 1, 3, 4, 5, 6, 7, 8, 9]
子進程----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4]
子進程----pid:10187----
[0, 1, 3, 4, 5, 6, 7, 8]
子進程----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3]
子進程----pid:10187----
[0, 1, 3, 4, 5, 6, 7]
子進程----pid:10188----
[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]

4.進程間通訊

可使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue自己是一個消息列隊程序。

示例

from multiprocessing import Process, Queue
import time, random


def worker(q):
    """完成文件"""
    for i in range(10):
        file_num = random.randint(1, 100)
        print('已完成工做{}...'.format(file_num))
        q.put(file_num)
        time.sleep(1)


def boss(q):
    """查看文件"""
    while True:
        if not q.empty():
            file_num = q.get(True)
            print('已查看工做{}...'.format(file_num))
            time.sleep(1)
        else:
            break

if __name__=='__main__':
    # 建立Queue,並傳給各個子進程:
    q = Queue(5)
    pw = Process(target=worker, args=(q,))
    pb = Process(target=boss, args=(q,))
    pw.start()
    # pw.join()
    pb.start()
    # pb.join()

5.進程池

當須要建立的子進程數量很少時,能夠直接利用multiprocessing中的Process動態成生多個進程,但若是是上百甚至上千個目標,手動的去建立進程的工做量巨大,此時就能夠用multiprocessing模塊提供的Pool方法。

初始化Pool時,能夠指定一個最大進程數,當有新的請求提交到Pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,纔會用以前的進程來執行新的任務。

示例

from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("start pro {},pid爲{}".format(msg,os.getpid()))
    # random.random()隨機生成0~1之間的浮點數
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg,"執行完畢,耗時{:.2f}" .format(t_stop-t_start))

# 定義一個進程池,最大進程數3
po = Pool(5)
for i in range(0,10):
    # Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,))
    # 每次循環將會用空閒出來的子進程去調用目標
    po.apply_async(worker,(i,))

print("----start----")
time.sleep(10)
po.close()  # 關閉進程池,關閉後po再也不接收新的請求
po.join()  # 等待po中全部子進程執行完成,必須放在close語句以後
print("-----end-----")

6.進程間進程的通訊

使用Pool建立進程,就須要使用multiprocessing.Manager()中的Queue()

示例

from multiprocessing import Pool, Manager
import time, random


def worker(q):
    """完成文件"""
    for i in range(10):
        file_num = random.randint(1, 100)
        print('已完成工做{}...'.format(file_num))
        q.put(file_num)
        time.sleep(1)


def boss(q):
    """查看文件"""
    while True:
        if not q.empty():
            file_num = q.get(True)
            print('已查看工做{}...'.format(file_num))
            time.sleep(1)
        else:
            break

if __name__=='__main__':
    # 建立Queue,並傳給各個子進程:
    q = Manager().Queue()
    po = Pool()
    po.apply_async(worker, (q,))
    time.sleep(1)  # 先讓上面的任務向Queue存入數據,而後再讓下面的任務開始從中取數據
    po.apply_async(boss, (q,))

    po.close()
    po.join()

二.線程

如今操做系統提出進程的概念,每個進程都認爲本身獨佔全部的計算機硬件資源。
進程就是獨立的王國,進程間不能夠隨便的共享數據。
線程就是省份,同一個進程內的線程能夠共享進程的資源,每個線程都擁有本身獨立的堆棧。

線程一樣有着相似進程的狀態
1)運行態:該時刻,該線程正在佔用CPU
2)就緒態:可隨時轉換爲運行態,由於其餘線程正在運行而暫停,該進程不佔用CPU
3)阻塞態:除非某些外部事件發生,不然線程不能運行

Python線程的操做可使用threading模塊,threading模塊是對更底層thread作了一些包裝的,能夠更加方便的被使用。

1.threading.Thread

Thread類:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)

target 線程調用的對象,就是目標函數
name 爲線程起個名字(能夠重名,由於線程區分靠ID,不靠名字)
args,爲目標函數傳遞實參,元組
kwargs, 爲目標函數關鍵字傳參,字典

threading的屬性和方法
current_thread() 返回當前線程對象
main_thread() 返回主線程對象
active_count() 當前處於alive狀態的線程個數
enumerate() 返回全部活着的線程的列表,不包括已經終止的和未開始的線程
get_ident() 返回當前線程的ID,非0整數

Thread實例的屬性和方法
name 只是一個名字
ident 線程ID
is_alive() 返回線程是否活着

start() 啓動線程,每個線程必須且只能執行該方法一次
run() 運行線程函數

  • 示例1 線程的啓動
import threading
import time


def worker():
    for _ in range(10):
        time.sleep(0.5)
        print('start')
        print(threading.get_ident()) # 返回當前線程對象線程id
        print('Thread over')


t = threading.Thread(target=worker)
t.start()
  • 示例2 多線程
import threading
import time

def finish_working():
    for i in range(5):
        print("線程:{} --完成工做加{}".format(threading.currentThread(), i))
        print(threading.current_thread())
        time.sleep(1)

if __name__ == "__main__":
    for i in range(5):
        t = threading.Thread(target=finish_working, name=str(i))
        t.start() #啓動線程,即讓線程開始執行
  • .線程執行代碼的封裝

經過使用threading模塊能完成多任務的程序開發,爲了讓每一個線程的封裝性更完美,因此使用threading模塊時,每每會定義一個新的子類class,只要繼承threading.Thread就能夠了,而後重寫run方法。

import threading
import time

class MyThread(threading.Thread):
    def run(self):
        print('run')
        super().run()

    def start(self):
        print('start')
        super().start()

def worker1():
    for _ in range(5):

        time.sleep(0.5)
        print('線程:{}-woring'.format(threading.currentThread()))
        print('Thread over')

t = MyThread(target=worker1,name='w')
t.start()
  • 線程之間共享全局變量
import time

count = 100

def work1():
    global count
    for i in range(3):
        count += 1

    print("----in work1, g_num is {}---".format(count))


def work2():
    global count
    print("----in work2, g_num is {}---".format(count))


print("---線程建立以前g_num is {}---".format(count))

t1 = Thread(target=work1)
t1.start()

#延時一會,保證t1線程中的事情作完
time.sleep(1)

t2 = Thread(target=work2)
t2.start()

輸出

---線程建立以前g_num is 100---
----in work1, g_num is 103---
----in work2, g_num is 103---

2.線程同步

線程同步,線程間協同,經過某種技術,讓一個線程訪問某些數據時,其餘線程不能訪問這些數據,直到該線程完成對數據的操做。

  • Event

Event事件,是線程間通訊機制中最簡單的實現,使用一個內部的標記flag,經過flag的True或False的變化來進行操做。

名稱 含義
set() 標記設置爲True
clear() 標記設置爲False
is_set() 標記是否設置爲True
wait(timeout=None) 設置等待標記爲True的時長,None爲無限等待。獲得返回True, 未等到超時了返回False。
from threading import Event, Thread
import time


def boss(event:Event):
    """
    等待員工全部任務完成,點評
    """
    print("I'm boss, waiting for u.")
    event.wait()
    print('good job')

def worker(event:Event, count=10):
    print("I am working for u")
    cups = []
    while True:
        print('make 1')
        time.sleep(0.5)
        cups.append(1)
        if len(cups) >= count:
            event.set()
            break
        print('I finished my job. cups={}'.format(cups))

event = Event()
w = Thread(target=worker, args=(event, ))
b = Thread(target=boss, args=(event, ))
w.start()
time.sleep(1)
b.start(

總結:
使用同一個event對象的標記flag
誰wait就是等到flag變爲True,或者等到超時返回False,不限制等待的個數。

  • Lock

鎖,凡是存在共享資源爭搶的地方均可以使用鎖,從而保證只有一個使用者均可以徹底使用這個資源。
示例 不加鎖:

import threading
cups = []
def worker(task=100):
    while True:
        count = len(cups)
        print(count)
        if count >= task:
            break
        cups.append(1)
        print('{}'.format(threading.current_thread()))
    print('I finished {} cups'.format(count))

for x in range(10):
    threading.Thread(target=worker, args=(100, )).start()

以上任務完成的數量會大於100,使用鎖能夠解決
示例

import logging
import threading
logging.basicConfig(level=logging.INFO)

cups = []
# 實例一把鎖
lock = threading.Lock()

def worker(lock:threading.Lock,task=100):
    while True:
        lock.acquire() # 加鎖
        count = len(cups)

        logging.info(count)
        if count >= task:
            lock.release() #記得退出循環時釋放鎖
            break

        cups.append(1)
        lock.release() # 釋放鎖
        logging.info('{}'.format(threading.current_thread()))
    logging.info('I finished {} cups'.format(count))

for x in range(10):
    threading.Thread(target=worker, args=(lock, 100)).start()

通常來講加鎖後還有一些代碼實現,在釋放鎖以前還有可能拋異常,一旦出現異常,鎖是沒法釋放,可是當前線程可能由於這個異常被終止了,這就產生了死鎖。

加鎖、解鎖經常使用語句:
1)使用try...finally語句保證鎖的釋放
2)with上下文管理,鎖對象支持上下文管理

示例:

from threading import Thread, Lock
import time, logging
logging.basicConfig(level=logging.INFO)

class Counter:
    def __init__(self):
        self.c = 0
        self.lock = Lock()

    def inc(self):
        try:
            self.lock.acquire()
            self.c += 1
            logging.info('add {}'.format(self.c))
        finally:
            self.lock.release()

    def dec(self):
        try:
            self.lock.acquire()
            self.c -= 1
            logging.info('sub {}'.format(self.c))
        finally:
            self.lock.release()

    @property
    def value(self):
        with self.lock:
            return self.c

def do(c:Counter, count=100):
    for  _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()

c = Counter()
c1 = 10
c2 = 10
for i in range(c1):
    Thread(target=do, args=(c,c2)).start()

time.sleep(5)
logging.info(c.value)

3.Condition

Condition 用於生產者、消費模型,爲了解決生產者消費速度匹配問題。

構造方法Condition(lock=None), 能夠傳入一個lock或者RLock對象,默認是RLock。

名稱 含義
acquire(*args) 獲取鎖
wait(self, timoout=None) 等待或超時間
notify(n=1) 喚醒至多指定個數的等待的線程,沒有等待的線程沒有操做
notiy_all() 喚醒全部等待的線程

示例1 不使用Condition

import threading
import logging
import random
logging.basicConfig(level=logging.INFO)

class Dispatcher:

    def __init__(self, data=0):
        self.data = data
        self.event = threading.Event()

    def produce(self):
        for i in range(100):
            data =  random.randint(1,100)
            self.data = data
            logging.info("produce--{}".format(self.data))
            self.event.wait(1)  

    def custom(self):
        while True:
            logging.info("curstom---{}".format(self.data))
            self.event.wait(1)

d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)

c.start()
p.start()

示例2 使用Condition

import threading
import logging
import random
logging.basicConfig(level=logging.INFO)

class Dispatcher:
    def __init__(self, data=0):
        self.data = data
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self):
        for i in range(100):
            data =  random.randint(1,100)
            with self.cond:
                self.data = data
                self.cond.notify_all()
            logging.info('produce {}'.format(self.data))
            self.event.wait(1)

    def custom(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info('custom {}'.format(self.data))
            self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)

c.start()
p.start()

示例3 多個消費者

import threading
import logging
import random
logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self, data=0):
        self.data = data
        self.event = threading.Event()
        self.cond = threading.Condition()

    def produce(self):
        for i in range(100):
            data =  random.randint(1,100)
            with self.cond:
                self.data = data
                self.cond.notify(1)
            logging.info('pru {}'.format(self.data))
            self.event.wait(1)

    def custom(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info("線程{}--消費{}".format(threading.get_ident(), self.data))
            self.event.wait(0.5)

d = Dispatcher()
p = threading.Thread(target=d.produce)
c = threading.Thread(target=d.custom)
c1 = threading.Thread(target=d.custom)
c.start()
c1.start()

p.start()

總結:
Condition是用於生產者消費者模型中,解決生產者消費者速度匹配的問題。
採用通知機制,很是有效率。

使用方式:
使用Condition必須先acquire,用玩release,由於內部使用鎖,默認使用RLock,最好的方式是使用with上下文。
消費者wait,等待通知。
生產者生產好消息,對消費者發通知,可使用notify或者notify_all方法。

4 .Barrier

名稱 含義
Barrier(parties, action=None, timeout=None) 構建Barrier對象,指定參與方數目。timeout是wait方法未指定超時的默認值。
n_waiting 當前在屏障中等待的線程數
parties 各方數,就是須要多少個等待
wait(timeout=None) 等待經過屏障,返回0到[線程數-1]的整數,每一個線程返回不一樣。若是wait方法設置了超時,並超時發送,屏障將處於broken狀態。

方法:

名稱 含義
broken 若是屏障處於打破狀態,返回True
abort() 將屏障至於broken狀態,等待中的線程或者調用等待方法的線程都會拋出BrokenBarrierError異常, 直到reset方法來恢復屏障
reset() 恢復屏障,從新開始攔截

示例

import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')

def worker(barrier:threading.Barrier):
    logging.info('n_waiting={}'.format(barrier.n_waiting))
    try:
        bid = barrier.wait()
        logging.info("after barrier {}".format(bid))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier in {}'.format(threading.current_thread().name))

barrier = threading.Barrier(3)

for i in range(5): #調整數字看結果
    threading.Thread(target=worker, args=(barrier, )).start()

全部線程衝到了Barrier前等待,直到到達parties的數目,屏障打開,全部線程中止等待,繼續執行。
再有線程wait,屏障就緒等到到達參數方數目。

Barrier應用場景:併發初始化全部線程都必須初始化完成後,才能繼續工做,例如運行前加載數據、檢查,若是這些工做沒完成,就開始運行,將不能正常工做。10個線程作10種準備工做,每一個線程負責一種工做,只有這10個線程都完成後,才能繼續工做,先完成的要等待後完成的線程。例如,啓動一個程序,須要先加載磁盤文件、緩存預熱、初始化鏈接池等工做。這些工做能夠齊頭並進,不過只有都知足了,程序才能繼續向後執行。假設數據庫鏈接失敗,則初始化工做失敗,就要about,屏障broken,全部線程收到異常退出。

相關文章
相關標籤/搜索