併發編程

操做系統

系統是一個有程序員寫出來的軟件, 該軟件用於控制計算機的硬件,讓他們之間進行相互配合python

併發與並行

併發:僞,好比吃飯時,電話響了,停下吃飯動做去接電話就是併發,至關於能夠處理多個任務,但不能同時處理react

並行:真,如果一遍吃飯一遍接電話就是並行,至關於能夠處理多個任務,且能夠同時進行git

線程與進程

一個程序至少有一個進程和一個線程,一個CPU一次只能執行一個進程,而一個進程中能夠有一個線程,也能夠有多個線程,線程是程序執行的最小單位程序員

1.由於線程是CPU工做的最小單元,建立線程能夠利用CPU多核的優點實現並行操做(不適用於Python)github

2.進程與進程之間是數據隔離的(jave/c#)進程是爲線程創造工做環境web

3.Python中存在一個GIL鎖,所以只能一個進程對應一個線程,就不能利用多核的優點,只能經過多線程的方式(但會形成資源的浪費)c#

4.當python使用多線程處理I/O密集型時,能夠提升效率,當計算密集型時只能經過多進程安全

線程

線程的建立

第一種函數式多線程

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()
View Code

第二種類方式併發

class MyThread(threading.Thread):
    def run(self):
        print(self._args)


t1 = MyThread(args=(11, ))
t1.start()
View Code2

主線程默認等子線程執行完後再執行

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.start()
print("主線程")

setDaemon

默認值爲False,當爲True時,主線程再也不等子線程,當主線程結束時終止全部的子線程

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.setDaemon(True)
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.setDaemon(True)
t2.start()
print("主線程")

join

開發者可使用join()來控制主線程等子線程的時間

def func(arg):
    print(arg)


t1 = threading.Thread(target=func, args=(11, ))
t1.start()
t1.join(2)

t2 = threading.Thread(target=func, args=(22, ))
t2.start()
t2.join(2)
print("主線程")

線程名稱

def func(arg):
    t = threading.current_thread()
    name = t.getName()
    print(arg, name)


t1 = threading.Thread(target=func, args=(11, ))
t1.setName("線程一")
t1.start()

t2 = threading.Thread(target=func, args=(22, ))
t2.setName("線程二")
t2.start()
print("主線程")

start

start不是表明開始運行線程,而是向cpu發送信息代表本身準備就緒,但是被CPUdcdu調度了,可是CPU是否會當即調度要取決於CPU的運算

 鎖

1.當線程安全時(列表和字典),鎖會在內部讓多個線程排隊操做

2.當線程不安全時,此時的排隊處理還能夠起到線程安全的做用

3.一次只放一個線程

lock

import threading

lst = []
lock = threading.Lock()


def func_lock(args):
    lock.acquire()  # 加鎖 
    lst.append(args)
    m = lst[-1]
    print(args, m)
    lock.release()  # 解鎖


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
lock

rlock

import threading
import time

lst = []
lock = threading.RLock()


def func_lock(args):
    lock.acquire()  # 加鎖
    lock.acquire()  # 再次加鎖
    lst.append(args)
    time.sleep(1)
    m = lst[-1]
    print(args, m)
    lock.release()  # 解鎖
    lock.release()  # 不會鎖死


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
rlock

信號量

Semaphore一次放n個

import threading
import time


lock = threading.BoundedSemaphore(3)


def func_lock(args):
    lock.acquire()  # 加鎖
    time.sleep(1)
    print(args)
    lock.release()  # 解鎖


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()
Semphore

條件

condition經過一次方法放入指定個數

import threading
import time


lock = threading.Condition()


def func_lock(args):
    lock.acquire()
    lock.wait()  # 加鎖
    time.sleep(0.1)
    print("\n"+str(args))
    lock.release()  # 解鎖


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

while True:
    inp = int(input("放入的數量>>>>"))
    lock.acquire()
    lock.notify(inp)
    lock.release()
condition
import threading
import time


lock = threading.Condition()


def func_lock():
    input("放入數量>>>>")
    return True


def func(args):
    lock.wait_for(func_lock)
    print(args)
    time.sleep(1)


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func, args=(i,))
    t.start()
2

事件

控制放開與禁止,一旦放開就是放開所有線程

import threading
import time


lock = threading.Event()


def func_lock(args):
    lock.wait()  # 加鎖
    time.sleep(1)
    print(args)


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

input("放行>>>>")
lock.set()

input("中止放行>>>>")
lock.clear()

for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func_lock, args=(i,))
    t.start()

input("放行>>>>")
lock.set()
Event

threading.local

內部自動爲每一個線程維護一個空間(字典),用於當前存儲屬於本身的值,以保證線程之間的數據隔離

import threading
import time


lock = threading.local()


def func(name, age):
    # 內部會爲當前線程建立一個空間用於儲存
    lock.name = name
    lock.age = name
    time.sleep(1)
    print(lock.name, name)
    print(lock.age, age)


for i in range(10):  # 建立十個線程
    t = threading.Thread(target=func, args=(i, i+10))
    t.start()
threading.local
import time
import threading

DATA_DICT = {}


def func(arg):
    ident = threading.get_ident()  # 獲取表明此線程線程的值
    DATA_DICT[ident] = arg  # 線程的值做爲字典的key, arg做爲values
    time.sleep(1)
    print(DATA_DICT[ident], arg)


for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
loacl實現原理
import time
import threading
INFO = {}


class Local(object):

    def __getattr__(self, item):
        ident = threading.get_ident()
        return INFO[ident][item]

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in INFO:
            INFO[ident][key] = value  # INFO[ident]是字典indent鍵對應的值,此值是一個字典
        else:
            INFO[ident] = {key: value}


obj = Local()


def func(arg):
    obj.phone = arg  # 調用對象的 __setattr__方法(「phone」,1)
    time.sleep(2)
    print(obj.phone, arg)


for i in range(10):
    t = threading.Thread(target=func, args=(i,))
    t.start()
類方式實現原理

線程池

from concurrent.futures import ThreadPoolExecutor
import time


def task(x, y):
    time.sleep(2)
    print(x, y)


# 建立5個線程池
pool = ThreadPoolExecutor(5)

for i in range(10):
    # 去線程池申請一個線程執行task函數
    pool.submit(task, i, i + 10)
pool

生產者消費者模型

三部件:消費者

      隊列:先入先出

      棧:先入後出

    生產者

import time
import queue
import threading

q = queue.Queue()  # 線程安全


def producer(id):
    """
    生產者
    :return:
    """
    while True:
        time.sleep(2)
        q.put('包子')
        print('廚師%s 生產了一個包子' % id)


for i in range(1, 4):
    t = threading.Thread(target=producer, args=(i,))
    t.start()


def consumer(id):
    """
    消費者
    :return:
    """
    while True:
        time.sleep(1)
        v1 = q.get()
        print('顧客 %s 吃了一個包子' % id)


for i in range(1, 3):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
demo

 進程

進程建立

import multiprocessing


def func(args):
    print(args)


def run():
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i,))
        t.start()


if __name__ == '__main__':
    run()
function
class MyProcess(multiprocessing):
    def run(self):
        print("當前進程", multiprocessing.current_process())


def run():
    t = MyProcess()
    t.start()


if __name__ == '__main__':
    run()
class

進程的經常使用功能

進程的經常使用功能與線程相似,只是使用時把線程的方式換爲進程就能夠

join 等子進程的最多時間,不填則等完
deamon 是否等子進程, 默認爲False,不等
name 查看當前進程名字
multiprocessing.current_process() 查看當前進程名字
multiprocessing.current_process().ident/pid 查看當前進程id

進程間的數據共享

進程之間的數據是不能夠共享的,可是爲了某些需求,有進程間數據共享的須要時,能夠經過下面的方式進行

Queue

import multiprocessing


def func(args, q):
    q.put(args)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i, q, ))
        t.start()
    while True:
        v = q.get()
queue

Manager

import multiprocessing


def func(args, dic):
    dic[args] = 123


if __name__ == '__main__':
    m = multiprocessing.Manager()
    dic = m.dict()
    lst = []
    for i in range(10):
        t = multiprocessing.Process(target=func, args=(i, dic, ))
        t.start()
        lst.append(t)
    while True:
        n = 0
        for el in lst:
            if not el.is_alive():  # 進程是否存活
                n += 1
        if n == len(lst):
            break
    print(dic)
manager

進程鎖

與線程類似,參考線程

進程池

import time
from concurrent.futures import ProcessPoolExecutor


def task(arg):
    time.sleep(2)
    print(arg)


if __name__ == '__main__':

    pool = ProcessPoolExecutor(5)
    for i in range(10):
        pool.submit(task, i)
pool

單線程實現併發

- 協程+IO切換:gevent
- 基於事件循環的異步非阻塞框架:Twisted
                

 IO多路複用

 IO多路複用是爲了檢測多個socket是否已經發生變化,(是否鏈接成功,是否已經獲取數據,可讀可寫)

操做系統檢測socket是否發生變化

select:最多1024個socket;循環去檢測。
poll:不限制監聽socket個數;循環去檢測(水平觸發)。
epoll:不限制監聽socket個數;回調方式(邊緣觸發)。

python模塊檢測socket是否發生變化

import selcet
select.select 
select.epoll 

異步非阻塞

 socket非阻塞

非阻塞就是不等待,socket是默認阻塞的,固然但是改變它是否阻塞,它的阻塞體如今connect與recv

client.setblocking(False) # 默認爲True
# 會報BlockingIOError的錯誤,只要捕獲便可。

 異步

指通知,當執行完以後,自動執行回調函數或者自動執行某些操做(通知)

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
twisted

同步阻塞

對比非阻塞,阻塞固然指的就是等了,等一步完成後才進行下一步,就是嚴格按順序來執行

協程

協程是程序員創造出來了的,而不是一個真正存在的東西,單純的協程沒有實際的用處,通常都是配合IO操做使用的

協程就是微線程,是對一個線程進程的分片,使得線程能夠在代碼塊之間能夠來回的切換執行,而不是逐行的執行

協程與IO操做

from gevent import monkey
monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 協程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 協程3
])

 進程,線程,協程的區別

       進程是計算機資源分配的最小單元,主要用來作數據隔離,線程是工做的最小單元,真正進行工做的其實就是線程.一個進程裏能夠有多個線程,一個應用程序裏能夠有多個進程,對於其餘語言來講幾乎用不到進程,他們使用的都是線程,而對於Python來講,對於IO密集型操做使用線程,對於計算密集型操做使用進程.由於python有GIL鎖,它的做用是使一個進程中同一時刻只能有一個線程被CPU調度,因此想使用CPU的多核優點只能使用多個進程,而IO操做佔用不多的CPU,因此使用多線程

       協程是程序員創造出來的,它自己是不存在的,它是用來可讓程序員能夠控制代碼的執行順序,它自己存在沒有什麼意義,可是一旦它與IO切換放在一塊兒,它的價值就大了,它能夠人爲的控制使程序遇到IO就切換到別的任務,IO操做回來時繼續執行,這樣就可以使使線程的工做不會停,讓線程一直工做,python在使用協程時主要是經過greenlet的模塊,使用協程+IO操做時使用的是gevent模塊

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息