python隊列、線程、進程、協程

目錄:

1、queue
2、線程
        基本使用
        線程鎖
        自定義線程池
        
        生產者消費者模型(隊列)
3、進程
        基本使用
        進程鎖
        進程數據共享
            默認數據不共享
            queues
            array
            Manager.dict
        進程池
    
    PS:
        IO密集型-多線程
        計算密集型 - 多進程
    
4、協程
        原理:利用一個線程,分解一個線程成爲多個「微線程」==》程序級別
        greenlet
        gevent
        pip3 install gevent

 

1、queue

1.1 queue用法

# 先進先出隊列python

# put放數據,是否阻塞,阻塞時的超時事件linux

# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件git

# 隊列的最大長度:queue.Queue(2) 裏面的數字程序員

# qsize()真實個數github

# maxsize 最大支持的個數web

# join,task_done,阻塞進程,當隊列中任務執行完畢以後,再也不阻塞數據庫

 

import queue
q = queue.Queue(2)  # q = queue.Queue()若是沒有參數的話,就是能夠放無限多的數據。
print(q.empty())    # 返回隊列是否爲空,空則爲True,此處爲True
q.put(11)
q.put(22)
print(q.empty())    # 此處爲False
print(q.qsize())    # 返回隊列中如今有多少元素
# q.put(22)
# q.put(33,block=False)   # 若是隊列最大能放2個元素,這時候放了第三個,默認是阻塞的,block=False,若是就會報錯:queue.Full
# q.put(33,block=True,timeout=2)  # 設置爲阻塞,若是timeout設置的時間以內,尚未人來取,則就會報錯:queue.Full
print(q.get())
print(q.get())
print(q.get(timeout=2))   # 隊列裏的數據已經取完了,若是再取就會阻塞,這裏timeout時間2秒,就是等待2秒,隊列裏尚未數據就報錯:queue.Empty

 

1.2 queue.join

# join:實際上意味着等到隊列爲空,再執行別的操做,不然就一直阻塞,不是說get取完了,就不阻塞了,而是每次get以後,windows

# 要執行:task_done 告訴一聲已經取過了,等隊列爲空,join纔不阻塞。api

 

下面的程序是阻塞的數組

q = queue.Queue(5)

q.put(123)
q.put(456)
q.get()
# q.task_done()
q.get()
# q.task_done()  # 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
q.join()

 

下面的程序是不阻塞的:

q = queue.Queue(5)

q.put(123)
q.put(456)
q.get()
q.task_done()
q.get()
q.task_done()  # 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號
q.join()

 

1.3 其餘隊列

import queue

# queue.Queue,先進先出隊列

# queue.LifoQueue,後進先出隊列

# queue.PriorityQueue,優先級隊列

# queue.deque,雙向對隊

 

# queue.Queue(2) 先進先出隊列

# put放數據,是否阻塞,阻塞時的超時事件

# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件

# qsize()真實個數

# maxsize 最大支持的個數

# join,task_done,阻塞進程,當隊列中任務執行完畢以後,再也不阻塞

 

import queue
# q = queue.Queue(2) 先進先出隊列
# q = queue.LifoQueue() 後進先出隊列
# q = queue.PriorityQueue() 優先級隊列
# q = queue.deque()   雙向隊列

q = queue.LifoQueue()
q.put(123)
q.put(456)
# 打印;456
print(q.get())

# 優先級最小的拿出來
# 若是優先級同樣,則是誰先放,就先取出誰
q = queue.PriorityQueue()
q.put((1,'alex1'))
q.put((1,'alex2'))
q.put((1,'alex3'))
q.put((3,'alex3'))
# (1, 'alex1')
print(q.get())


q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
# deque([456, 123, 333])
print(q)
# 打印:456
print(q[0])
q.pop()     # 從右邊刪除
# deque([456, 123])
print(q)
q.popleft() # 從左邊刪除

 

python的隊列是在內存裏建立的,python的進程退出了,則隊列也清空了。

 

 

 

2、生產者消費者模型(隊列)

1生產者消費者模型的做用:

1解決阻塞

2就是解耦,修改生產者,不會影響消費者,反之亦然。

 

2)在生產環境,用生產者消費者模型,就能夠解決:

一、處理瞬時併發的請求問題。瞬時的鏈接數就不會佔滿。因此服務器就不會掛了。

二、客戶端提交一個請求,不用等待處理完畢,能夠在頁面上作別的事情。

 

 

2.1)若是不用隊列存數據,服務端經過多線程來處理數據:

用戶往隊列存數據,服務器從隊列裏取數據。

沒有隊列的話,就跟最大鏈接數有關係,每一個服務器就有最大鏈接數。

客戶端要獲取服務器放回,服務器要查、修改數據庫或修改文件,要2分鐘,那客戶端就要掛起連接2分鐘,2萬個鏈接一半都要掛起,服務器就崩潰了。

 

若是沒有隊列,第一個用戶發來請求,連上服務器,佔用鏈接,等待2分鐘。

第二我的來也要佔用2分鐘。

web服務器

若是要處理併發,有10萬併發,若是:一臺機器接收一個鏈接,須要10萬個機器,等待2分鐘就處理完了。

 

2.2)把請求放在隊列的好處

用戶發來請求,把請求放到隊列裏,可讓鏈接立刻斷開,不會阻塞,就不佔用服務器的鏈接數了。若是看到訂單處理了沒,就要打開另一個頁面,查看請求是否處理。

 

服務器查詢處理任務的時候,每一個才花2分鐘,服務器耗時是沒有減小的。

可是這樣作,客戶端就不會持續的佔用鏈接了。那瞬時的鏈接數就不會佔滿。因此服務器就不會掛了。

可是後臺要處理10萬個請求,也須要50臺服務器。並不會減小服務器數量。

這樣就能處理瞬時併發的請求問題。

 

服務器只是處理請求,是修改數據庫的值,不是告訴客戶端。而是客戶端再發來請求,查詢數據庫已經修改的內容。

提交訂單以後,把這個訂單扔給隊列,程序返回「正在處理」,就不等待了,而後斷開這個鏈接,你能夠在頁面裏作別的事情,不用一直等待訂單處理完。這樣就不影響服務器的最大鏈接數。在頁面幫你發起一個alax請求,url,不停的請求(多是定時器),個人訂單成功沒有,個人訂單成功沒有,若是訂單成功了,就自動返回頁面:訂單成功

若是不用隊列的話,一個請求就佔用一個服務器,等待的人特別多,等待鏈接的個數太多了。服務器就掛掉了。

隊列就沒有最大個數限制,把請求發給隊列了,而後http連接就斷開了,就不用等待了。

12306買票的時候,下次再來請求的時候,就會告訴你,前面排了幾我的。

 

3python queue的特色:

 

pythonqueue是內存級別的。rabbitmq能夠把隊列發到別的服務器上處理。

因此python裏的queue不能持久化,可是rabbitmq能夠持久化。

queue.Queue()這樣寫,隊列就沒有最大個數限制。queue.Queue(5)就是說隊列裏最多能放5個值

 

4)生產者消費者代碼示例:

 

import time,random
import queue,threading
q = queue.Queue()


def Producer(name):
    count =0
    while True:
        time.sleep(random.randrange(3))
        if q.qsize()<3:         # 只要盤子裏小於3個包子,廚師就開始作包子
            q.put(count)
            print("Producer %s has produced %s baozi.." %(name,count))
            count += 1

def Consumer(name):
    count =0
    while True:
        time.sleep(random.randrange(4))
        if not q.empty():       # 只要盤子裏有包子,顧客就要吃。
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name,data))
        else:           # 盤子裏沒有包子
            print("---no baozi anymore----")
        count+=1

p1 = threading.Thread(target=Producer,args=('A',))
c1 = threading.Thread(target=Consumer,args=('B',))
c2 = threading.Thread(target=Consumer,args=('C',))
p1.start()
c1.start()
c2.start()

'''
當你設計複雜程序的時候,就能夠用生產者消費者模型,來鬆耦合你的代碼,也能夠減小阻塞。
'''

 

3、線程鎖

3.1 LockRLock

Lock只能鎖一次,RLock能夠遞歸多層,Lock不支持多層鎖嵌套,咱們通常用RLOCK

 

import threading
import time
NUM = 10

def func(lock):
    global NUM
    # 上鎖
    lock.acquire()
    lock.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    # 開鎖
    lock.release()
    lock.release()

# Lock = threading.Lock()   # 不支持嵌套鎖,通常不用
RLock = threading.RLock()   # 通常用RLock,支持嵌套鎖。

for i in range(10):
    t = threading.Thread(target=func,args=(RLock,))
    t.start()

'''
死鎖:
就是你也搶資源,我也搶資源,誰也搶不走就是死鎖。
若是是python,就是Lock,弄成嵌套鎖,不支持,則變成死鎖。
解決辦法:
用RLock,支持嵌套鎖
'''

 

3.2 信號量 BoundedSemaphore

若是用線程鎖,一次只容許一個進入,若是用信號量能夠容許同時多少個一塊兒進入。

每次5個線程同時執行,可能就會同時修改一個值。

 

 

import threading
import time
NUM = 10

def func(i,lock):
    global NUM
    # 上鎖
    lock.acquire()  # 總共30個 一次執行5個 25個,依次類推:20,15。。。
    NUM -= 1
    time.sleep(2)
    print('NUM:',str(NUM),'i:',i)
    # 開鎖
    lock.release()

# Lock = threading.Lock()   # 不支持嵌套鎖,通常不用
# RLock = threading.RLock()   # 通常用RLock,支持嵌套鎖。
lock = threading.BoundedSemaphore(5)   # 參數是每次執行幾個線程

for i in range(30):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()

 

'''
打印:
NUM: 5 i: 2
NUM: 4 i: 0
NUM: 4 i: 4
NUM: 2 i: 3
NUM: 1 i: 1
NUM: 0 i: 6
NUM: 0 i: 5
NUM: -2 i: 7
NUM: -2 i: 8
NUM: -4 i: 9
NUM: -5 i: 10
NUM: -6 i: 11
NUM: -7 i: 12
NUM: -8 i: 13
NUM: -9 i: 14
NUM: -10 i: 15
NUM: -10 i: 16
NUM: -10 i: 18
NUM: -10 i: 17
NUM: -10 i: 19
'''
打印

 

3.3 event紅綠燈

要麼所有阻塞(紅燈),要麼所有放開(綠燈)

 

import threading

def func(i,e):
    print(i)    # 10個線程併發打印:0-9 ,而後到wait的時候,就開始檢測是什麼燈
    e.wait()    # 檢測是什麼燈,若是是紅燈,停;綠燈,行
    print(i+100)

event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func,args=(i,event))
    t.start()

event.clear()   # 設置成紅燈
inp = input('>>>')
if inp == "1":
    event.set() #設置成綠燈
打印:
默認是紅燈
0
1
2
3
4
5
6
7
8
9
>>>1   # 輸入1,表示綠燈,就繼續執行
102
103
106
107
100
101
104
105
108
109
打印

 

3.4 線程鎖條件-condition1

Lock,RLock:線程鎖使用場景:

Lock,RLock是多個用戶同時修改一份數據,可能會出現髒數據,數據就會亂,就加互斥鎖,一次只能讓一我的修改數據,就能解決。

conditioneventBoundedSemaphore 使用場景:

若是寫了個爬蟲,在創建數據庫鏈接,線程就等着,什麼能數據庫能用了,就開通線程,再爬蟲。

 

eventkua一下,全走了。

notify維護一個隊列,傳幾個,就只能出去幾回。

 

 

import threading


def func(i, con):
    print(i)
    con.acquire()
    con.wait()
    print(i + 100)
    con.release()


c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i, c,))
    t.start()

while True:
    inp = input(">>>")
    if inp == 'q':
        break
    c.acquire()
    c.notify(int(inp))
    c.release()

 

打印
0
1
2
3
4
5
6
7
8
9
>>>1        # 只讓1個線程運行
>>>100
2           # 再放出去2個線程
>>>102
101
3           # 再放出去3個線程
>>>103
104
105
4           # 再放出去4個線程,此時10個已經執行了
>>>108
109
106
107
5           # 再輸入5,又進入while循環 提示輸入:>>>
>>>q        # 輸入q就退出循環了。

Process finished with exit code 0
打印

 

3.5 線程鎖條件-condition2

 

con.wait_for裏傳一個函數名當參數,返回布爾值,是True,就執行下面的代碼。反之,就不執行。

不管是否返回True,都是用了一個線程。

import threading

def condition():
    ret = False
    r = input('>>>')
    if r == 'true':
        ret = True
    else:
        ret = False
    return ret


def func(i,con):
    print(i)
    con.acquire()
    con.wait_for(condition)     # 只能一個一個過
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c,))
    t.start()
'''
打印:
0
>>>1
2
3
4
5
6
7
8
9
s           # 第0個線程, 雖然沒返回True,沒有答應101,可是仍是使用了一個線程了。
>>>w        # 第1個線程
>>>e        # 第2個線程
>>>true     # 第3個線程
103
>>>true
104
>>>true
105
>>>true
106
>>>true
107
>>>true
108
>>>true     # 第10個線程
109
true        # 線程執行完畢,一直等待,就一直阻塞
true
w
'''
打印

 

3.6 線程鎖定時器

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)  # 等1秒,執行hello
t.start()  # after 1 seconds, "hello, world" will be printed

 

 

4、自定義線程池

 

4.1 自定義線程池基礎版

import queue
import threading
import time
class TheadPool:
    def __init__(self,maxsize = 5):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):    # 一、初始化的時候,先往隊列裏放5個線程
            self._q.put(threading.Thread)
        # 【threading.Thread, threading.Thread, threading.Thread, threading.Thread】
    def get_thread(self):
        return self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)

pool = TheadPool(5)

def task(arg,p):
    # 二、線程併發執行,5個線程在瞬間(1秒鐘以內)從隊列裏取出5個(執行get_thread()方法)
    #    5個線程在瞬間打印0-4,就是i
    print(arg)
    time.sleep(1)   # 三、停了5秒
    p.add_thread()  # 四、5個線程執行:隊列添加線程(由於是5個線程執行,一個線程添加一個,隊列總共是5個線程)

for i in range(20): # 五、而後這樣先取走5個,再put5個,而後打印i,就會出現:第一次打印:0-4,而後是:5-9,10-14,15-19
    # threading.Thread類
    t = pool.get_thread()
    obj = t(target=task,args=(i,pool,))
    obj.start()

這個程序的問題:
線程沒有被重用,線程一下開到最大(浪費)

打印:
第一次打印:0-4,而後是:5-9,10-14,15-19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
打印

 

4.2 自定義線程池

4.2.1 自定義線程池思路

不要把隊列裏放線程,而是聽任務,開三個線程來從隊列裏取任務,若是都取完了,就會阻塞

方法1

設置超時時間

方法2

往隊列尾部,加三個空值,若是取得是空值,則終止線程。

沒有空閒線程,而且已經建立的線程小於最大的線程數,這樣纔會建立線程。

 

4.2.2 出現的問題

注意;以前的自定義線程池,若是定義queue的最多能放值的個數,pool = ThreadPool(5,5)

terminate就很差使了。

有的時候會一直阻塞,由於隊列裏已經有5個了,再往裏面put一個,就超出queue裏最大的個數。

解決辦法是:
加上這一行

 

4.2.3自定義線程池代碼

import queue
import threading
import contextlib
import time

StopEvent = object()
RUN = 0     # 定義線程池的三種狀態
CLOSE = 1
TERMINATE = 2
iNum=0
'''
開啓最大個數爲5個的隊列,
'''

class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:        # 若是傳了最大隊列數,就設置,不然就是無限大。
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num      # 設置最大線程數
        self.cancel = False         # 假如已經執行close了,就再也不執行任務,生成線程處理了
        self.generate_list = []     # 已經生成的線程數列表
        self.free_list = []         # 空閒的線程數列表
        self._state = RUN

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """
        if self.cancel:         # 假如已經執行close了,就再也不執行任務,生成線程處理了
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: # 假如空閒的線程列表爲空,而且已經生成的線程數小於最大線程數
            self.generate_thread()      # 建立線程

        w = (func, args, callback,)     # 把當前任務放入隊列,也就是run循環了300次,就有300個任務放入隊列
        self.q.put(w)       # 注意:隊列數是多少個,就要開啓幾個線程,由於當要關閉的線程池時,
                            # 要把空對象加到隊列。線程判斷獲取到是空對象(此時已經把queue裏的任務都取完了)就關閉線程。

        global iNum
        iNum+=1
        # print('qsize:',str(self.q.qsize()))
    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)  # 執行call函數
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread    # 獲取當前線程
        self.generate_list.append(current_thread)   # 把當前線程加入到已經生成線程列表

        event = self.q.get()        # 從隊列裏取一個任務
        while event != StopEvent:   # 假如 這個任務不是空對象

            func, arguments, callback = event      # 傳進去的任務是個元組,由函數,參數,回調函數組成。
            try:
                result = func(*arguments)       # 執行任務,返回result
                success = True                 # 執行成功,返回狀態爲True
            except Exception as e:
                success = False
                result = None
            else:
                if callback is not None:       # 假若有回調函數
                    try:
                        callback(success, result)   # 把狀態和返回值傳給回調函數執行
                    except Exception as e:
                        pass
            # 執行worker_state函數,空閒線程列表裏是否加入個線程。在yield處執行with下的代碼
            with self.worker_state(self.free_list, current_thread):
                if self._state == TERMINATE:        # 假如線程池狀態是TERMINATE
                    print(11111111111111111111111)
                    event = StopEvent               # 就把當前任務賦值爲空對象,while循環不知足,這樣就走else的內容

                else:
                    event = self.q.get()            # 若是不是TERMINATE狀態,則把當前任務賦給event對象
        else:       # 若是while循環不知足,或者while循環完了,沒有break,就執行else內容。

            self.generate_list.remove(current_thread)   # 隊列獲取到了空對象,就關閉線程(從列表中移除當前的線程)
            print(len(self.generate_list))


    def close(self):        # 先執行close(),再執行join()
        """
        執行完全部的任務後,全部線程中止
        """
        if self._state == RUN:
            self._state = CLOSE
            self.cancel = True
        full_size = len(self.generate_list)     # 查看已經生成的線程數個數
        while full_size:
            self.q.put(StopEvent)       # 往隊列尾部加上一個空對象,因爲隊列是先進先出的,因此空對象是最後獲取的,經過空對象就能關閉線程。
            full_size -= 1      # 循環的次數爲生成的線程的總個數

    def terminate(self):        # 直接執行terminate()
        """
        不管是否還有任務,終止線程
        """
        self._state = TERMINATE
        print("len:",str(len(self.generate_list)))
        while self.generate_list:   # 假如線程列表不爲空,就往隊列里加上空對象
            print('q.qsize():',str(self.q.qsize()))
            self.q.get()
            self.q.put(StopEvent)

        # self.q = queue.Queue()
        print(self.q.empty())       # 查看隊列是否爲空,至關於q.size==0
        print('------------'+str(self.q.qsize()))

    def join(self):     # CLOSE和join結合用
        """Waits until all outstanding tasks have been completed."""
        assert self._state in (CLOSE,)
        delay = 0.0005
        if self._state==CLOSE:
            while self.q.qsize() > 0:
                delay = min(delay * 2, .05)



    @contextlib.contextmanager      # 上下文管理器
    def worker_state(self, state_list, worker_thread):  # 傳入的是空閒線程列表和當前線程
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)        # 把當前線程加到空閒線程裏,yield前面的代碼至關於執行__enter__,
        try:
            yield           # yield是執行with worker_state下的代碼,
        finally:            # yield後面的代碼至關於執行__exit__
            state_list.remove(worker_thread)    # 執行完一個queue的全部任務了,就移除這個線程了。由於一個隊列對應着一個線程。


pool = ThreadPool(5,5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(200):
    ret = pool.run(action, (i,), callback)
pool.terminate()
# pool.close()
# pool.join()
print(1234234523452345234523452345234523452345234523455)
# time.sleep(1)
print(pool.q.qsize())
print(len(pool.generate_list), len(pool.free_list))
print('iNum:',iNum)
# print(len(pool.generate_list), len(pool.free_list))

 

 

5、進程之間的數據共享

5.1 多進程

 

windows里加main才能執行,若是在linux不加main能夠執行。

windows下,若是在程序裏,不方便加main,只能放棄了。

 

from multiprocessing import Process

def foo(i):
    print('say hi',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

 

 

 

5.2 daemonjoin 主線程是否等子線程

 

主線程執行完,子線程是否終止掉

 

5.2.1 默認是不等的

from multiprocessing import Process

def foo(i):
    print('say hi',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo,args=(i,))
        # p.daemon = True
        p.start()
        # p.join()
    print(123456)
'''
打印:
123456
say hi 3
say hi 1
say hi 6
say hi 2
say hi 9
say hi 7
say hi 0
say hi 5
say hi 4
say hi 8
'''
打印

5.2.2 daemon+join等子線程

from multiprocessing import Process

def foo(i):
    print('say hi',i)

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.daemon = True
        p.start()
        p.join()
    print(123456)
'''
打印:
say hi 0
say hi 1
say hi 2
say hi 3
say hi 4
say hi 5
say hi 6
say hi 7
say hi 8
say hi 9
123456
'''
打印

 

 

5.3 進程默認數據不共享

5.3.1 線程中數據共享

 

這個例子是單線程,數據是共享的,不管單線程仍是多線程都是共享的。

from multiprocessing import Process
import multiprocessing

def foo(i,li):
    li.append(i)
    print('say hi',i,li)


if __name__ == '__main__':
    li = []
    for i in range(10):
        # p = Process(target=foo,args=(i,li))
        foo(i,li)
        # p.daemon = True
        # p.start()
        # p.join()
'''
正常的執行結果打印:最後是數組裏有10個數
say hi 0 [0]
say hi 1 [0, 1]
say hi 2 [0, 1, 2]
say hi 3 [0, 1, 2, 3]
say hi 4 [0, 1, 2, 3, 4]
say hi 5 [0, 1, 2, 3, 4, 5]
say hi 6 [0, 1, 2, 3, 4, 5, 6]
say hi 7 [0, 1, 2, 3, 4, 5, 6, 7]
say hi 8 [0, 1, 2, 3, 4, 5, 6, 7, 8]
say hi 9 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
'''
打印

 

 

5.3.2 進程默認數據不共享

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'WangQiaomei'

from multiprocessing import Process
import multiprocessing

def foo(i,li):
    li.append(i)
    print('say hi',i,li)


if __name__ == '__main__':
    li = []
    for i in range(10):
        p = Process(target=foo,args=(i,li))
        # foo(i,li)
        # p.daemon = True
        p.start()
        # p.join()

 

'''
正常的執行結果打印:最後是數組裏有10個數,可是多進程最後數組裏只有9
say hi 0 [0]
say hi 1 [1]
say hi 2 [2]
say hi 3 [3]
say hi 4 [4]
say hi 5 [5]
say hi 6 [6]
say hi 7 [7]
say hi 8 [8]
say hi 9 [9]
'''
打印

 

 

5.4 queues實現:進程之間數據共享

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())


if __name__ == '__main__':
    # li =[]
    li = queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        # p.daemon = True
        p.start()
        # p.join()
'''
打印:
say hi 1 1
say hi 5 3
say hi 0 5
say hi 3 6
say hi 7 7
say hi 6 7
say hi 2 7
say hi 9 9
say hi 4 9
say hi 8 10
'''
打印

 

 

5.5數組和列表的區別:

數組和列表的特色比較:

1、數組類型必定:

數組只要定義好了,類型必須是一致的

python裏列表裏,能夠放字符串也能夠放數字。

 

 

2、數組個數必定:

建立數組的時候,就要指定數組多大,好比數組是10,再添加11個,就會報錯

列表是動態的,個數不必定。

 

數組和列表的相鄰元素的內存位置比較:

 

python的列表是基於c來實現。

python的列表相鄰的兩個元素在內存裏,不必定挨着。是用鏈表實現的。

由於個數不限制,開始是10個長度,因此可能第11個被佔用了。

每一個元素,記錄上一個和下一個的位置在哪裏。能夠找到位置。

 

字符串和int類型的佔用內存的位置大小確定不同。因此數組,不僅是長度同樣,類型也要同樣。

 

 

對於數組的話,相鄰的元素是挨着的。

數組是int類型,而且長度是肯定的。因此是相鄰的。

數組的內存地址是連續的,列表不是,就是鏈表,鏈表是每一個元素記錄上一個位置和下一個位置在哪裏。

 

如圖,內存中:數組是黑框位置,列表是分散的分佈:

 

5.6 數組實現:進程之間數據共享

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Array

def foo(i,arg):
    # arg.put(i)
    # print('say hi',i,arg.qszie())
    arg[i] = i + 100
    for item in arg:
        print(item)
    print("====================")

if __name__ == '__main__':
    # li =[]
    # li = queues.Queue(20,ctx=multiprocessing)
    li = Array('i',5)
    for i in range(5):
        p = Process(target=foo,args=(i,li,))
        # p.daemon = True
        p.start()
        # p.join()

 

'''
打印:
0
0
102
0
0
====================
0
101
102
0
0
====================
0
101
102
103
0
====================
100
101
102
103
0
====================
100
101
102
103
104
====================
'''
打印

 

注意:Array的參數,寫了i,就只能放數字:

 

 

5.7 dict實現:進程之間數據共享

 

用Manager()對象建立一個特殊的字典。
For循環建立了多個進程,每一個進程均可以利用dict。
dict.values()就是獲取它全部的值,
若是字典獲取的值是遞增的,說明數據是共享的。

5.7.1 AttributeError:'ForkAwareLocal' object has no attribute 'connection'

若是把join註釋就會報錯:

報錯:

    conn = self._tls.connection

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

from multiprocessing import Process
import multiprocessing
from multiprocessing import Manager

def foo(i,arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == '__main__':
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        # p.join()        # 方式1
    # 方式2
    # import time
    # time.sleep(10)

 

緣由是:
li = obj.dict()是在主進程建立的
for循環裏建立的是子進程,子進程是修改主進程:arg[i] = i + 100(arg就是li)
主進程和子進程都在執行,主進程裏有個字典,子進程要修改這個字典。
進程和進程之間要通訊的話,須要建立鏈接的。至關於兩邊都寫上一個socket,進程之間經過鏈接進行操做。
主進程執行到底部,說明執行完了,會把它裏面的鏈接斷開了。
主進程把鏈接斷開了,子進程就鏈接不上主進程。
若是在底部寫停10秒,主進程就中止下來,並無執行完。主進程沒有執行完,鏈接尚未斷開,那子進程就能夠鏈接它了。

 

5.7.2 解決方法1:停10秒(不建議)

from multiprocessing import Process
import multiprocessing
from multiprocessing import Manager

def foo(i,arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == '__main__':
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        # p.join()        # 方式1
    # 方式2
    import time
    time.sleep(10)

 

打印:
[102]
[102, 105]
[102, 103, 105]
[102, 103, 105, 107]
[109, 102, 103, 105, 107]
[101, 102, 103, 105, 107, 109]
[101, 102, 103, 105, 106, 107, 109]
[101, 102, 103, 105, 106, 107, 108, 109]
[100, 101, 102, 103, 105, 106, 107, 108, 109]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
打印

 

 

5.7.3測試:

若是把sleep(10)改爲sleep(0.1),那麼只會打印前面幾行,而後又報以前的錯誤

打印:

[102]

[102, 105]

[102, 103, 105]

[102, 103, 105, 107]

接着報錯:

    conn = self._tls.connection

AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

 

5.7.4 解決辦法2:用join

p.join()
因此使用多進程的常規方法是,先調用start啓動進程,再調用join要求主進程等待當前子進程的結束。
join是用來阻塞當前線程的,每次循環:p.start()以後,p就提示主線程,須要等待p結束才向下執行,那主線程就乖乖的等着啦。

from multiprocessing import Process
import multiprocessing
from multiprocessing import Manager

def foo(i,arg):
    arg[i] = i + 100
    print(arg.values())

if __name__ == '__main__':
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()        # 方式1:
    # 方式2
    # import time
    # time.sleep(10)
打印:
[102]
[102, 105]
[102, 103, 105]
[102, 103, 105, 107]
[109, 102, 103, 105, 107]
[101, 102, 103, 105, 107, 109]
[101, 102, 103, 105, 106, 107, 109]
[101, 102, 103, 105, 106, 107, 108, 109]
[100, 101, 102, 103, 105, 106, 107, 108, 109]
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
打印

 

5.7.5 總結:進程之間共享的方式:

queues,數組和字典的方式

dict對類型沒有限制,跟使用字典是如出一轍的。用數組則限制了數據類型。

進程和進程之間要通訊,是要鏈接的。

主進程執行到底部了,就執行完了,就把鏈接斷開了。子進程就連不上主進程了。

 

6、進程

6.1 進程鎖

 

沒有鎖,多個進程就會一塊兒修改數據:

from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time

def foo(i,lis):
    lis[0] = lis[0] - 1     # 由於停1秒,在1秒以內,10個進程都已經修改了數據。
    time.sleep(1)           # 停1秒前全都修改完

    print('say hi',lis[0])  # 打印的全是0

if __name__ == "__main__":
    # li = []
    li = Array('i', 1)
    li[0] = 10
    for i in range(10):
        p = Process(target=foo,args=(i,li))
        p.start()
進程沒有鎖

 

'''
打印:
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
say hi 0
'''
打印

 

 

 

加把鎖就把進程鎖住了,同一時間只有一個進程能夠運行,其餘都等着。

RLock, Lock, Event, Condition, Semaphore    # 這些方法跟線程的使用方法是同樣的

 

from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore    # 這些方法跟線程的使用方法是同樣的
import multiprocessing
import time

def foo(i,lis,lc):
    lc.acquire()        # 加鎖
    lis[0] = lis[0] - 1
    time.sleep(1)
    print('say hi',lis[0])
    lc.release()        # 釋放鎖

if __name__ == "__main__":
    # li = []
    li = Array('i', 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo,args=(i,li,lock))
        p.start()

 

 

'''
打印:
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
say hi 4
say hi 3
say hi 2
say hi 1
say hi 0
'''
打印

 

 

6.2 進程池

6.2.1 進程池串行-apply

apply從進程池裏取進程,而後一個一個執行,第一個進程執行完,第二個進程才執行,進程之間是串行的操做。這樣就不是併發操做,沒有太大意義。

from multiprocessing import Pool

def f1(arg):
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)

    for i in range(10):
        pool.apply(func=f1,args=(i,))   # apply執行函數,傳入參數
    print('end')

 

'''
打印:
0
1
2
3
4
5
6
7
8
9
end
'''
打印

 

6.2.1 進程池異步-apply_rsync

from multiprocessing import Pool

def f1(arg):
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)

    for i in range(10):
        # pool.apply(func=f1,args=(i,))   # apply執行函數,傳入參數
        pool.apply_async(func=f1,args=(i,))
    print('end')
'''
打印:
end
'''

這10個任務kua一下全執行了,主進程執行到end了。
主進程執行完了,子進程就被終止掉了。
主進程執行完了,就再也不等子線程了,若是要等就要設置參數。
多線程線程默認也是,主進程不等子進程,多線程是:daemon=True加join來讓他等。

 

6.3 主線程等子線程

6.3.1 close等子線程所有執行完

 

 

join是終止進程,必需要前面執行close或者terminate方法。

執行close,等全部任務(10個)所有執行完,再終止

執行terminate,表示當即終止,無論你當前的任務是否執行完,都當即終止。

 

from multiprocessing import Pool
import time


def f1(arg):
    time.sleep(1)   # 加這句是爲了看出5個5個執行的效果。
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)

    for i in range(10):
        # pool.apply(func=f1,args=(i,))   # apply執行函數,傳入參數
        pool.apply_async(func=f1,args=(i,))
    pool.close()
    pool.join()     # join表示:主進程執行到這裏的時候,夯住了,等子進程結束的時候,再往下執行。
    print('end')

 

光執行join,會觸發下面的斷言錯誤:
assert self._state in (CLOSE, TERMINATE)
join源代碼有這句,只有符合這個條件的,纔不會報錯。
這個條件就是:執行join以前,必須執行close或者terminate方法。
close+join:是等子線程所有執行完了,才繼續往下執行。

 

這是5個5個執行。由於是5個線程同時執行,總共要完成10個任務。

打印:
0
1
2
3
4
5
6
7
8
9
end
打印

 

 

6.3.2 terminate當即終止

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)
if __name__ == "__main__":
    pool = Pool(5)

    for i in range(10):
        # pool.apply(func=f1,args=(i,))   # apply執行函數,傳入參數
        pool.apply_async(func=f1,args=(i,))
    time.sleep(1.5)
    pool.terminate()   # 當即終止
    pool.join()
    print('end')

光執行join,會觸發下面的斷言錯誤:
assert self._state in (CLOSE, TERMINATE)
join源代碼有這句,只有符合這個條件的,纔不會報錯。
這個條件就是:執行join以前,必須執行close或者terminate方法。
terminate+join:是表示當即終止,無論你當前的任務是否執行完,都當即終止。

'''
打印:
0
1
2
3
4
end
'''
打印

 

 

7、協程

 

7.1 協程及gevent原理

 

IO密集型:用多線程+gevent(更好),多線程

計算密集型:用多進程

 

協程原理:利用一個線程,分解一個線程成爲多個「微線程」==>程序級別

 

若是寫爬蟲,就訪問別的網站,拿別人源碼。http請求叫IO請求,用多線程。

假設要訪問3url,建立3個線程,都在等待着,第一個有數據返回就繼續執行,以此類推。

在等待過程當中,就什麼事也沒幹。

 

協程的方式。

計算機幫你建立進程、線程。線程是人爲建立出來的。用一個線程,一下子執行這個操做,一下子執行那個操做。

協程是隻用一個線程。程序員利用io多路複用的方式,讓協程:

先訪問一個url,不等待返回,就再訪問第二個url,訪問第三個url,而後也在等待。

greenlet本質是實現協程的。

注意:協程自己不高效,協程的本質只是程序員調用的,那爲啥gevent這麼高效率呢,是由於用了協程(greenlet+IO多路複用的方式。

IO多路複用的用法才能高效。因此用的時候就用gevent就行了。

 

 

用多線程:假設每爬一個網址須要2秒,3url,就是3個請求,等待2秒,就能夠繼續往下走。

若是用gevent,用單線程,單線程應該從上到下執行,用for循環讀取3url,往地址發送url請求,就是IO請求,線程是不等待的。

for循環再拿第二個url,再發第三個url。在這過程當中,誰先回來,就處理誰。

 

資源佔用上,多線程佔用了3個線程,2秒鐘,多線程啥也沒幹,在等待。gevent2秒鐘,只要發送請求了,接着就想幹什麼幹什麼。

 

7.2 greenlet協程

 

greenlet切換協程:

 

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

 

'''
打印:
12
56
34
78
'''
打印

 

7.3 gevent

greenlet切換協程:

 

import gevent

def foo():
    print('Running in foo')         # 第1步
    gevent.sleep(0)
    print('Explicit context switch to foo again')   # 第3步

def bar():
    print('Explicit context to bar')    # 第2步
    gevent.sleep(0)
    print('Implicit context switch back to bar')    # 第4步

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

'''
打印:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
'''
打印

 

7.3 gevent 切換執行

greenlet切換執行協程的本質是執行以下代碼:

 

import gevent

def foo():
    print('Running in foo')         # 第1步
    gevent.sleep(0)
    print('Explicit context switch to foo again')   # 第3步

def bar():
    print('Explicit context to bar')    # 第2步
    gevent.sleep(0)
    print('Implicit context switch back to bar')    # 第4步

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 

 

'''
打印:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
'''
打印

 

 

可是日常咱們用gevent,不用這麼麻煩,而是使用下面的代碼就行了。

 

7.4 gevent使用方法

 

遇到IO操做自動切換:

from gevent import monkey; monkey.patch_all()
import gevent
import requests

# 這個函數是發http請求的
def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text    # 獲取內容
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),     # 建立了一個協程
        gevent.spawn(f, 'https://www.yahoo.com/'),      # 建立了一個協程
        gevent.spawn(f, 'https://github.com/'),         # 建立了一個協程
])

 

建立了三個協程。總共就一個線程,經過for循環發送三個url請求。而後等待結果,誰先回來,就處理誰。
經過requests.get(url)發送url請求,誰先回來,就拿到數據(data),拿到數據就能夠處理數據了。

這都是在一個線程裏執行的。

瞬間打印:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
而後等待哪一個url先返回,就先打印
47394 bytes received from https://www.python.org/.
25528 bytes received from https://github.com/.
450811 bytes received from https://www.yahoo.com/.
'''
打印

 

gevent的使用場景舉例:

一、scrapy框架內部用的gevent。發請求性能比線程高不少。

二、作api(url)監控,把代碼發佈到哪一個url,得自動檢測下返回值是否是200,或是指定的狀態碼。

發佈完成以後,就要發送http請求過去檢測一下返回的狀態碼。若是有20url請求,就用gevent一下全給發了,就不必建立多個線程,一個線程就足以了,而後配合多進程+gevent,又能夠利用多顆cpu的優點了。

 

monkey.patch_all()是什麼?

發送http請求,request本質上調用socket來發。原來執行http請求,就會通知我一下,執行完了,默認socket是沒有這個功能的。這至關於把原來的socket修改了,修改爲特殊功能的socket,發送請求若是完事了,會告訴你完事了。

其實內部就是把io請求作了個封裝而已。

相關文章
相關標籤/搜索