No.012 - The Road to Learning Python - Day 9 - GIL | Thread | Process | Coroutine

GIL - Python's Global Interpreter Lock

The official description of the GIL:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

The GIL is essentially a mutex, and the job of a mutex is to turn parallel execution into serial execution. Python threads are native OS threads, which Python itself cannot schedule; when several threads inside one process (including the interpreter's own threads) operate on the same block of data, the data can be corrupted. The simplest remedy is a lock, so CPython carries a global lock, the GIL, which guarantees that at any given moment only one thread in the process is actually running Python bytecode.

The GIL is not a feature of the Python language; it is a design limitation of CPython, and it prevents CPython threads from making use of multiple cores. Implementations such as Jython and IronPython do not have this problem.

So Python threads are suited to IO-bound tasks; for CPU-bound tasks they can actually run slower because of the frequent context switching.
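
To make this concrete, here is a minimal sketch (the function name count_down and the iteration count are made up for illustration) that times the same CPU-bound work run serially and then split across two threads; on CPython the threaded version is usually no faster, and often a bit slower, because the two threads merely take turns holding the GIL:

import threading
import time

def count_down(n):
    while n > 0:   # pure CPU work: the GIL is never released for IO
        n -= 1

N = 10000000

start = time.time()
count_down(N)
count_down(N)
print("serial:   %.2fs" % (time.time() - start))

start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print("threaded: %.2fs" % (time.time() - start))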

The GIL vs. an ordinary mutex

The GIL guarantees that only one thread is using the CPU at any moment; it is released on IO operations or, on Python 2, after every ticks=100 check interval (Python 3 replaced the ticks counter with a time-based switch interval).

An ordinary mutex, on the other hand, exists so that when multiple threads modify shared data they do so in order and the data does not end up corrupted.

In short, the GIL protects the Python interpreter itself, while user data must be protected by a user-defined mutex.
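
As a side note, the ticks=100 counter mentioned above is Python 2 behaviour (sys.setcheckinterval); CPython 3.2+ uses a time-based switch interval instead. A small sketch of how to inspect and tune it:

import sys

print(sys.getswitchinterval())   # 0.005 by default: a thread is asked to give up the GIL roughly every 5 ms
sys.setswitchinterval(0.01)      # let each thread hold the GIL for up to ~10 ms before a forced switch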

A scenario in which a user mutex and the GIL are in play at the same time:

(1) Several threads are running. Suppose Thread1 obtains the GIL and may use the CPU; Thread1 then acquires the mutex lock, so it is allowed to modify the shared data (but it has not actually started modifying it yet).

(2) Before Thread1 gets to modify the data it hits an IO operation, or the ticks counter reaches 100 (note: it still has not modified the data). At this point Thread1 gives up the GIL, and the GIL is up for grabs again.

(3) Thread1 and Thread2 now compete for the GIL (note: if Thread1 released the GIL because it blocked on IO, Thread2 is certain to get it; if Thread1 released it because the ticks counter reached 100, Thread1 and Thread2 compete on equal terms).

(4) Suppose Thread2 happens to win the GIL and runs its code to modify the shared data. Because Thread1 still holds the mutex lock, Thread2 cannot modify the data; Thread2 therefore gives up the GIL, and the GIL is contested once again.

(5) Suppose Thread1 wins the GIL back. Since it holds the mutex lock it can continue modifying the shared data; once Thread1 finishes and releases the mutex, Thread2 can modify the data only after it has obtained both the GIL and the lock.

The difference between running with and without the mutex <the phenomenon can be reproduced with Python 2 on Linux>

Running without the lock on Python 2.7:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading, time

lock = threading.Lock()  # create a lock  # on Python 3 this race is much harder to reproduce with this test


def run():
    # lock.acquire()  # acquire the lock
    n = 0
    num = nums[0] 
    while n < 100: # the loop separates reading the value from writing it back
        n += 1     # and makes it likely that a GIL switch happens in between
    num += 1
    nums[0] = num
    # lock.release()  # release the lock after the update


nums = [1]

for i in range(1000):
    t = threading.Thread(target=run)
    t.start()

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print(nums[0])

'''
RESULT: not the expected 1001
[root@sq ~]# python lock_test.py 
955
[root@sq ~]# python lock_test.py 
967
[root@sq ~]# python lock_test.py 
969
'''

Running on Python 2 with the lock, i.e. with the comment markers above removed:

[root@sq ~]# python lock_test.py 
1001
[root@sq ~]# python lock_test.py 
1001
[root@sq ~]# python lock_test.py 
1001

When the same lock has to be acquired more than once in a nested fashion (as run3 below does when it calls run1 and run2), an ordinary threading.Lock() would deadlock on the second acquire; in that case use a reentrant (recursive) lock -> threading.RLock()

import threading

"""
Recursive lock: avoids the hang that nested acquires of a plain Lock would cause
"""

num1, num2 = 0, 0


lock = threading.RLock()  # create a reentrant lock <with a plain Lock the nested acquire in run3 would deadlock>

def run1():
    print("---run1---")
    lock.acquire()
    global num1
    num1 += 1
    lock.release()
    return num1

def run2():
    print("---run2---")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res1 = run1()
    print("---run3 run1-2----")
    res2 = run2()
    lock.release()
    print("value is [{}][{}]".format(res1, res2))

for i in range(10):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print("all thread is over;")

When we need to limit how many threads run at the same time, we can use threading.Semaphore(num), i.e. a semaphore. For example, if we launch 100000 threads that all perform IO at once, the system may be overwhelmed; a semaphore caps the number of threads allowed to run concurrently.

semaphore.acquire() -> counter -= 1

semaphore.release() -> counter += 1

When the counter reaches 0, further acquire() calls block until someone releases.

import threading
import time

def get_time():
    print("{} is waiting sq.".format(threading.current_thread()))
    sq.acquire()
    print("{} is get sq.".format(threading.current_thread()))
    print(sq._value) # the semaphore's internal counter
    time.sleep(1)
    print(time.ctime())
    sq.release()
    print("{} release sq.".format(threading.current_thread()))

sq = threading.Semaphore(5)

for i in range(100):
    t = threading.Thread(target=get_time)
    t.start()

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print("All Thread is over....")

'''
<Thread(Thread-1, started 38160)> is waiting sq.
<Thread(Thread-1, started 38160)> is get sq.
<Thread(Thread-2, started 33740)> is waiting sq.
<Thread(Thread-2, started 33740)> is get sq.
<Thread(Thread-3, started 9488)> is waiting sq.
<Thread(Thread-3, started 9488)> is get sq.
<Thread(Thread-4, started 8076)> is waiting sq.
<Thread(Thread-4, started 8076)> is get sq.
<Thread(Thread-5, started 35684)> is waiting sq.
<Thread(Thread-5, started 35684)> is get sq.
<Thread(Thread-6, started 6804)> is waiting sq.
<Thread(Thread-7, started 33728)> is waiting sq.
<Thread(Thread-8, started 28308)> is waiting sq.
...
'''

Thread vs. Process

A thread <Thread> is the smallest unit the CPU can schedule: a stream of instructions handed to the CPU by the OS, and the actual unit of execution inside a process.

A process <Process> is how the OS manages a program as a whole: each program's memory is independent, and the OS wraps the program together with everything it needs, such as memory management and network interfaces.
This collection of managed resources is what we call a process <Process>.

The relationship between processes and threads

For a process to use the CPU and execute anything, it must create at least one thread; a process has no ability to execute on its own, it is only a collection of resources.

All threads in a process share that process's memory space; when they access the same data, a mutex is used to serialize the access and keep it orderly.

When a process starts it creates one thread, the main thread; the main thread can create child threads, but child threads are independent and, unless they are marked as daemon threads, they are not affected by the main thread.

Differences between processes and threads

a. Threads share the memory space of their process, while processes <including child processes> have independent memory: a child process copies the parent's memory and then carves out its own space (see the sketch after this list);

b. Threads of the same process can talk to each other directly <sharing data and passing messages>, whereas two processes must communicate through an intermediary;

c. A new thread is cheap to create; a new process requires cloning its parent process;

d. A thread can control and operate on other threads of the same process, but a process can only operate on its child processes;

e. Changes to the main thread may affect the behaviour of other threads in the same process, whereas changes to a parent process have no effect on its child processes.
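
A minimal sketch of point a (the variable counter and the helper bump are made up for illustration): a child thread's change is visible to the main program because threads share memory, while a child process only changes its own copy:

import threading
import multiprocessing

counter = 0

def bump():
    global counter
    counter += 1

if __name__ == "__main__":
    t = threading.Thread(target=bump)
    t.start(); t.join()
    print("after thread :", counter)   # 1 -> the thread modified this process's memory

    p = multiprocessing.Process(target=bump)
    p.start(); p.join()
    print("after process:", counter)   # still 1 -> the child process only modified its own copy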

Two ways to create a Thread

Method 1: direct invocation

import threading, time
"""
the simplest threading;
"""

# this is a task
def run(n):
    print("task", n)
    time.sleep(2)

# start two threads to run the task
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

Method 2: subclassing threading.Thread

import threading, time
"""
the threading of a class;
"""

class Mythread(threading.Thread):

    def __init__(self, n):
        super(Mythread, self).__init__()
        self.n = n

    def run(self):
        print("running task", self.n)
        time.sleep(10)


t1 = Mythread("t1")
t2 = Mythread("t2")
t1.start()
t2.start()
Daemon threads

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.

Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

Simply set a thread's daemon flag to True to make it a daemon thread:

import threading
import time
import random

data = 1

def add():
    global data
    time.sleep(random.randrange(2))
    data += 1
    print("{} is over.".format(threading.current_thread()))


for i in range(10):
    t = threading.Thread(target=add)
    t.daemon = True  # set the daemon flag to True (t.setDaemon(True) is the older, now-deprecated spelling)
    t.start()

print(data) # once this prints, the main thread exits

''' # in this run only 4 threads finished before the main thread exited; after it exited the remaining daemon threads were not allowed to run
<Thread(Thread-6, started daemon 29724)> is over.
<Thread(Thread-7, started daemon 7084)> is over.
<Thread(Thread-8, started daemon 5420)> is over.
<Thread(Thread-10, started daemon 29996)> is over.
'''
Thread.join()

join() blocks the calling thread at the point where it is called until the thread whose join() was invoked terminates (or the optional timeout expires). Its signature: thread.join([timeout])

import threading, time

"""
the threading of a class;
start a threading with a loop;
show how to use "thread_obj.join(timout=int())"
"""
class Mythread(threading.Thread):

    def __init__(self, n):
        super(Mythread, self).__init__()
        self.n = n

    def run(self):
        print("running task-", self.n, threading.current_thread())
        time.sleep(2)
        print("running over-%d" % self.n)

start = time.time()
for i in range(50):
    t1 = Mythread(i)
    t1.start()
end = time.time()
print("111", end-start)
# this does not measure the real running time: the main program is itself a thread and runs in parallel with the others
# by default the main thread does not wait for its child threads to finish; that default can be changed
# t1.join() # make the main thread wait for the child thread's result, which turns execution serial
# threading.active_count() shows how many threads are currently alive in the process
# threading.current_thread() shows the currently running thread

# Method 1: join each thread right after starting it - the main thread waits, so execution becomes serial
start = time.time()
t_list = []
for i in range(50):
    t_list.append(Mythread(i))
    t_list[-1].start()
    t_list[-1].join()

# Method 2: join the already-started threads one by one - this waits until the slowest of them has finished
for thread in t_list:
    thread.join()
end = time.time()
print(threading.current_thread(), end-start)
threading.Timer

This class represents an action that should be run only after a certain amount of time has passed.

Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

Using Timer to run a function at a fixed interval <is the global really necessary? how can the backlog of threads be reproduced?>

import threading
import time

def hello(name):
    print("Hello %s" %name)
    print("活躍的線程數量: %s" % threading.active_count())
    global timer
    timer = threading.Timer(1.0, hello, ["Bruce"])
    timer.start()
    time.sleep(10)
    print("線程 %s is over." % threading.current_thread().ident)


if __name__ == "__main__":
    timer = threading.Timer(1.0, hello, ["Bruce"])
    timer.start()
threading.Event

An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()  # create an event object

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()

If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

An Event is used for coordination between two or more threads. In the traffic-light example below, the light thread sets and clears the event, and the car threads behave differently depending on the event's flag:

import threading
import time


event = threading.Event()

def light():
    count = 0
    event.set()
    while True:
        if 30 >= count > 20: # switch to red
            event.clear()
            print("\033[41;1mred light is on...\033[0m")
        elif count > 30:
            event.set()
            count = 0
        else:
            print("\033[44;1mgreen light is on...\033[0m")
        time.sleep(0.5)
        count += 1


def car(name):
    while True:
        if event.is_set():  # is_set() is the current spelling; isSet() is a deprecated alias
            print("Green light...the Car {} across the road.".format(name))
            break
        else:
            print("Red light , the Car {} is waiting.".format(name))
            event.wait()

l = threading.Thread(target=light)
l.start()

cars = ["Bruce", "Amadeus", "Lee", "Vick", "John", "Google"]

for car_nam in cars:
    t = threading.Thread(target=car, args=(car_nam, ))
    time.sleep(3)
    t.start()

while threading.active_count() != 1:
    pass
else:
    print("thread is over;")

The queue module

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. There are three types of queue:

class queue.Queue(maxsize=0)         # FIFO: first in, first out
class queue.LifoQueue(maxsize=0)     # LIFO: last in, first out
class queue.PriorityQueue(maxsize=0) # the entry with the lowest priority value is retrieved first

Some commonly used Queue methods:

Queue.qsize()      # return the approximate size of the queue
Queue.empty()      # return True if the queue is empty
Queue.full()       # return True if the queue is full
Queue.put(item, block=True, timeout=None)  # block: whether to block; timeout: how long to block before raising the Full exception
Queue.put_nowait(item)  # equivalent to Queue.put(item, block=False)
Queue.get(block=True, timeout=None)  # block: whether to block; timeout: how long to block before raising the Empty exception
Queue.get_nowait()      # equivalent to Queue.get(block=False)
Queue.task_done()  # called by consumer threads: after an item obtained with get() has been processed, call this once per item so that q.join() knows when it can stop blocking
Queue.join()       # block until every item that was put into the queue has had task_done() called for it

What a queue is for

a. The producer of data can work at its own pace: it drops data into the queue and moves on to other work, which improves efficiency;
b. The producer and the consumer have no direct dependency on each other - each only talks to the queue - so the program is decoupled;
c. A queue can be thought of as an ordered container.
The most direct difference between a list and a queue

a. A list is also a container, so why not just use a list? Because data in a queue exists only once: once an item is taken out it is gone. A list does not behave that way (see the sketch below).
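
A quick illustration of that difference, using nothing beyond the standard library:

import queue

items = [1, 2, 3]
print(items[0], items[0])   # 1 1 -> reading a list does not consume the element

q = queue.Queue()
for i in items:
    q.put(i)
print(q.get(), q.get())     # 1 2 -> every get() removes an element from the queue
print(q.qsize())            # 1 -> only one element is left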

Simple queue usage - FIFO queue

import queue

q_first_first = queue.Queue(maxsize=3) # first in, first out
q_first_first.put(1)
q_first_first.put(2)
q_first_first.put(3)
print(q_first_first.get())
print(q_first_first.get())

Simple queue usage - LIFO queue

import queue

q_first_first = queue.LifoQueue(maxsize=3) # last in, first out
q_first_first.put(1)
q_first_first.put(2)
q_first_first.put(3)
print(q_first_first.get())
print(q_first_first.get())

Simple queue usage - priority queue

import queue

q = queue.PriorityQueue() # the entry with the lowest priority value comes out first
q.put((1, 1))
q.put((2, 2))
q.put((-1, 3))
q.put((-2, 4))
q.put((-1, 5))
q.put((6, 6))
print(q.get())
print(q.get())

The producer-consumer model

In concurrent programming the producer-consumer model solves the majority of concurrency problems. By balancing the throughput of the producing threads against that of the consuming threads, it improves the overall rate at which the program processes data.

Implementing the producer-consumer model

The producer-consumer pattern uses a container to break the tight coupling between producer and consumer. The two never talk to each other directly; they communicate through a blocking queue. After producing an item the producer does not wait for the consumer to handle it, it simply throws it into the queue; the consumer does not ask the producer for data, it takes it straight from the queue. The blocking queue acts as a buffer that balances the processing capacity of the two sides.

A simple producer/consumer example:

import threading
import queue
import time
import random

q = queue.Queue(10)

def produce():
    count = 0
    while count < 100:
        if not q.full():
            p = "產品-%s" % count
            q.put(p)
            print("生產了[%s]" % p)
            count += 1
            time.sleep(0.1)
    q.join() # wait until every product has been consumed; without task_done() notifications the producer cannot know when that is
    print("Produced %s products in total; the last one was [%s] and all have been consumed." % (count, p))


def customer(num, n):
    for i in range(n):
        if not q.empty():
            print("Customer<%s>消費了[%s]。" % (num, q.get()))
            q.task_done() # 告知生產者線程有個產品已經被使用
            time.sleep(0.3)
        else:
            print("Customer<%s>來晚了。" % num)
            break


p = threading.Thread(target=produce)
p.start()

for i in range(100):
    c = threading.Thread(target=customer, args=(i, random.randint(1, 4)))
    c.start()
    c.join()

while threading.active_count() != 1:
    time.sleep(0.1)
else:
    print("全部顧客都走了。")

Multiprocessing

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency.

Creating processes and checking process ids

"""
# Basic process creation; the syntax mirrors the threading module.
On Linux, every process is started by a parent process.
os.getppid()  returns the pid of the current process's parent
os.getpid()   returns the pid of the current process
"""
import os
import multiprocessing

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == "__main__":
    info('\033[41;1mmain    \033[0m')
    p = multiprocessing.Process(target=f, args=('Bruce', ))
    p.start()
    p.join()
Communication between processes

Because each process has its own independent memory, one process cannot reach into another's data the way threads can; inter-process communication goes through an intermediary such as Queues, Pipes, or Managers.

Queues

Unlike the threading queue <which can only be accessed by threads of the same process>, a multiprocessing queue is used to pass data between processes.

How a multiprocessing queue carries data between processes:
  1. the parent process clones a copy of the queue for the child process;
  2. the child process puts data into its copy of the queue;
  3. internally the queue wraps an intermediary;
  4. the child serializes the data <pickle> and hands it to the intermediary;
  5. the intermediary deserializes it and hands it to the parent.

import multiprocessing

def f(qq):
    qq.put(["Bruce", "Lee"])

if __name__ == "__main__":
    q = multiprocessing.Queue() # under PyCharm on Windows this raised: PermissionError: [WinError 5] Access is denied.
    #q = multiprocessing.Manager().Queue() # switching to this kind of queue worked around the problem - worth looking into
    p1 = multiprocessing.Process(target=f, args=(q, ))
    p1.start()
    print(q.get())
    p1.join()

Pipe

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

For example:

import multiprocessing

def f(conn):
    conn.send(["Bruce", "Lee", "From Child"])
    conn.send(["Bruce", "Lee", "From Child2"])
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe() # Pipe() returns the two ends of the pipe
    p1 = multiprocessing.Process(target=f, args=(child_conn, ))
    p1.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    # a third parent_conn.recv() has nothing left to read - the child only sent two messages - so it would block (or raise EOFError once every write end is closed)
    p1.join()
Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)


if __name__ == '__main__':
    with Manager() as manager: # equivalent to manager = Manager()
        d = manager.dict() # a dict that can be passed between and shared by multiple processes

        l = manager.list(range(5)) # a list that can be passed between and shared by multiple processes
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

Note: although this looks like one shared object, the objects actually live in a separate server process started by the Manager, and the worker processes manipulate them through proxies; the manager serializes those proxied operations for us.

Process synchronization

Reason: for example, all processes use the same screen <a system resource> to show their results, so without a lock the output from the different processes is liable to get all mixed up.

"""
Suppose a file records that 1 ticket is left;
if 3 processes buy tickets at the same time without a lock, more than 1 ticket may be sold.
"""

from multiprocessing import Process, Lock
import json
import time

def showCounts():
    tickets = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(0.1)
    print("\033[41m目前還剩[%s]張票.\033[0m" % tickets["num"])

def getTicket():
    tickets = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(0.1)
    if tickets["num"] > 0:
        tickets["num"] -= 1
        time.sleep(2)
        json.dump(tickets,open("db.txt", "w", encoding="utf-8"))
        print('\033[42mPurchase successful\033[0m')

def main():
    showCounts()
    getTicket()


if __name__ == "__main__":
    for i in range(3):
        p = Process(target=main)
        p.start()
'''
Result -> the ticket was sold three times, which is wrong
[1] ticket(s) left.
[1] ticket(s) left.
[1] ticket(s) left.
Purchase successful
Purchase successful
Purchase successful
'''

Adding a lock turns access to db.txt from parallel into serial:

from multiprocessing import Process, Lock
import json
import time

def showCounts():
    tickets = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(0.1)
    print("\033[41m目前還剩[%s]張票.\033[0m" % tickets["num"])

def getTicket():
    tickets = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(0.1)
    if tickets["num"] > 0:
        tickets["num"] -= 1
        time.sleep(2)
        json.dump(tickets,open("db.txt", "w", encoding="utf-8"))
        print('\033[42mPurchase successful\033[0m')

def main(lock):
    showCounts()
    lock.acquire()
    getTicket()
    lock.release()


if __name__ == "__main__":
    procLock = Lock()
    for i in range(3):
        p = Process(target=main, args=(procLock,))
        p.start()
'''
Result -> the ticket was sold exactly once, which is correct
[1] ticket(s) left.
[1] ticket(s) left.
[1] ticket(s) left.
Purchase successful
'''
Process pools

A process pool maintains an internal set of worker processes. When a task is submitted, a worker is taken from the pool; if no worker is available, the task waits until one becomes free.

The problem it solves:

Creating a new process copies the parent's contents, which consumes a lot of resources; if the number of processes is not limited, too many of them can exhaust the system's memory.

from multiprocessing import Process, Pool
from multiprocessing import freeze_support # needed on Windows when the program is frozen into an executable
import time
import os


def Foo(i):
    time.sleep(2)
    print(i+100)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg, os.getpid()) #-> -->exec done: 100 11958


if __name__ == '__main__':
    print(__name__, os.getpid())
    freeze_support()
    pool = Pool(processes=2) # the pool runs at most 2 worker processes at a time; 10 tasks are submitted below
    for i in range(10):
        print("Process->%s" % i)
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # callback: runs after func finishes
        # useful for e.g. writing a log entry once the work is done
        # the callback runs in the parent process, not in the worker
        
        #pool.apply_async(func=Foo, args=(i,))  # asynchronous -> parallel
        #pool.apply(func=Foo, args=(i, )) # synchronous -> serial

    print('end')
    pool.close() # close the pool; must be called before join()
    print("pool close.")
    pool.join()  # wait for the pooled workers to finish; if this line is removed the program exits right away
The return value of a pooled task can be retrieved with get():
from multiprocessing import Process, Pool
from multiprocessing import freeze_support # needed on Windows when the program is frozen into an executable
import time
import os


def Foo(i):
    time.sleep(2)
    print(i+100)
    return i + 100


def Bar(arg):
    print('-->exec done:', arg, os.getpid()) #-> -->exec done: 100 11958


if __name__ == '__main__':
    print(__name__, os.getpid())
    freeze_support()
    pool = Pool(processes=2)
    result = list()
    for i in range(5):
        result.append(pool.apply_async(func=Foo, args=(i,), callback=Bar))

    print('end')
    pool.close()
    print("pool close.")
    pool.join()

    for res in result:
        print("res-->%s" % res.get())
'''
result:
__main__ 936
end
pool close.
100
-->exec done: 100 936
101
-->exec done: 101 936
102
-->exec done: 102 936
103
-->exec done: 103 936
104
-->exec done: 104 936
res-->100
res-->101
res-->102
res-->103
res-->104
'''

Coroutines

A coroutine is a lightweight, user-space "thread": scheduling is controlled by the user program itself. A coroutine ultimately runs inside a single thread; with ordinary multithreading the OS decides when to switch (for example on IO), whereas the point of coroutines is to avoid the cost of OS-level switching (starting and stopping threads, saving registers and stacks, switching between them) by letting our own program decide when to switch tasks. A coroutine therefore keeps its own register context and stack: when it is switched out, its context and stack are saved to memory; when it is switched back in, they are restored.

Below, yield is used to build a "pseudo-coroutine":

import time
import random

def customer(name):
    print("顧客%s進店準備吃包子. " % name)
    while True:
        baozi = yield "test"
        time.sleep(1)
        print("顧客%s吃了%s個%s包子. " % (name, baozi[0], baozi[1]))

menu = ["大蔥餡", "韭菜餡", "白菜粉絲", "豬肉餡", "牛肉餡", "梅乾菜肉", "豆沙餡"]
def producer():
    c1 = customer("Bruce") # 若是函數內有yield,則函數會被識別爲生成器
    c2 = customer("Elvin") # 而生成器只有next方法,或者send方法纔會運行至yield處
    c1.__next__() # 第一次運行,不可以使用send發送數據,由於未到yield處,而在函數開頭
    c2.send(None) # 只可以發送None
    for thing in menu:
        time.sleep(1)
        print("包子店作了一籠{}的包子。".format(thing))
        c1_num = random.randint(1, 7)
        c2_num = 8 - c1_num
        c1_eat = [c1_num, thing]
        c2_eat = [c2_num, thing]
        c1.send(c1_eat)
        c2.send(c2_eat)

if __name__ == "__main__":
    producer()

Why call yield a pseudo-coroutine? Because yield fails one of the four conditions a coroutine must satisfy. The conditions are:

1. Concurrency must be achieved within a single thread;

2. Modifying shared data must not require locks;

3. The user program keeps the context and stack of each control flow itself;

4. When a coroutine hits an IO operation, it automatically switches to another coroutine.

Advantages of coroutines:

1. No overhead from thread context switches;

2. No overhead from locking and synchronising atomic operations (note 1);

3. Control flow is easy to switch, which simplifies the programming model;

4. High concurrency + high scalability + low cost: a single CPU can easily host tens of thousands of coroutines, which makes them well suited to high-concurrency workloads.

Disadvantages of coroutines:

1. They cannot use multiple cores: a coroutine is in essence a single thread, so it cannot spread work over the cores of a CPU by itself; coroutines have to be combined with processes to run on multiple CPUs. For most everyday applications this does not matter, unless the application is CPU-bound;

2. A blocking operation (such as blocking IO) blocks the entire program.

Real coroutines - the greenlet and gevent modules

The essence of a coroutine is that, within a single thread, the user switches to another task whenever the current one blocks on IO, and thereby improves efficiency. To implement this we need a solution that satisfies all of the following:

1. It can switch between multiple tasks, saving each task's state before switching so that, when the task is resumed, it continues from where it paused;

2. It can detect IO operations, and switches only when an IO operation is encountered;

3. One question to keep in mind: if we switch away on IO, when do we switch back?

First, greenlet

greenlet provides a more convenient way of switching than generators, but it does not address point 2 above (it does not detect IO by itself):

from greenlet import greenlet

def play(name):
    print("{} play 1 times.".format(name))
    crt2.switch("Alvin") # 2 # 第一次傳入參數,第二次不須要
    print("{} play 2 times.".format(name))
    crt2.switch() # 4
    print("{} play 3 times.".format(name))
    crt2.switch() # 6

def eat(name):
    print("{} eat 1 times.".format(name))
    crt1.switch() # 3
    print("{} eat 2 times.".format(name))
    crt1.switch() # 5
    print("{} eat 3 times.".format(name))

if __name__ == "__main__":
    crt1 = greenlet(play)
    crt2 = greenlet(eat)
    crt1.switch("Bruce") # 1 # 第一次傳入參數,以後不須要

'''
result:
Bruce play 1 times.
Alvin eat 1 times.
Bruce play 2 times.
Alvin eat 2 times.
Bruce play 3 times.
Alvin eat 3 times.
'''
The third-party library gevent

gevent is a third-party library that makes concurrent synchronous-looking or asynchronous programming easy. The main primitive used in gevent is the greenlet, a lightweight coroutine provided to Python as a C extension module; greenlets all run inside the OS process of the main program, but they are scheduled cooperatively.

The answer to question 3 above: a callback is used; when the IO operation completes, the operating system invokes the callback to signal that the IO is done, and the coroutine can be switched back in.

Creating greenlets and the main functions

# create a coroutine object g1; the first argument to spawn is the function, the rest are the arguments it will be called with
g1=gevent.spawn(func,1,2,3,x=4,y=5)
g2=gevent.spawn(func)

# wait for the coroutines to finish
g1.join() # wait for g1
g2.join() # wait for g2
gevent.joinall([g1, g2]) # wait for every greenlet in the list at the same time

# get the return value
g1.value # the value returned by func

Simulating how gevent switches when IO is encountered

import gevent

def play(name):
    print("{} play 1 times.".format(name))
    gevent.sleep(2) # simulate hitting IO and switching
    print("{} play 2 times.".format(name))
    gevent.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    gevent.sleep(1)
    print("{} eat 2 times.".format(name))
    gevent.sleep(2)
    print("{} eat 3 times.".format(name))
    gevent.sleep(3)
    print("{} eat 4 times.".format(name))

if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    g3 = gevent.spawn(play, "Amadeus")
    g4 = gevent.spawn(eat, "Google")
    gevent.joinall([g1, g2, g3, g4])

Above, gevent.sleep(sec) is used to simulate an IO operation. gevent cannot by itself recognise real blocking calls such as time.sleep(sec) or socket operations, so the program below degenerates into serial execution:

import gevent
import time

def play(name):
    print("{} play 1 times.".format(name))
    time.sleep(2) # a real blocking call that gevent does not recognise by itself
    print("{} play 2 times.".format(name))
    time.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    time.sleep(1)
    print("{} eat 2 times.".format(name))
    time.sleep(2)
    print("{} eat 3 times.".format(name))


if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    gevent.joinall([g1, g2])

'''
Result:
Bruce play 1 times.
Bruce play 2 times.
Bruce play 3 times.
Alvin eat 1 times.
Alvin eat 2 times.
Alvin eat 3 times.
'''

This is where monkey.patch_all() comes in: it patches all of the program's subsequent IO operations so that gevent recognises them <all kinds of IO> and switches automatically:

import gevent
import time

from gevent import monkey # import the monkey module from gevent
monkey.patch_all() # must be called before the IO operations that need to be patched

def play(name):
    print("{} play 1 times.".format(name))
    time.sleep(2) # now patched by monkey, so gevent switches here
    print("{} play 2 times.".format(name))
    time.sleep(3)
    print("{} play 3 times.".format(name))


def eat(name):
    print("{} eat 1 times.".format(name))
    time.sleep(1)
    print("{} eat 2 times.".format(name))
    time.sleep(2)
    print("{} eat 3 times.".format(name))


if __name__ == "__main__":
    g1 = gevent.spawn(play, "Bruce")
    g2 = gevent.spawn(eat, "Alvin")
    gevent.joinall([g1, g2])

'''
Result:
Bruce play 1 times.
Alvin eat 1 times.
Alvin eat 2 times.
Bruce play 2 times.
Alvin eat 3 times.
Bruce play 3 times.
'''

A socket server and client implemented with gevent

socketServer

import socket
import gevent
from gevent import monkey
monkey.patch_all()


def handle_fun(conn, addr):
    while True:
        try:
            rec = conn.recv(1024)
            print("From <{}> recv [{}]".format(
                addr,
                rec.decode()
            ))
            if not rec:
                print("鏈接[{}]已斷開".format(addr))
                break
            conn.send(rec.upper())
        except Exception as ex:
            print("鏈接[{}]已斷開".format(addr))
            print(repr(ex))
            break


server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server.bind(("127.0.0.1", 5431))
server.listen(5)


while True:
    conn, addr = server.accept()
    print("新鏈接[{}]鏈接".format(addr))
    gevent.spawn(handle_fun, conn, addr)
Client
import socket
import gevent
import time
import random
from gevent import monkey
monkey.patch_all()

def client_soc(msg):
    client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    client.connect(("127.0.0.1", 5431))
    while True:
        time.sleep(random.randint(1, 10))
        if isinstance(msg, int):
            msg += 1
        else:
            msg = int(msg)
            msg += 1
        if not msg:
            continue
        elif msg > 150:
            break
        msg = str(msg)
        client.send(msg.encode())
        print(client.recv(1024))

list1 = []
for i in range(100):
    list1.append(gevent.spawn(client_soc, i))

gevent.joinall(list1)

end

Note 1: atomic operation: an operation that cannot be interrupted by the thread scheduler; once it starts, it runs to completion with no context switch to another thread in between. An atomic operation may consist of one step or of several steps, but their order cannot be broken up, nor can only part of them be executed; being treated as an indivisible whole is the core of atomicity.
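
To illustrate why even something as small as n += 1 is not atomic in CPython, a short sketch using the standard dis module; the thread scheduler is free to switch threads between any two of the bytecode instructions it prints:

import dis

def bump(n):
    n += 1
    return n

dis.dis(bump)   # shows a LOAD, an in-place add, then a STORE;
                # a switch between the load and the store is what loses updates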

References:

GIL: https://www.cnblogs.com/value-code/p/8572852.html

     https://blog.csdn.net/weixin_41594007/article/details/79485847

Process locks: https://blog.csdn.net/qq_36357820/article/details/88743497

Process pools: http://www.javashuo.com/article/p-bccpxxqz-gh.html

General reference: https://www.cnblogs.com/alex3714/articles/5230609.html
