python學習第十天

線程鎖使用場景python

多個線程同時修改同一份數據時必須加鎖即線程鎖git

 

隊列的做用github

解耦,使程序之間實現鬆耦合編程

提升處理效率多線程

 

隊列和列表處理數據的區別併發

隊列數據只有一份,取走就沒有了,列表數據取出後仍然存在於列表中app

 

python多線程使用場景異步

python多線程不適合CPU操做(計算)密集型任務,適合IO操做(數據讀寫收發)密集型任務socket

 

每一個進程都有父進程async

 

進程間通訊

進程queue

兩個進程的queue經過序列化共享數據

from multiprocessing import Process,Queue

def run(qq):

    qq.put(1)

'''定義主進程隊列'''

q=Queue()

'''將主進程隊列拷貝給子進程,子進程向隊列中放入數據'''

p =Process(target=run,args=(q,))

p.start()

'''經過pickle序列化將子進程隊列的數據傳遞給主進程'''

print(q.get())

 

管道Pipes

使用方法相似socket

from multiprocessing import Process, Pipe

def f(conn):

    conn.send([42, None, 'hello'])

    print("from parent:",conn.recv())

    conn.close()

if __name__ == '__main__':

    parent_conn, child_conn = Pipe()

    p = Process(target=f, args=(child_conn,))

    p.start()

    print(parent_conn.recv())  # prints "[42, None, 'hello']"

    parent_conn.send("ok")

 

Managers

真正的進程數據共享

from multiprocessing import Process,Manager

import os

def f(d,l):

    d[os.getpid()] =os.getpid()

    l.append(os.getpid())

    print(l)

'''能夠寫成manager =Manager()'''

with Manager() as manager:

    d =manager.dict()

    l =manager.list(range(5))

    p_list =[]

    for i in range(10):

        p =Process(target=f,args=(d,l))

        p.start()

        p_list.append(p)

    for j in p_list:

        j.join()

    print(d)

    print(l)

 

進程鎖

保證屏幕輸出結果不亂

from multiprocessing import Process,Lock

def f(l,i):

    l.acquire()

    print("hello world",i)

    l.release()

lock =Lock()

for i in range(10):

    p =Process(target=f,args=(lock,i))

    p.start()

 

進程池

apply  串行

apply_async  並行

from  multiprocessing import Process, Pool

import time,os

def Foo(i):

    time.sleep(2)

    print(os.getpid())

    return i + 100

def Bar(arg):

    print('-->exec done:', arg)

'''定義線程池,同時只能執行5個進程'''

pool = Pool(5)

for i in range(10):

    '''併發執行,當子線程執行完前面的函數後由主線程調用callback指定的函數'''

    pool.apply_async(func=Foo, args=(i,), callback=Bar)

    '''串行執行'''

    # pool.apply(func=Foo, args=(i,))

print('end')

pool.close()

pool.join()  # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。

 

協程的優缺點

優勢:

無需線程上下文切換的開銷

無需原子操做鎖定及同步的開銷

方便切換控制流,簡化編程模型

高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理

 

缺點:

沒法利用多核資源

進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

 

協程處理邏輯

遇到IO就切換

 

greenlet和gevent模塊

greenlet須要用戶本身定義協程切換位置

gevent自動切換,在配合urllib、socket等個別模塊使用時須要額外導入monkey模塊標記IO操做

greenlet模塊

from greenlet import greenlet

def test1():

    print(12)

    r2.switch()

    print(45)

    r2.switch()

    print(67)

def test2():

    print(34)

    r1.switch()

    print(56)

    r1.switch()

r1 =greenlet(test1)

r2 =greenlet(test2)

r1.switch()

 

gevent模塊

import gevent

def foo():

    print("1")

    gevent.sleep(2)

    print("1-1")

def bar():

    print("2")

    gevent.sleep(1)

    print("2-2")

def func():

    print("3")

    gevent.sleep(0)

    print("3-3")

gevent.joinall([gevent.spawn(foo),gevent.spawn(bar),gevent.spawn(func)])

 

from urllib import request

import gevent,time

from gevent import monkey

monkey.patch_all()

def f(url):

    print("GET: %s"%url)

    resp =request.urlopen(url)

    data =resp.read()

    print("%d bytes received from %s"%(len(data),url))

urls =['https://www.python.org/','https://www.yahoo.cpm/','https://github.com/']

start_time =time.time()

for url in urls:

    f(url)

print("cost:",time.time()-start_time)

start_time2 =time.time()

gevent.joinall(gevent.spawn[f,'https://www.python.org/'],

               gevent.spawn([f,'https://www.yahoo.cpm/'],

               gevent.spawn([f,'https://github.com/'])))

print("cost:",time.time()-start_time2)

 

事件驅動模型

一個線程接收事件並放入消息隊列中,另外一個線程從隊列中取出事件處理

 

IO方案

阻塞   單線程請求數據阻塞,數據從內核拷貝到應用內存阻塞

非阻塞  單線程請求數據當即返回錯誤,客戶端不停的請求,數據從內核拷貝到應用內存阻塞

多路複用  單線程包含多個請求讓內核幫助監聽,只要有一個請求的數據準備好了就通知線程,select不通知哪一個線程準備好了,epoll會通知。數據從內核拷貝到應用內存阻塞

異步  線程請求數據當即返回,當數據準備好時直接拷貝到應用內存並通知線程

 

selectors模塊

import selectors

import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):

    conn, addr = sock.accept()  # Should be ready

    print('accepted', conn, 'from', addr)

    conn.setblocking(False)

    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):

    data = conn.recv(1000)  # Should be ready

    if data:

        print('echoing', repr(data), 'to', conn)

        conn.send(data)  # Hope it won't block

    else:

        print('closing', conn)

        sel.unregister(conn)

        conn.close()

sock = socket.socket()

sock.bind(('localhost', 10000))

sock.listen(100)

sock.setblocking(False)

sel.register(sock, selectors.EVENT_READ, accept)

while True:

    events = sel.select()

    for key, mask in events:

        callback = key.data

        callback(key.fileobj, mask)

相關文章
相關標籤/搜索