Python中的多線程沒法利用多核優點 , 因此若是咱們想要充分地使用多核CPU的資源 , 那麼就只能靠多進程了html
multiprocessing模塊中提供了Process , Queue , Pipe , Lock , RLock , Event , Condition等組件 , 與threading模塊有不少類似之處python
from multiprocessing import Process import time def func(name): time.sleep(2) print('hello',name) if __name__ == '__main__': p= Process(target=func,args=('derek',)) p.start() # p.join() print('end...')
不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換。進程間通訊有兩種主要形式 , 隊列和管道github
from multiprocessing import Process, Queue #Queue是進程排列 def f(test): test.put('22') #經過建立的子進程往隊列添加數據,實線父子進程交互 if __name__ == '__main__': q = Queue() #父進程 q.put("11") p = Process(target=f, args=(q,)) #子進程 p.start() p.join() print("取到:",q.get_nowait()) print("取到:",q.get_nowait()) #父進程在建立子進程的時候就把q克隆一份給子進程 #經過pickle序列化、反序列化,來達到兩個進程之間的交互 結果: 取到: 11 取到: 22
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).windows
from multiprocessing import Process, Pipe def f(conn): conn.send('11') conn.send('22') print("from parent:",conn.recv()) print("from parent:", conn.recv()) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #生成管道實例,能夠互相send()和recv() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "11" print(parent_conn.recv()) # prints "22" parent_conn.send("33") # parent 發消息給 child parent_conn.send("44") p.join()
進程之間是相互獨立的 ,Queue和pipe只是實現了數據交互,並沒實現數據共享,Manager能夠實現進程間數據共享 。多線程
Manager還支持進程中的不少操做 , 好比Condition , Lock , Namespace , Queue , RLock , Semaphore等併發
from multiprocessing import Process, Manager import os def f(d, l): d[os.getpid()] =os.getpid() l.append(os.getpid()) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #{} #生成一個字典,可在多個進程間共享和傳遞 l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞 p_list = [] for i in range(2): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: #等待結果 res.join() print(d) print(l)
from multiprocessing import Process, Lock def f(l, i): #l.acquire() print('hello world', i) #l.release() if __name__ == '__main__': lock = Lock() for num in range(100): Process(target=f, args=(lock, num)).start() #要把lock傳到函數的參數l #lock防止在屏幕上打印的時候會亂
- apply:從進程池裏取一個進程並執行
- apply_async:apply的異步版本
- terminate:馬上關閉線程池
- join:主進程等待全部子進程執行完畢,必須在close或terminate以後
- close:等待全部進程結束後,才關閉線程池
from multiprocessing import Process, Pool import time import os def Foo(i): time.sleep(2) print("in process",os.getpid()) return i + 100 def Bar(arg): print('-->exec done:', arg,os.getpid()) if __name__ == '__main__': #多進程,必須加這一句(windows系統) pool = Pool(processes=3) #容許進程池同時放入3個進程 print("主進程",os.getpid()) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調,執行完Foo(),接着執行Bar() # pool.apply(func=Foo, args=(i,)) #串行 print('end') pool.close() pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。必須先close(),再join()
協程(Coroutine) : 是單線程下的併發 , 又稱微線程 , 纖程 . 協程是一種用戶態的輕量級線程 , 即協程有用戶本身控制調度
優勢 :
- 協程的切換開銷更小 , 屬於程序級別的切換 , 更加輕量級
- 單線程內就能夠實現併發的效果 , 最大限度利用CPU
缺點 :
- 協程的本質是單線程下 , 沒法利用多核 , 能夠是一個程序開啓多個進程 , 每一個進程內開啓多個線程 , 每一個線程內開啓協程
- 協程指的是單個線程 , 於是一旦協程出現阻塞 將會阻塞整個線程
from greenlet import greenlet def test1(): print(12) gr2.switch() #到這裏切換到gr2,執行test2() print(34) gr2.switch() #切換到上次gr2運行的位置 def test2(): print(56) gr1.switch() #切換到上次gr1運行的位置 print(78) gr1 = greenlet(test1) #啓動一個協程gr1 gr2 = greenlet(test2) #啓動一個協程gr2 gr1.switch() #開始運行gr1
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。
import gevent def foo(): print('Running in foo') gevent.sleep(2) print('阻塞時間最長,最後運行') def bar(): print('running in bar') gevent.sleep(1) print('foo()還在阻塞,這裏第二個運行') def func3(): print("running in func3 ") gevent.sleep(0) print("其它兩個還在IO阻塞先運行") #建立協程實例 gevent.joinall([ gevent.spawn(foo), #生成, gevent.spawn(bar), gevent.spawn(func3), ]) #遇到IO自動切換 結果: Running in foo running in bar running in func3 其它兩個還在IO阻塞先運行 foo()還在阻塞,這裏第二個運行 阻塞時間最長,最後運行 Process finished with exit code 0
因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() #做用:把當前程序的全部的io操做給我單獨的作上標記 def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) #同步須要的時間 urls = ['https://www.python.org/', 'https://www.yahoo.com/', 'https://github.com/' ] time_start = time.time() for url in urls: f(url) print("同步cost",time.time() - time_start) #下面是異步花費的時間 async_time_start = time.time() gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ]) print("異步cost",time.time() - async_time_start) 結果: GET: https://www.python.org/ 48954 bytes received from https://www.python.org/. GET: https://www.yahoo.com/ 491871 bytes received from https://www.yahoo.com/. GET: https://github.com/ 51595 bytes received from https://github.com/. 同步cost 4.928282260894775 GET: https://www.python.org/ GET: https://www.yahoo.com/ GET: https://github.com/ 48954 bytes received from https://www.python.org/. 494958 bytes received from https://www.yahoo.com/. 51599 bytes received from https://github.com/. 異步cost 1.4920852184295654
,特色是根據平臺自動選擇最佳IO多路複用機制,調用順序:epoll > poll > select
作一個socket servers
import selectors import socket sel = selectors.DefaultSelector() # 根據平臺自動選擇最佳IO多路複用機制 def accept(sock, mask): conn, addr = sock.accept() # Should be ready # print('accepted', conn, 'from', addr,mask) conn.setblocking(False) #設置爲非阻塞IO sel.register(conn, selectors.EVENT_READ, read) #新鏈接註冊read回調函數 #將conn和read函數註冊到一塊兒,當conn有變化時執行read函數 def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 9999)) sock.listen(100) sock.setblocking(False) #設置爲非阻塞IO sel.register(sock, selectors.EVENT_READ, accept) # 將sock和accept函數註冊到一塊兒,當sock有變化時執行accept函數 while True: events = sel.select() #默認阻塞,有活動鏈接就返回活動的鏈接列表,監聽[(key1,mask1),(key2),(mask2)] for key, mask in events: callback = key.data #accept #1 key.data就是accept # 2 key.data就是read callback(key.fileobj, mask) #key.fileobj= 文件句柄 # 1 key.fileobj就是sock # 2 key.fileobj就是conn
import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 9999) # Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(5)] print(socks) # Connect the socket to the port where the server is listening print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print( 'closing socket', s.getsockname() )