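Creating, destroying, and switching processes costs far more than doing the same for threads (creating a thread is 10 to 100 times faster than creating a process; UNPv2, p. 406).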
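Orphan process: if a parent process exits while one or more of its child processes are still running, those children become orphan processes. Orphans are adopted by the init process (PID 1), which then collects their exit status when they terminate.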
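Zombie process: a process creates a child with fork; if the child exits and the parent has not called wait or waitpid to collect the child's status information, the child's process descriptor stays in the system. A child in this state is a zombie process, and it remains one until its parent reaps it (or until the parent exits and init adopts and reaps it).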
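Reaping can be sketched in a few lines of Python. This is a minimal illustration, assuming a Unix-like system (os.fork does not exist on Windows); the helper name spawn_and_reap is hypothetical, while os.fork, os.waitpid, and os.WEXITSTATUS are standard library calls. Between the child's exit and the parent's waitpid() call, the child is exactly the zombie described above.

```python
import os
import sys


def spawn_and_reap(code: int) -> int:
    """Fork a child that exits with `code`, reap it, and return its exit status.

    Hypothetical helper for illustration; Unix-only because it uses os.fork().
    """
    pid = os.fork()
    if pid == 0:
        # Child: leave immediately, skipping Python-level cleanup handlers.
        os._exit(code)
    # Parent: until this call returns, the exited child is a zombie.
    # waitpid() collects its status and removes the process-table entry.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)


if __name__ == "__main__" and sys.platform != "win32":
    print("child exit status:", spawn_and_reap(7))
```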
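Daemon process: a daemon's parent is init, because its real parent exits right after forking it, before the child does; the daemon is therefore an orphan inherited by init. A daemon runs without user input and provides some service, either to the whole system or to a particular user program. Common daemons include the system logger syslogd, the web server httpd, the mail server sendmail, and the database server mysqld.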
UNP divides IPC into the following forms:
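In Linux, pipes are implemented with the file system's file structure and the VFS inode: two file structures point to the same temporary VFS inode, and that inode in turn points to a physical memory page.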
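The user-space view of such a pipe can be sketched with os.pipe(), which returns a read descriptor and a write descriptor for one kernel pipe. This is a minimal illustration that assumes a Unix-like system for the S_ISFIFO check; it shows the behaviour, not the inode internals described above.

```python
import os
import stat

r, w = os.pipe()  # (read end, write end) of one anonymous pipe

# On Linux the descriptor is backed by a pipe inode whose file type is FIFO.
is_fifo = stat.S_ISFIFO(os.fstat(r).st_mode)

os.write(w, b"hello through the pipe")
os.close(w)  # once the only write end is closed, readers see EOF after the data

data = os.read(r, 1024)  # the buffered bytes
eof = os.read(r, 1024)   # b"" signals end-of-file
os.close(r)

print(is_fifo, data, eof)
```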
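A mutex is only advisory: if a thread does not follow the protocol (that is, it modifies the shared variable directly without trying to acquire the mutex), the mutex cannot stop the modification. The mutex mechanism therefore relies on the programmer to write code that consistently uses it (the same applies to the locks below).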
The textbook name for the condition variable is the monitor [From muduo P40].
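A condition variable always pairs with a mutex and a predicate that is re-checked after every wake-up. Below is a minimal bounded-buffer sketch using Python's threading.Condition; the class name BoundedBuffer is hypothetical, and both conditions share one lock, which threading.Condition(lock) allows.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Hypothetical bounded FIFO guarded by one mutex and two condition variables."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items = deque()
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)

    def put(self, item):
        with self.not_full:  # acquires the shared lock
            # Loop, not if: a wake-up does not guarantee the predicate holds.
            while len(self.items) >= self.capacity:
                self.not_full.wait()  # releases the lock while waiting
            self.items.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()
            return item


buf = BoundedBuffer(capacity=2)
received = []


def consume(n):
    for _ in range(n):
        received.append(buf.get())


consumer = threading.Thread(target=consume, args=(10,))
consumer.start()
for i in range(10):
    buf.put(i)  # blocks whenever two items are already waiting
consumer.join()
print(received)
```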
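A read-write lock is also called a shared-exclusive lock: any number of readers can hold it at the same time, but a writer needs exclusive access. When reads dominate, this allows a higher degree of concurrency than a plain mutex.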
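Python's standard library has no read-write lock, so here is a minimal sketch built on threading.Condition. The class ReadWriteLock and its method names are hypothetical; this version prefers readers, so a continuous stream of readers can starve a writer.

```python
import threading
import time


class ReadWriteLock:
    """Hypothetical minimal readers-writer lock: many readers OR one writer."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False

    def acquire_read(self):
        with self._cond:
            while self._writer:  # readers only wait for an active writer
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # a waiting writer may proceed

    def acquire_write(self):
        with self._cond:
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()


rw = ReadWriteLock()
counter_lock = threading.Lock()
active_readers = 0
max_readers = 0


def reader():
    global active_readers, max_readers
    rw.acquire_read()
    try:
        with counter_lock:
            active_readers += 1
            max_readers = max(max_readers, active_readers)
        time.sleep(0.2)  # hold the shared lock long enough for readers to overlap
        with counter_lock:
            active_readers -= 1
    finally:
        rw.release_read()


threads = [threading.Thread(target=reader) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent readers:", max_readers)
```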
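A new process created by fork is called a child process. fork is called once but returns twice: in the child the return value is 0, while in the parent it is the process ID of the new child. The child's ID is returned to the parent because a process can have more than one child, and there is no function that lets a process obtain the IDs of all its children. fork returns 0 to the child because the child can always call getpid() to obtain its own PID, and getppid() to obtain its parent's PID. (Process ID 0 is always used by the swapper process, so a child's process ID can never be 0.)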
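Among the attributes a child process inherits from its parent:
6. The close-on-exec flag (translator's note: the close-on-exec flag can be set on a file descriptor with fcntl(); POSIX.1 requires all open directory streams to be closed when an exec function is called. For details, see Advanced Programming in the UNIX Environment, W. R. Stevens, 1993, translated by 尤晉元 et al. (hereafter APUE), sections 3.13 and 8.9.)
8. The nice value (translator's note: the nice value is set with the nice function and expresses the process's priority; a smaller value means a higher priority.)
The scheduler class (translator's note: the scheduler class is the category a process belongs to when the system schedules it. Different classes have different priorities; from the scheduler class and the nice value, the scheduler computes each process's global process priority, and higher-priority processes run first.)
9. The session ID (translator's note: the term follows APUE; it is the ID of the session the process belongs to. A session contains one or more process groups; see APUE section 9.5.)
11. The root directory (translator's note: the root directory is not necessarily "/"; it can be changed with the chroot function.)
12. The file mode creation mask (umask) (translator's note: the term follows APUE; it is the default mask applied when new files are created.)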
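Ways in which the child differs from its parent:
1. A different parent process ID (translator's note: the child's parent PID differs from the parent's own parent PID; the parent PID can be obtained with getppid.)
3. The child does not inherit the parent's process, text, data, and other memory locks (translator's note: memory locks are locked virtual memory pages, which the kernel is not allowed to page out even when it would otherwise need to; see The GNU C Library Reference Manual, version 2.2, 1999, section 3.4.2.)
5. The times in the tms structure are set to 0 (translator's note: the tms structure is filled in by the times function and holds four values recording the CPU time used by the process: user time, system time, children's total user time, and children's total system time.)
6. Resource utilizations are set to 0.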
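The close-on-exec flag from the inherited-attributes list can be inspected from Python. A minimal sketch, assuming Python 3.4 or later, where PEP 446 made newly created descriptors non-inheritable (close-on-exec set) by default; fork() still duplicates every descriptor, and the flag only takes effect at exec time. The fcntl check runs only on POSIX systems.

```python
import os

r, w = os.pipe()
inheritable_before = os.get_inheritable(r)  # False: close-on-exec is set by default

os.set_inheritable(r, True)                 # clear close-on-exec on r
inheritable_after = os.get_inheritable(r)   # True: r would survive an exec()

cloexec_on_w = None
if os.name == "posix":
    import fcntl
    # Read the descriptor flags directly, as the C fcntl(fd, F_GETFD) call would.
    cloexec_on_w = bool(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)

os.close(r)
os.close(w)
print(inheritable_before, inheritable_after, cloexec_on_w)
```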
ret = os.fork()  # both the child and the parent run the code after this line; they just live in one source file
if ret == 0:
    child_suite   # child-process code
else:
    parent_suite  # parent-process code
import os
import time

print("Before fork process pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
pid = os.fork()  # Unix only
if pid == 0:
    print("I am child process pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("I am parent process pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
# The line below is printed twice: once in the parent and once in the child.
print("After fork process pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
####### Output:
Before fork process pid=3507, ppid=2952
I am parent process pid=3507, ppid=2952
I am child process pid=3509, ppid=3507
After fork process pid=3507, ppid=2952
After fork process pid=3509, ppid=3507
import multiprocessing
import time
import os
import psutil  # third-party package: pip install psutil

TIME = 1

def test1(args1):
    print("This is test1, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test1, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1  # only changes this process's local copy
    time.sleep(TIME)

def test2(args1):
    print("This is test2, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test2, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

def test3(args1):
    print("This is test3, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test3, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

if __name__ == '__main__':
    start1 = time.process_time()  # counts only this (main) process's CPU time, not the children's
    start2 = time.time()
    print("This is the main process, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    num = 10
    tlist = []
    t1 = multiprocessing.Process(target=test1, args=(num,), name="test1")  # name sets the process name
    tlist.append(t1)
    t2 = multiprocessing.Process(target=test2, args=(num,), name="test2")
    tlist.append(t2)
    t3 = multiprocessing.Process(target=test3, args=(num,), name="test3")
    tlist.append(t3)
    for process in tlist:
        process.start()
    for process in tlist:
        process.join()
    end1 = time.process_time()
    end2 = time.time()
    print("Total time:", end2 - start2, "s, of which CPU time was", end1 - start1, "s")
##### Output:
This is the main process, my pid is 28120, my parent is 14632
This is test1, my pid is 2496, my parent is 28120
This is test1, my num is 10, my name is python.exe
This is test2, my pid is 29128, my parent is 28120
This is test2, my num is 10, my name is python.exe
This is test3, my pid is 28588, my parent is 28120
This is test3, my num is 10, my name is python.exe
Total time: 1.7347896099090576 s, of which CPU time was 0.09375 s
A list comprehension is a neater way to create many processes: procs = [Process(target=func1, args=(s,)) for i in range(10)] (use xrange in Python 2).
import multiprocessing
import os
import time
import psutil  # third-party; only needed by test1

TIME = 2

def test1(args1):
    print("This is test1, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test1, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

class MyProcess(multiprocessing.Process):
    def __init__(self, num, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.num = num

    def run(self):
        # Overriding run() replaces the default behaviour of calling target(*args).
        print("Overridden run() started")
        i = 0
        while True:
            time.sleep(1)
            i += 1
            print("child process", i)
            if i == self.num:
                break

if __name__ == '__main__':
    p = MyProcess(6, target=test1, args=(10,))  # target is ignored: only the overridden run() executes
    p.start()
    p.join(2)  # wait at most 2 s, then fall through to the polling loop
    while True:
        if p.is_alive():
            print("parent process")
            time.sleep(1)
        else:
            print("child process finished")
            break
#!/usr/bin/env python3
# encoding: utf-8
"""
@Version: ??
@Author: Administrator 周廣露
@Contact: zhouguanglu2012@163.com
@Site: http://www.baidu.com
@Software: PyCharm
@File: multiP
@Created_Time: 2019/1/25 18:35
"""
import multiprocessing
import time
import os
import psutil

TIME = 1

def fun_call():
    print("Finished one task")

def test1(args1):
    print("This is test1, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test1, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

def test2(args1):
    print("This is test2, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test2, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

def test3(args1):
    print("This is test3, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test3, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    start1 = time.process_time()
    start2 = time.time()
    print("This is the main process, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    num = 10
    # pool.apply_async(test1, (num,))
    # pool.apply_async(test2, (num,))
    # pool.apply_async(test3, (num,))   # submit everything first; the tasks then run in the background
    pool.apply(test1, (num,))
    pool.apply(test2, (num,))
    pool.apply(test3, (num,))   # apply blocks, so the tasks run one at a time
    print("----------------------")
    pool.close()
    pool.join()
    end1 = time.process_time()
    end2 = time.time()
    print("Total time:", end2 - start2, "s, of which CPU time was", end1 - start1, "s")
########## Output:
This is the main process, my pid is 25568, my parent is 14632
This is test1, my pid is 7612, my parent is 25568
This is test1, my num is 10, my name is python.exe
This is test2, my pid is 21636, my parent is 25568
This is test2, my num is 10, my name is python.exe
This is test3, my pid is 7612, my parent is 25568
This is test3, my num is 10, my name is python.exe
----------------------
Total time: 3.7470672130584717 s, of which CPU time was 0.015625 s
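Common Pool methods:
apply: signature apply(func[, args=()[, kwds={}]]). Calls func in one worker process and blocks until the result is ready.
apply_async: signature apply_async(func[, args=()[, kwds={}[, callback=None]]]). Submits the call without blocking and returns an AsyncResult; if callback is given, it is called in the parent process with the return value. (Python 3 also accepts error_callback.)
map: signature map(func, iterable[, chunksize=None]).
Pool.map behaves basically like the built-in map function and blocks the caller until all results have come back. Because the calls are spread over several worker processes, it can be faster than the built-in map for CPU-bound work.
map_async: signature map_async(func, iterable[, chunksize[, callback]]). The non-blocking version of map; it returns an AsyncResult.
join(): the main process blocks waiting for the worker processes to exit; join must be called after close or terminate.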
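The four submission methods above can be compared side by side. A minimal sketch, assuming the function square and the wrapper run_pool_demo (both hypothetical names) live at module level so worker processes can unpickle them under any start method; the with-block calls terminate() on exit, which is safe here because every result has already been collected.

```python
from multiprocessing import Pool


def square(x):
    # Module-level so that worker processes can unpickle it.
    return x * x


def run_pool_demo():
    with Pool(processes=2) as pool:
        # apply(): one call, blocks until the result is ready.
        one = pool.apply(square, (3,))
        # apply_async(): returns an AsyncResult at once; get() waits for the value.
        pending = pool.apply_async(square, (4,))
        # map(): like the built-in map, blocks until every result is back (order kept).
        many = pool.map(square, range(5))
        # map_async(): non-blocking map; the callback receives the whole result list.
        collected = []
        async_map = pool.map_async(square, [5, 6], callback=collected.append)
        async_map.wait()
        return one, pending.get(timeout=10), many, collected


if __name__ == "__main__":
    print(run_pool_demo())
```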
import multiprocessing
import time
import os
import psutil

TIME = 1

def fun_call(msg):
    # Callback: runs in the parent process and receives test1's return value.
    print("Finished one task:", msg)

def test1(args1):
    print("This is test1, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test1, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)
    return "This is test1's return value"

def test2(args1):
    print("This is test2, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test2, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

def test3(args1):
    print("This is test3, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    print("This is test3, my num is %d, my name is %s" % (args1, psutil.Process(os.getpid()).name()))
    args1 -= 1
    time.sleep(TIME)

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    start1 = time.process_time()
    start2 = time.time()
    print("This is the main process, my pid is %d, my parent is %d" % (os.getpid(), os.getppid()))
    num = 10
    pool.apply_async(test1, (num,), callback=fun_call)
    pool.apply_async(test2, (num,))
    pool.apply_async(test3, (num,))   # submit everything first; the tasks then run in the background
    # pool.apply(test1, (num,))
    # pool.apply(test2, (num,))
    # pool.apply(test3, (num,))   # one at a time
    print("----------------------")
    pool.close()
    pool.join()
    end1 = time.process_time()
    end2 = time.time()
    print("Total time:", end2 - start2, "s, of which CPU time was", end1 - start1, "s")
###### Output:
This is the main process, my pid is 5532, my parent is 14632
----------------------
This is test1, my pid is 33372, my parent is 5532
This is test1, my num is 10, my name is python.exe
This is test2, my pid is 14904, my parent is 5532
This is test2, my num is 10, my name is python.exe
Finished one task: This is test1's return value
This is test3, my pid is 33372, my parent is 5532
This is test3, my num is 10, my name is python.exe
Total time: 2.742694854736328 s, of which CPU time was 0.015625 s
from multiprocessing import Pipe, Pool, Manager, Process

def consumer(q, i):
    print("consumer entered")
    num = 1
    while True:
        print("Consumer %d is eating its bun #%d" % (i, num))
        print(q[0].recv())   # blocks until some producer sends a bun
        # time.sleep(2)
        num = num + 1

def producer(q, i):
    print("producer entered")
    num = 1
    while True:
        baoziname = "chef %d's bun #%d" % (i, num)
        print("Producer %d is making its bun #%d" % (i, num))
        # time.sleep(1)
        q[1].send(baoziname)
        num += 1

if __name__ == '__main__':
    pipe = Pipe()   # Pipe(duplex=True) returns (conn1, conn2); its argument is a flag, not a buffer size
    print(pipe)
    C_SIZE = 5
    P_SIZE = 3
    pool_c = Pool(C_SIZE)
    pool_p = Pool(P_SIZE)
    print("------------------------- start --------------")
    for i in range(P_SIZE):
        pool_p.apply_async(producer, (pipe, i))
    for i in range(C_SIZE):
        pool_c.apply_async(consumer, (pipe, i))
    print("------------------------- done --------------")
    pool_c.close()
    pool_p.close()
    pool_c.join()   # never returns: the workers loop forever
    pool_p.join()
    # C_SIZE = 5
    # P_SIZE = 3
    #
    # print("------------------------- start --------------")
    # clist = []
    # plist = []
    # for i in range(C_SIZE):
    #     t = Process(target=producer, args=(pipe, i))
    #     t.start()
    #     plist.append(t)
    # for i in range(P_SIZE):
    #     t = Process(target=consumer, args=(pipe, i))
    #     t.start()
    #     clist.append(t)
    #
    # for i in clist:
    #     i.join()
    #
    # for i in plist:
    #     i.join()
    # print("------------------------- done --------------")
# named pipe Client
# encoding: utf-8
import os
import time

write_path = "/tmp/server_in.pipe"
read_path = "/tmp/server_out.pipe"

counter = 1
f = os.open(write_path, os.O_SYNC | os.O_CREAT | os.O_RDWR)
print("Client open f", f)
rf = None

while True:
    # The client sends a request
    req = "%s " % counter
    len_send = os.write(f, req.encode())   # os.write needs bytes in Python 3
    print("request", req, len_send)
    counter += 1

    if rf is None:
        # Key point 1: read_path is opened here for the first time, and this open() blocks.
        # The timing matters: opening read_path at program start, before any request
        # has been sent, would block.
        rf = os.open(read_path, os.O_RDONLY)
        print("client opened rf", rf)

    # Receive the server's reply
    s = os.read(rf, 1024)
    if len(s) == 0:
        # Usually this means the pipe was closed unexpectedly, e.g. the server exited
        break
    print("received", s)
    # There is no sleep in this example: the client sends as fast as it can, so you can watch the behaviour

os.close(f)
# named pipe Server
# encoding: utf-8
import os, time

read_path = "/tmp/server_in.pipe"
write_path = "/tmp/server_out.pipe"

try:
    # Create the named pipes
    os.mkfifo(write_path)
    os.mkfifo(read_path)
except OSError as e:
    # If the named pipes already exist, that's fine
    print("mkfifo error:", e)

# The files opened for reading and writing are exactly the reverse of the client's
rf = os.open(read_path, os.O_RDONLY)
f = os.open(write_path, os.O_SYNC | os.O_CREAT | os.O_RDWR)

while True:
    # Receive a request
    s = os.read(rf, 2)
    if len(s) == 0:
        # No bytes received: usually the only writer has closed its end.
        # Sleeping and retrying here does not affect later messages and loses nothing.
        time.sleep(1)
        continue
    # If the received bytes contain a "z", print them (for debugging and testing)
    if b"z" in s:
        print("received", s)
    # Prefix the request with the letter "s" and send it back
    os.write(f, b"s" + s)

os.close(f)
os.close(rf)
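import os
import time
from multiprocessing import Process, Lock

# Anonymous pipe from os.pipe(): the children inherit the raw file descriptors
# through fork, so this version works only on Unix-like systems.
def consumer(fd, i, l):
    r1 = os.fdopen(fd, "r")
    num = 1
    while True:
        line = r1.readline()   # blocks until the producer writes a full line
        if not line:           # EOF: every write end has been closed
            break
        print("Consumer %d is eating its bun #%d: %s" % (i, num, line.strip()))
        # r1 = open("1.txt", "r")
        num = num + 1

def producer(fd, i, l):
    num = 1
    w1 = os.fdopen(fd, "w")
    while True:
        baoziname = "chef %d's bun #%d" % (i, num)
        print("Producer %d is making its bun #%d" % (i, num))
        w1.write(baoziname + "\n")
        w1.flush()             # push the line into the pipe immediately
        num += 1
        time.sleep(1)          # slow down so the output stays readable

if __name__ == '__main__':
    r, w = os.pipe()
    l = Lock()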
# C_SIZE = 5
# P_SIZE = 3
# pool_c = Pool(C_SIZE)
# pool_p = Pool(P_SIZE)
# for i in range(P_SIZE):
# pool_p.apply_async(producer, (w1,i))
# for i in range(C_SIZE):
# pool_c.apply_async(consumer,(r1,i))
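    t = Process(target=producer, args=(w, 0, l))   # the producer needs the write end
    c = Process(target=consumer, args=(r, 1, l))   # the consumer needs the read end
    t.start()
    c.start()
    t.join()
    c.join()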
# pool_c.close()
# pool_p.close()
# pool_c.join()
# pool_p.join()
import time
from multiprocessing import Pool, Process, Queue, Manager

def consumer(q, i):
    print("consumer entered")
    num = 1
    while True:
        print("Consumer %d is eating its bun #%d" % (i, num))
        print(q.get())   # blocks while the queue is empty
        # time.sleep(2)
        num = num + 1

def producer(q, i):
    print("producer entered")
    num = 1
    while True:
        baoziname = "chef %d's bun #%d" % (i, num)
        print("Producer %d is making its bun #%d" % (i, num))
        # time.sleep(1)
        q.put(baoziname)   # blocks while the queue is full
        num += 1

if __name__ == '__main__':
    C_SIZE = 5
    P_SIZE = 3
    pool_c = Pool(C_SIZE)
    pool_p = Pool(P_SIZE)
    manager = Manager()
    q = manager.Queue(5)   # holds at most 5 buns; unlike multiprocessing.Queue, a Manager queue can be passed to Pool workers
    print("------------------------- start --------------")
    for i in range(P_SIZE):
        pool_p.apply_async(producer, (q, i))
    for i in range(C_SIZE):
        pool_c.apply_async(consumer, (q, i))
    print("------------------------- done --------------")
    pool_c.close()
    pool_p.close()
    pool_c.join()
    pool_p.join()
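queue module API (Python 3; the module was called Queue in Python 2):
queue.Queue(maxsize=0): FIFO queue; if maxsize is less than 1, the queue length is unlimited
queue.LifoQueue(maxsize=0): LIFO queue; if maxsize is less than 1, the queue length is unlimited
q.qsize(): return the (approximate) number of items in the queue
q.empty(): return True if the queue is empty, otherwise False
q.full(): return True if the queue is full, otherwise False; "full" is measured against maxsize
q.get([block[, timeout]]): remove and return an item; timeout is how long to wait
q.get_nowait(): equivalent to q.get(False), non-blocking
q.put(item[, block[, timeout]]): put an item into the queue; timeout is how long to wait
q.put_nowait(item): equivalent to q.put(item, False)
q.task_done(): after finishing the work for an item it fetched, a consumer calls task_done() to tell the queue that this task is complete
q.join(): block until every item that was put has been fetched and marked done with task_done(), then continue
q.maxsize: the maximum number of items the queue can hold; it is a plain attribute and can be changed directly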
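The less obvious part of this API is the task_done()/join() handshake. A minimal sketch with one worker thread, assuming a None sentinel (a convention chosen here, not part of the queue API) to tell the worker to stop:

```python
import queue
import threading

q = queue.Queue(maxsize=3)  # put() blocks once 3 items are waiting
processed = []


def worker():
    while True:
        item = q.get()       # blocks until an item is available
        if item is None:     # sentinel: stop the worker
            q.task_done()
            break
        processed.append(item * 10)
        q.task_done()        # this item is fully handled


t = threading.Thread(target=worker)
t.start()
for i in range(5):
    q.put(i)
q.put(None)
q.join()  # returns once task_done() was called for every item put, sentinel included
t.join()

try:
    q.get_nowait()
    got_item = True
except queue.Empty:  # raised immediately instead of blocking
    got_item = False

print(processed, got_item)
```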
#!/usr/bin/env python3
# encoding: utf-8
"""
@Version: ??
@Author: Administrator 周廣露
@Contact: zhouguanglu2012@163.com
@Site: http://www.baidu.com
@Software: PyCharm
@File: 進程管道通訊
@Created_Time: 2019/1/26 16:58
"""
import mmap
import time
from multiprocessing import Pool, Process

# mmap_file = None
BUFFSIZE = 1024
FILE = "1.txt"
# Anonymous shared memory identified by tagname. tagname exists only on Windows: every
# process that maps the same tagname shares the block, and under spawn each worker
# re-imports this module and opens the same named mapping.
mmap_file = mmap.mmap(-1, BUFFSIZE, access=mmap.ACCESS_WRITE, tagname='share_mmap')

def consumer(i):
    time.sleep(5)
    print("consumer entered")
    num = 1
    # global mmap_file
    # mmap_file = mmap.mmap(-1, BUFFSIZE, access=mmap.ACCESS_READ, tagname='share_mmap')
    while True:
        tell = mmap_file.tell()
        s = mmap_file.readline()
        print("Entered the consumer's inner loop:")
        if s.endswith(b"\n"):
            print("Consumer %d is eating its bun #%d" % (i, num))
            print("-----", s.decode("utf8").rstrip("\n"))
            num = num + 1
            time.sleep(2)
        else:
            # No complete line yet (only zero padding or a half-written line):
            # rewind to where this read started and wait for the producer.
            mmap_file.seek(tell)
            time.sleep(5)

def producer(i):
    print("producer entered")
    num = 1
    # global mmap_file
    # mmap_file = mmap.mmap(-1, BUFFSIZE, access=mmap.ACCESS_WRITE, tagname='share_mmap')
    while True:
        if num < 25:
            baoziname = "chef %d's bun #%d" % (i, num)
            print("Producer %d is making its bun #%d" % (i, num))
            line = (baoziname + "\n").encode("utf8")
            print("length:", len(line))
            time.sleep(1)
            mmap_file.write(line)
            mmap_file.flush()
            num += 1
        else:
            time.sleep(1)   # finished producing; avoid a busy loop

if __name__ == '__main__':
    C_SIZE = 1
    P_SIZE = 1
    pool_c = Pool(C_SIZE)
    pool_p = Pool(P_SIZE)
    print("------------------------- start --------------")
    for i in range(P_SIZE):
        pool_p.apply_async(producer, (i,))
    for i in range(C_SIZE):
        pool_c.apply_async(consumer, (i,))
    print("------------------------- done --------------")
    pool_c.close()
    pool_p.close()
    pool_c.join()
    pool_p.join()
    # C_SIZE = 5
    # P_SIZE = 3
    # print("------------------------- start --------------")
    # clist = []
    # plist = []
    # for i in range(C_SIZE):
    #     t = Process(target=producer, args=(nm, i))
    #     t.start()
    #     plist.append(t)
    # for i in range(P_SIZE):
    #     t = Process(target=consumer, args=(nm, i))
    #     t.start()
    #     clist.append(t)
    #
    # for i in clist:
    #     i.join()
    #
    # for i in plist:
    #     i.join()
    # print("------------------------- done --------------")
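---- Creating an mmap object
mmap(fileno, length, tagname=None, access=ACCESS_DEFAULT[, offset])   # Windows
mmap(fileno, length, flags=MAP_SHARED, prot=PROT_WRITE|PROT_READ, access=ACCESS_DEFAULT[, offset])   # Unix
# The first argument is a file; on Unix, first write out a file of the required size and save it.
Creates and returns an mmap object. The fileno argument is usually obtained with f.fileno(), which was covered in the Python file series. The mmap object maps the first length bytes of that file descriptor into memory.
On Windows, the tagname parameter gives a memory mapping a name, so that one file can have several mmaps at the same time. Windows mappings created this way are readable and writable by default and are shared between processes that use the same tagname; in that case fileno can simply be -1 (an anonymous mapping), and the second argument is the size in bytes.
length: the size, in bytes, of the part of the file to map. If it is 0, the whole file is mapped. On Windows, a length larger than the current file size extends the file; on Unix, the file must already be at least that large (otherwise a ValueError is raised).
flags (Unix only): mmap.MAP_PRIVATE: the mapping is private to this process; mmap.MAP_SHARED (the default): the mapping is shared with other processes, and every process that maps the same file sees the changes made by any of them.
prot (Unix only): mmap.PROT_READ, mmap.PROT_WRITE, or mmap.PROT_WRITE | mmap.PROT_READ; the last one means readable and writable.
access (optional): ACCESS_READ: read access. ACCESS_WRITE: write-through access; changes affect memory and the underlying file. ACCESS_COPY: copy-on-write access; changes affect memory only and are never written to the file, not even by flush. ACCESS_DEFAULT (the default since Python 3.7): on Unix, use the access implied by prot.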
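m.close(): close the mmap; any further operation on m raises an exception
m.find(sub[, start[, end]]): search m from left to right, starting at index start, and return the lowest index where sub is found, or -1
m.flush([offset, size]): flush size bytes of m, starting at offset, to the underlying file; offset and size must be given together or not at all
m.move(dest, src, count): equivalent to m[dest:dest+count] = m[src:src+count]; copies count bytes starting at src to the position starting at dest, and the two ranges may overlap
m.read(n): return at most n bytes starting at the current position, and move the position forward
m.read_byte(): return the byte at the current position (an int in Python 3) and advance by 1; raises ValueError if the position is already at the end
m.readline(): return the bytes from the current position up to and including the next '\n'; returns empty bytes if the position is already at the end
m.resize(n): change the length of m to n (this also resizes the underlying file, if any)
m.seek(pos[, whence]): like a file object's seek, change the current position in m
m.size(): return the length of the underlying file (not the length of the mapping, len(m))
m.tell(): return the current position in m
m.write(bytes): write bytes at the current position; if the space from the current position to the end of m is less than len(bytes), ValueError is raised
m.write_byte(byte): write one byte (an int in Python 3) at the current position and advance by 1; like write(), it raises ValueError when the position is already at the end of m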
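Several of these methods can be exercised on a small file-backed mapping. A minimal sketch that works on both Windows and Unix because it maps an existing file with length 0 (the whole file); the file name demo.bin is arbitrary and lives in a temporary directory.

```python
import mmap
import os
import tempfile

tmpdir = tempfile.TemporaryDirectory()
path = os.path.join(tmpdir.name, "demo.bin")

# Create the file first: on Unix the mapping cannot be longer than the file.
with open(path, "wb") as f:
    f.write(b"hello mmap world\n")

with open(path, "r+b") as f:
    with mmap.mmap(f.fileno(), 0) as m:   # length 0 maps the whole file
        pos = m.find(b"mmap")             # lowest index of the match, or -1
        m[pos:pos + 4] = b"MMAP"          # slice assignment must keep the same length
        m.seek(0)
        first_line = m.readline()         # bytes up to and including b"\n"
        m.flush()                         # write the changed pages back to the file

with open(path, "rb") as f:
    on_disk = f.read()
tmpdir.cleanup()

print(pos, first_line, on_disk)
```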
Drawback: data safety is at risk, since the contents of shared memory can be overwritten or tampered with by other processes.
from multiprocessing import Value, Array
class ctypes.c_byte: Represents the C signed char datatype, and interprets the value as small integer. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_char: Represents the C char datatype, and interprets the value as a single character. The constructor accepts an optional string initializer, the length of the string must be exactly one character.
class ctypes.c_char_p: Represents the C char * datatype when it points to a zero-terminated string. For a general character pointer that may also point to binary data, POINTER(c_char) must be used. The constructor accepts an integer address, or a bytes object.
class ctypes.c_double: Represents the C double datatype. The constructor accepts an optional float initializer.
class ctypes.c_longdouble: Represents the C long double datatype. The constructor accepts an optional float initializer. On platforms where sizeof(long double) == sizeof(double) it is an alias to c_double.
class ctypes.c_float: Represents the C float datatype. The constructor accepts an optional float initializer.
class ctypes.c_int: Represents the C signed int datatype. The constructor accepts an optional integer initializer; no overflow checking is done. On platforms where sizeof(int) == sizeof(long) it is an alias to c_long.
class ctypes.c_int8: Represents the C 8-bit signed int datatype. Usually an alias for c_byte.
class ctypes.c_int16: Represents the C 16-bit signed int datatype. Usually an alias for c_short.
class ctypes.c_int32: Represents the C 32-bit signed int datatype. Usually an alias for c_int.
class ctypes.c_int64: Represents the C 64-bit signed int datatype. Usually an alias for c_longlong.
class ctypes.c_long: Represents the C signed long datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_longlong: Represents the C signed long long datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_short: Represents the C signed short datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_size_t: Represents the C size_t datatype.
class ctypes.c_ssize_t: Represents the C ssize_t datatype. New in version 3.2.
class ctypes.c_ubyte: Represents the C unsigned char datatype, it interprets the value as small integer. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_uint: Represents the C unsigned int datatype. The constructor accepts an optional integer initializer; no overflow checking is done. On platforms where sizeof(int) == sizeof(long) it is an alias for c_ulong.
class ctypes.c_uint8: Represents the C 8-bit unsigned int datatype. Usually an alias for c_ubyte.
class ctypes.c_uint16: Represents the C 16-bit unsigned int datatype. Usually an alias for c_ushort.
class ctypes.c_uint32: Represents the C 32-bit unsigned int datatype. Usually an alias for c_uint.
class ctypes.c_uint64: Represents the C 64-bit unsigned int datatype. Usually an alias for c_ulonglong.
class ctypes.c_ulong: Represents the C unsigned long datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_ulonglong: Represents the C unsigned long long datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_ushort: Represents the C unsigned short datatype. The constructor accepts an optional integer initializer; no overflow checking is done.
class ctypes.c_void_p: Represents the C void * type. The value is represented as integer. The constructor accepts an optional integer initializer.
class ctypes.c_wchar: Represents the C wchar_t datatype, and interprets the value as a single character unicode string. The constructor accepts an optional string initializer, the length of the string must be exactly one character.
class ctypes.c_wchar_p: Represents the C wchar_t * datatype, which must be a pointer to a zero-terminated wide character string. The constructor accepts an integer address, or a string.
class ctypes.c_bool: Represent the C bool datatype (more accurately, _Bool from C99). Its value can be True or False, and the constructor accepts any object that has a truth value.
class ctypes.HRESULT: Windows only: Represents a HRESULT value, which contains success or error information for a function or method call.
class ctypes.py_object: Represents the C PyObject * datatype. Calling this without an argument creates a NULL PyObject * pointer.
The ctypes.wintypes module provides quite some other Windows specific data types, for example HWND, WPARAM, or DWORD. Some useful structures like MSG or RECT are also defined.

Structured data types
class ctypes.Union(*args, **kw): Abstract base class for unions in native byte order.
class ctypes.BigEndianStructure(*args, **kw): Abstract base class for structures in big endian byte order.
class ctypes.LittleEndianStructure(*args, **kw): Abstract base class for structures in little endian byte order.
Array(typecode_or_type, size_or_initializer, *, lock=True)
Usage is basically the same as Value; it returns a synchronized shared array.
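Value and Array both come with a lock that guards them, reachable through get_lock(). A minimal sketch, assuming the helper names add_one and run_shared_demo (hypothetical) and the typecodes 'i' (C int) and 'd' (C double); holding the lock matters because counter.value += 1 is a read-modify-write that can lose updates when several processes run it at once.

```python
from multiprocessing import Array, Process, Value


def add_one(counter, arr, times):
    for _ in range(times):
        # counter.value += 1 reads, adds, and writes back; the lock keeps it atomic.
        with counter.get_lock():
            counter.value += 1
    with arr.get_lock():
        for k in range(len(arr)):
            arr[k] += 1


def run_shared_demo(workers=4, times=1000):
    counter = Value('i', 0)              # shared C int
    arr = Array('d', [0.0, 0.0, 0.0])    # shared array of C doubles
    procs = [Process(target=add_one, args=(counter, arr, times)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value, arr[:]


if __name__ == "__main__":
    print(run_shared_demo())   # (4000, [4.0, 4.0, 4.0])
```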
from multiprocessing.sharedctypes import Array
from multiprocessing import Process
from time import sleep

def read(data):
    sleep(0.5)
    while True:
        print(data[:])   # read the shared array once per second
        sleep(1)

def write(data):
    while True:
        with data.get_lock():   # a synchronized Array carries its own lock
            for k in range(len(data)):
                data[k] += 1
        sleep(1)

if __name__ == '__main__':
    data = Array('i', [1, 2, 3])   # shared memory holding three C ints ('i' = signed int)
    Process(target=write, args=(data,)).start()
    Process(target=read, args=(data,)).start()
    while True:
        sleep(1)
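from multiprocessing import Process, Manager

def test(data):
    print(id(data))   # the child has its own proxy object, so the id differs
    data.append(2)    # the call is forwarded to the list held by the manager process

if __name__ == '__main__':
    m = Manager()
    data = m.list()   # or m.dict({"1": "2"})
    data.append(1)
    print(id(data))
    p = Process(target=test, args=(data,))
    p.start()
    p.join()
    print(data, id(data))   # [1, 2] plus the parent's proxy id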
from multiprocessing import Process, Value, Lock
from multiprocessing.managers import BaseManager

class Employee(object):
    def __init__(self, name, salary):
        self.name = name
        self.salary = Value('i', salary)

    def increase(self):
        self.salary.value += 100

    def getPay(self):
        return self.name + ':' + str(self.salary.value)

class MyManager(BaseManager):
    pass

def Manager2():
    m = MyManager()
    m.start()   # start the manager's server process
    return m

# Register Employee so the manager can create instances and hand out proxies to them
MyManager.register('Employee', Employee)

def func1(em, lock):
    with lock:
        em.increase()   # executed inside the manager process through the proxy

if __name__ == '__main__':
    manager = Manager2()
    em = manager.Employee('zhangsan', 1000)
    lock = Lock()
    proces = [Process(target=func1, args=(em, lock)) for i in range(10)]
    for p in proces:
        p.start()
    for p in proces:
        p.join()
    print(em.getPay())   # zhangsan:2000
import time
from multiprocessing import Process
from random import randint
from multiprocessing.managers import BaseManager
from queue import Queue, LifoQueue, PriorityQueue

class MyManager(BaseManager):
    pass

MyManager.register("Queue", Queue)
MyManager.register("LifoQueue", LifoQueue)
MyManager.register("PriorityQueue", PriorityQueue)

def producer(q):
    count = 0
    while 1:
        if count > 5:
            break
        pri = randint(0, 100)
        print(f'put :{pri}')
        q.put((pri, 222, pri))   # (priority, func, args)
        count += 1

if __name__ == '__main__':
    m = MyManager()
    m.start()
    # q is a proxy: the manager builds a PriorityQueue that several processes can share,
    # whereas a plain queue.PriorityQueue can only be shared between threads
    q = m.PriorityQueue()
    t = Process(target=producer, args=(q,))
    t.start()
    time.sleep(1)
    t.join()
    while not q.empty():
        print(q.get())   # items come out in ascending priority order
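Semaphore implements the semaphore mechanism. When a shared resource has several instances, a Semaphore can be used to synchronize processes. Its usage is much like Lock: s = Semaphore(N); each s.acquire() decreases the number of available resources by 1, and blocks when that number is already 0; each s.release() frees an occupied resource and increases the available count by 1.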
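The counting behaviour can be checked by recording how many workers hold the semaphore at once. This sketch uses threading.Semaphore so it runs in a single process; multiprocessing.Semaphore offers the same acquire()/release() interface and context-manager support, so the pattern carries over to processes. The names MAX_SLOTS and use_resource are hypothetical.

```python
import threading
import time

MAX_SLOTS = 2
slots = threading.Semaphore(MAX_SLOTS)  # internal counter starts at 2
state_lock = threading.Lock()
in_use = 0
peak = 0


def use_resource():
    global in_use, peak
    with slots:  # acquire(): counter -1, blocks while it is 0
        with state_lock:
            in_use += 1
            peak = max(peak, in_use)
        time.sleep(0.05)  # pretend to use one of the shared resources
        with state_lock:
            in_use -= 1
    # leaving the with-block calls release(): counter +1, wakes one waiter


threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrent users:", peak)  # never more than MAX_SLOTS
```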
import time
import multiprocessing
from multiprocessing import Semaphore, Process

def test1(i):
    while True:
        i.acquire()
        # print("test1 got a resource", multiprocessing.current_process(), i.__dict__)
        time.sleep(3)
        i.release()

def test2(i):
    while True:
        i.acquire()
        print("test2 got a resource")
        time.sleep(3)
        i.release()

if __name__ == '__main__':
    s = Semaphore(5)   # up to 5 holders at the same time
    t = Process(target=test1, args=(s,))
    t.start()
    t = Process(target=test2, args=(s,))
    t.start()
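Signal: a way for processes to communicate. As soon as a process receives a signal, its normal flow of execution is interrupted so that it can handle the signal.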
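SIGINT     default action: terminate the process    interrupt the process (Ctrl+C)
SIGTERM    default action: terminate the process    software termination signal
SIGKILL    default action: terminate the process    kill the process (cannot be caught or ignored)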
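# encoding=utf-8
import os
import signal
from time import sleep

# Try it from another shell: kill -TERM <pid> or kill -USR1 <pid>.
# Because SIGTERM is now handled, it no longer terminates the process.
def my_term(signum, frame):
    print("Received SIGTERM")
signal.signal(signal.SIGTERM, my_term)

def my_usr1(signum, frame):
    print("Received SIGUSR1")
signal.signal(signal.SIGUSR1, my_usr1)

while 1:
    print("My process id is", os.getpid())
    sleep(1)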
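signal.SIGHUP    # hangup (the connection was closed)
signal.SIGILL    # illegal instruction
signal.SIGINT    # interrupt the process (Ctrl+C)
signal.SIGTSTP   # stop the process (Ctrl+Z)
signal.SIGKILL   # kill the process (this signal cannot be caught or ignored)
signal.SIGQUIT   # quit from the terminal
signal.SIGTERM   # termination signal, the software termination signal
signal.SIGALRM   # alarm-clock signal, raised by signal.alarm()
signal.SIGCONT   # continue a stopped process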
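signal.signal(signalnum, handler): set the function that handles a signal
signal.alarm(time): set a timer that sends SIGALRM after time seconds
os.kill: not part of the signal module, but it can be used to send a signal to a given process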
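signal.signal and os.kill can be combined in one process to see a handler fire. A minimal sketch, assuming a Unix-like system (SIGUSR1 does not exist on Windows) and that it runs in the main thread, which is where Python requires handlers to be installed; the handler name on_usr1 is hypothetical.

```python
import os
import signal
import time

received = []


def on_usr1(signum, frame):
    # Python-level handlers run in the main thread, between bytecode instructions.
    received.append(signal.Signals(signum).name)


old = signal.signal(signal.SIGUSR1, on_usr1)  # install, keeping the previous handler
os.kill(os.getpid(), signal.SIGUSR1)          # send the signal to ourselves

deadline = time.monotonic() + 1.0
while not received and time.monotonic() < deadline:
    time.sleep(0.01)  # give the interpreter a chance to run the handler

signal.signal(signal.SIGUSR1, old)            # restore the previous handler
print(received)
```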
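IPC between child processes or sibling processes: Queue, Manager, anonymous pipes.
Queue with a Pool: a multiprocessing.Queue cannot be passed to Pool workers as an argument, but a queue created by a Manager can.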
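A finite version of the Manager-queue approach, as a minimal sketch: the function names produce and run_manager_queue_demo are hypothetical. Passing a plain multiprocessing.Queue in the same apply_async call would typically surface as a RuntimeError when get() is called on the result, because such queues may only be shared through inheritance; the Manager's queue proxy is picklable and therefore works.

```python
from multiprocessing import Manager, Pool


def produce(q, worker_id, count):
    for n in range(count):
        q.put((worker_id, n))  # forwarded to the queue living in the manager process
    return worker_id


def run_manager_queue_demo():
    with Manager() as manager:
        q = manager.Queue()  # a proxy, so it can travel to Pool workers
        with Pool(2) as pool:
            results = [pool.apply_async(produce, (q, i, 3)) for i in range(2)]
            finished = sorted(r.get(timeout=10) for r in results)
        items = []
        while not q.empty():
            items.append(q.get())
        return finished, sorted(items)


if __name__ == "__main__":
    print(run_manager_queue_demo())
```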
import threading
import time

def test1():
    while True:
        print("This is test1", threading.current_thread())
        time.sleep(5)

def test2():
    while True:
        print("This is test2", threading.current_thread())
        time.sleep(5)

names = ["test1", "test2"]   # avoid shadowing the built-in list
print(eval("test1"))         # eval turns the name string into the function object
# tlist = [threading.Thread(target=globals().get(x), name=x + "'s name") for x in names]   # alternative without eval
tlist = [threading.Thread(target=eval(x), name=x + "'s name") for x in names]   # create several threads in one go
# threading.Thread(target=test1, args=(), name="test1's name")
# threading.Thread(target=test2, args=(), name="test2's name")
for thread in tlist:
    thread.start()
for thread in tlist:
    thread.join()
print("=-------------------end--------")
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        print("run() is the method a custom thread class has to provide")
        # _target, _args and _kwargs are CPython's private Thread attributes;
        # this body mirrors threading.Thread.run().
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

def test1():
    while True:
        print("This is test1", threading.current_thread())
        time.sleep(5)

def test2():
    while True:
        print("This is test2", threading.current_thread())
        time.sleep(5)

names = ["test1", "test2"]
t = [MyThread(target=eval(i)) for i in names]
for i in t:
    i.start()
for i in t:
    i.join()
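t.setDaemon(True)   ### make thread t a daemon thread: when the main thread finishes, the program does not wait for it
t.join(timeout)     ### the caller blocks until the thread finishes or the timeout expires; this also applies when the thread is a daemon
##### Queries
threading.current_thread()   threading.currentThread()   # the current Thread object
threading.active_count()   threading.activeCount()   # number of live threads
threading.current_thread().name   threading.current_thread().getName()   t[0].getName()   t[0].name   # thread names
threading.current_thread().daemon   # whether it is a daemon thread
threading.current_thread().ident   # thread identifier
threading.main_thread()   # the main thread
threading.enumerate()   ### all live threads in the current process
#### Checks
threading.current_thread().is_alive()   threading.main_thread().is_alive()   ## whether the thread is alive (the old isAlive() was removed in Python 3.9)
### Setters
threading.currentThread().setName("Main Thread zzzz")   # set the name
threading.currentThread().setDaemon(True)   #### make it a daemon thread; this must be done before start()
The camelCase aliases (currentThread, activeCount, getName, setName, setDaemon) are deprecated since Python 3.10; prefer current_thread(), active_count(), .name, and .daemon.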
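threading.settrace(func)   ### set a trace function for all threads started from the threading module; func is passed to sys.settrace() in each thread before its run() is called
threading.setprofile(func)   ### set a profile function for all threads started from the threading module; func is passed to sys.setprofile() in each thread before its run() is called
threading.Timer(interval, func)   ### call func after interval seconds (call start() on the Timer; cancel() stops it while it is still waiting)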
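The non-deprecated spellings of the calls above fit in a few lines. A minimal sketch; the thread name "worker-1" and the function work are arbitrary choices for illustration.

```python
import threading
import time


def work(results):
    results.append(threading.current_thread().name)
    time.sleep(0.1)


results = []
# name= and daemon= can be passed to the constructor; daemon must be set before start().
t = threading.Thread(target=work, args=(results,), name="worker-1", daemon=True)
t.start()
alive_while_running = t.is_alive()  # is_alive() replaces the removed isAlive()
t.join(timeout=5)                   # waits at most 5 seconds
print(results, alive_while_running, t.is_alive(), t.daemon)
print(threading.main_thread().name, threading.active_count())
```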
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        print("run() is the method a custom thread class has to provide")
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

def test1():
    while True:
        print("This is test1", threading.current_thread())
        time.sleep(5)

def test2():
    while True:
        print("This is test2", threading.current_thread())
        time.sleep(5)

names = ["test1", "test2"]
t = [MyThread(target=eval(i)) for i in names]
t[0].setDaemon(True)   # deprecated since 3.10; t[0].daemon = True is the modern spelling
t[1].setDaemon(True)
for i in t:
    i.start()
print(threading.current_thread(), threading.currentThread().name, threading.active_count(), threading.activeCount())
##### <_MainThread(MainThread, started 27672)> MainThread 3 3
print(threading.current_thread().name, threading.current_thread().getName(), t[0].getName(), t[0].name, threading.current_thread().daemon)
##### MainThread MainThread Thread-1 Thread-1 False
print(threading.current_thread().ident, threading.main_thread())
#### 14172 <_MainThread(MainThread, started 14172)>
print(threading.current_thread().is_alive(), threading.main_thread().is_alive(), threading.enumerate())   # isAlive() was removed in Python 3.9
###### True True [<_MainThread(MainThread, started 2276)>, <MyThread(Thread-1, started daemon 13176)>, <MyThread(Thread-2, started daemon 14848)>]
threading.currentThread().setName("Main Thread zzzz")
# threading.currentThread().setDaemon(True)   #### only allowed before start()
print(threading.currentThread().getName())
####### Main Thread zzzz
for i in t:
    i.join(2)
import time
from threading import Timer

def test1(a):
    a = a + 1
    Timer(3, test1, (a,)).start()   # schedule the next call 3 seconds from now
    print(f"This is call #{a} of test1", time.time())

if __name__ == '__main__':
    test1(0)
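In CPython, as long as one thread in a process holds the global interpreter lock, every other thread in that process must wait until the lock is released before it can execute Python bytecode. So even when the threads do not interact with each other at all, only one thread per process runs Python code at any moment, no matter how many CPUs the machine has. This mechanism is called the global interpreter lock (GIL). (The lock is released during blocking I/O and calls such as time.sleep, so threads still help with I/O-bound work.)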
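The practical effect can be observed by timing the same pure-Python CPU-bound job in a thread pool and in a process pool. A rough sketch, assuming a standard CPython build with the GIL and a multi-core machine, where the process version usually finishes noticeably faster; exact timings depend on the machine, and optional free-threaded builds (Python 3.13+) may behave differently. The names busy and timed are hypothetical.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def busy(n):
    # Pure-Python arithmetic: the thread holds the GIL for the whole loop.
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed(executor_cls, jobs, n):
    start = time.perf_counter()
    with executor_cls(max_workers=jobs) as ex:
        results = list(ex.map(busy, [n] * jobs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    n, jobs = 2_000_000, 4
    thread_results, t_threads = timed(ThreadPoolExecutor, jobs, n)
    proc_results, t_procs = timed(ProcessPoolExecutor, jobs, n)
    assert thread_results == proc_results
    print(f"threads: {t_threads:.2f}s  processes: {t_procs:.2f}s")
```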
import threading

# No lock: the two threads' print calls can interleave in the output.
def test1():
    while True:
        print("This is test1, thread name:", threading.current_thread().name)

def test2():
    while True:
        print("This is test2, thread name:", threading.current_thread().name)

funclist = [test1, test2]
tlist = [threading.Thread(target=x) for x in funclist]
for thread in tlist:
    thread.start()
import threading

# With the lock, only one thread at a time can be inside its print call.
def test1():
    while True:
        lock.acquire()
        print("This is test1, thread name:", threading.current_thread().name)
        lock.release()

def test2():
    while True:
        lock.acquire()
        print("This is test2, thread name:", threading.current_thread().name)
        lock.release()

funclist = [test1, test2]
tlist = [threading.Thread(target=x) for x in funclist]
lock = threading.Lock()   # created before the threads start, so both see it
for thread in tlist:
    thread.start()
import multiprocessing

# Processes do not share globals, so the lock is passed to each process explicitly.
def test1(lock):
    while True:
        lock.acquire()
        print("This is test1, process name:", multiprocessing.current_process().name)
        lock.release()

def test2(lock):
    while True:
        lock.acquire()
        print("This is test2, process name:", multiprocessing.current_process().name)
        lock.release()

if __name__ == '__main__':
    funclist = [test1, test2]
    lock = multiprocessing.Lock()
    tlist = [multiprocessing.Process(target=x, args=(lock,)) for x in funclist]
    for process in tlist:
        process.start()
import threading
import time

##### A "blocking singleton" class: only one instance can exist at a time;
##### anyone else who tries to construct one blocks until it is deleted.
class single1(object):
    lock = threading.Lock()
    num = 0

    def __del__(self):   # release the lock when the instance is destroyed
        single1.lock.release()

    def __new__(cls, *args, **kwargs):
        print("entered single1.__new__ once")
        single1.lock.acquire()
        single1.num += 1
        print(f"got into the single1 constructor, {single1.num} inside so far")
        return object.__new__(cls)

class single2(object):
    lock = threading.Lock()
    num = 0

    def __del__(self):   # release the lock when the instance is destroyed
        single2.lock.release()

    def __new__(cls, *args, **kwargs):
        print("entered single2.__new__ once")
        single2.lock.acquire()
        single2.num += 1
        print(f"got into the single2 constructor, {single2.num} inside so far")
        return object.__new__(cls)

# test1 holds single1 and waits for single2, while test2 holds single2 and waits
# for single1: the two threads deadlock.
def test1():
    while True:
        a = single1()
        print("-----------------------")
        time.sleep(2)
        b = single2()
        print("This is inside {0}".format(threading.current_thread().name))

def test2():
    while True:
        a = single2()
        print("==================")
        time.sleep(2)
        b = single1()
        print("This is inside {0}".format(threading.current_thread().name))

funclist = [test1, test2]
tlist = [threading.Thread(target=x) for x in funclist]
for thread in tlist:
    thread.start()
import threading
import time

##### A blocking "single-occupant" class: only one caller at a time can construct it;
##### anyone else who tries to construct it blocks until the instance is deleted
class single1(object):
    lock = threading.Lock()
    num = 0

    def __del__(self):  # release the lock when the instance is deleted
        single1.lock.release()
        single1.num -= 1

    def __new__(cls, *args, **kwargs):
        print("Entered single1.__new__")
        single1.lock.acquire()
        single1.num += 1
        print(f"Acquired the single1 constructor, now {single1.num} inside")
        return object.__new__(cls, *args, **kwargs)

class single2(object):
    lock = threading.Lock()
    num = 0

    def __del__(self):  # release the lock when the instance is deleted
        single2.lock.release()
        single2.num -= 1

    def __new__(cls, *args, **kwargs):
        print("Entered single2.__new__")
        single2.lock.acquire()
        single2.num += 1
        print(f"Acquired the single2 constructor, now {single2.num} inside")
        return object.__new__(cls, *args, **kwargs)

def test1():
    while True:
        print("-----------------------")
        rlock.acquire()  # take the shared lock before getting single1
        a = single1()
        rlock.acquire()  # take it again (an RLock allows this) before getting single2
        b = single2()
        print("Inside {0}".format(threading.current_thread().name))
        del a, b  # __del__ releases single1.lock and single2.lock
        rlock.release()
        rlock.release()
        time.sleep(2)

def test2():
    while True:
        print("==================")
        rlock.acquire()
        a = single2()
        rlock.acquire()
        b = single1()
        print("Inside {0}".format(threading.current_thread().name))
        del a, b
        rlock.release()
        rlock.release()
        time.sleep(2)

# One reentrant lock shared by both threads: only one thread at a time can be in the section
# that takes single1 and single2, so they can no longer each hold one and wait for the other
rlock = threading.RLock()
funclist = [test1, test2]
tlist = [threading.Thread(target=x) for x in funclist]
for thread in tlist:
    thread.start()
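The RLock version avoids the deadlock by serializing both critical sections behind a single global lock. Another common fix is to have every thread acquire the two locks in the same fixed order, so that no cycle of waits can form. The sketch below assumes that ordering by `id()` is acceptable as the global order; `critical_section` and `run_both` are illustrative names. Each thread names the locks in the opposite order, yet both finish.

```python
import threading

lock_a = threading.Lock()  # plays the role of single1.lock
lock_b = threading.Lock()  # plays the role of single2.lock

def ordered(*locks):
    """Sort the locks into one fixed global order (here: by id(), an assumption of this sketch)."""
    return sorted(locks, key=id)

def critical_section(first, second, results, name, rounds=100):
    for _ in range(rounds):
        # Whatever order the caller passes, the locks are always taken in the global order
        low, high = ordered(first, second)
        with low:
            with high:
                results.append(name)

def run_both() -> list:
    results = []
    t1 = threading.Thread(target=critical_section, args=(lock_a, lock_b, results, "test1"), daemon=True)
    t2 = threading.Thread(target=critical_section, args=(lock_b, lock_a, results, "test2"), daemon=True)
    t1.start()
    t2.start()
    t1.join(timeout=5)  # a deadlock would show up as a timeout with missing results
    t2.join(timeout=5)
    return results
```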
import multiprocessing
import time

##### A blocking "single-occupant" class: only one caller can hold it at a time; others block
class single(object):
    def __init__(self, name):
        self.lock = multiprocessing.Lock()
        self.num = 0  # plain attribute: each process works on its own copy
        self.name = name

    def get(self):
        self.lock.acquire()
        self.num += 1
        return self.num

    def dele(self):
        self.lock.release()

def test1(a, b):
    while True:
        print("-----------------------")
        # rlock.acquire()  # take the shared lock before getting a
        print("a:", a, a.get(), a.name)
        time.sleep(2)
        # rlock.acquire()  # take it again before getting b
        print("b:", b, b.get(), b.name)  # test2 holds b and waits for a: deadlock
        print("Inside {0}".format(multiprocessing.current_process().name))
        a.dele()
        b.dele()
        # rlock.release()
        # rlock.release()

def test2(a, b):
    while True:
        print("==================")
        # rlock.acquire()
        print("b:", b, b.get(), b.name)
        time.sleep(2)
        print("--------", a.num)
        # rlock.acquire()
        print("a:", a, a.get(), a.name)
        print("Inside {0}".format(multiprocessing.current_process().name))
        a.dele()
        b.dele()
        # rlock.release()
        # rlock.release()

rlock = multiprocessing.RLock()  # unused here; the next version passes it to the processes

if __name__ == '__main__':
    a = single("a")
    b = single("b")
    print("a:", a, "b:", b)
    funclist = [test1, test2]
    tlist = [multiprocessing.Process(target=x, args=(a, b)) for x in funclist]
    for process in tlist:
        process.start()
import multiprocessing
import time

##### A blocking "single-occupant" class: only one caller can hold it at a time; others block
class single(object):
    def __init__(self, name):
        self.lock = multiprocessing.Lock()
        self.num = 0
        self.name = name

    def get(self):
        self.lock.acquire()
        self.num += 1
        return self.num

    def dele(self):
        self.lock.release()

def test1(a, b, rlock):
    while True:
        rlock.acquire()  # take the shared lock before getting a
        print("-----------------------")
        print("a:", a, a.get(), a.name)
        time.sleep(2)
        rlock.acquire()  # take it again (reentrant) before getting b
        print("b:", b, b.get(), b.name)
        print("Inside {0}".format(multiprocessing.current_process().name))
        a.dele()
        b.dele()
        rlock.release()
        rlock.release()
        time.sleep(1)

def test2(a, b, rlock):
    while True:
        rlock.acquire()
        print("==================")
        print("b:", b, b.get(), b.name)
        time.sleep(2)
        rlock.acquire()
        print("a:", a, a.get(), a.name)
        print("Inside {0}".format(multiprocessing.current_process().name))
        a.dele()
        b.dele()
        rlock.release()
        rlock.release()
        time.sleep(1)

if __name__ == '__main__':
    rlock = multiprocessing.RLock()  # created in the parent and passed to both child processes
    a = single("a")
    b = single("b")
    print("a:", a, "b:", b)
    funclist = [test1, test2]
    tlist = [multiprocessing.Process(target=x, args=(a, b, rlock)) for x in funclist]
    for process in tlist:
        process.start()
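Condition([lock/rlock]): the lock to use can be passed in; if none is given, a new RLock (reentrant lock) is created by default.
acquire([timeout])/release(): call the corresponding methods of the associated lock.
wait([timeout]): puts the thread into the Condition's waiting pool until it is notified (or the timeout expires). The lock is released while the thread waits and re-acquired before wait() returns. The thread must already hold the lock, otherwise RuntimeError is raised.
notify(): picks one thread from the waiting pool and notifies it. The notified thread then automatically calls acquire() to try to get the lock (it moves into the lock pool), while the other threads stay in the waiting pool. Calling this method does not release the lock. The thread must already hold the lock, otherwise RuntimeError is raised.
notify_all() (notifyAll() is the older, deprecated alias): notifies every thread in the waiting pool, and they all move into the lock pool to try to acquire the lock. Calling this method does not release the lock. The thread must already hold the lock, otherwise RuntimeError is raised.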
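Below is a minimal sketch of the wait/notify protocol described above. The `items` list and the function names are illustrative. `wait_for()` wraps the usual `while not predicate: cond.wait()` loop. `notify()` is called while the lock is held, so the waiter only resumes after the notifying thread leaves its `with` block. The second function checks that notifying without holding the lock raises `RuntimeError`.

```python
import threading

def handoff(value):
    cond = threading.Condition()  # no lock passed, so an RLock is created internally
    items = []
    received = []

    def consumer():
        with cond:  # the lock must be held before wait()
            # wait_for releases the lock while blocked and re-acquires it before returning
            cond.wait_for(lambda: items, timeout=5)
            received.append(items.pop())

    t = threading.Thread(target=consumer)
    t.start()
    with cond:
        items.append(value)
        cond.notify()  # does not release the lock; the consumer continues after this block
    t.join()
    return received

def notify_without_lock_raises() -> bool:
    cond = threading.Condition()
    try:
        cond.notify()  # called without holding the lock
    except RuntimeError:
        return True
    return False
```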
import threading
import time

class Goods:  # product counter
    def __init__(self):
        self.count = 0

    def add(self, num=1):
        self.count += num

    def sub(self):
        if self.count > 0:  # never go below zero
            self.count -= 1

    def empty(self):
        return self.count <= 0

class Producer(threading.Thread):  # producer
    def __init__(self, condition, goods, sleeptime=1):
        threading.Thread.__init__(self)
        self.cond = condition
        self.goods = goods
        self.sleeptime = sleeptime

    def run(self):
        cond = self.cond
        goods = self.goods
        time.sleep(3)
        while True:
            cond.acquire()  # lock the shared resource
            goods.add()
            print("Product count:", goods.count, "producer thread", time.asctime())
            cond.notify_all()  # wake every waiting thread, i.e. the consumers
            cond.release()  # unlock the shared resource
            time.sleep(self.sleeptime)

class Consumer(threading.Thread):  # consumer
    def __init__(self, condition, goods, sleeptime=1):
        threading.Thread.__init__(self)
        self.cond = condition
        self.goods = goods
        self.sleeptime = sleeptime

    def run(self):
        cond = self.cond
        goods = self.goods
        while True:
            time.sleep(self.sleeptime)
            cond.acquire()  # lock the shared resource
            while goods.empty():  # no product yet: wait, and re-check after every wake-up
                cond.wait()
            goods.sub()
            print("Product count:", goods.count, "consumer thread", time.asctime())
            cond.release()  # unlock the shared resource

g = Goods()
cond = threading.Condition()
P_SIZE = 1
C_SIZE = 1
producers = [Producer(cond, g) for i in range(P_SIZE)]
for p in producers:
    p.start()
consumers = [Consumer(cond, g) for i in range(C_SIZE)]
for c in consumers:
    c.start()
import threading
import time
from time import monotonic as _time  # the acquire() body below calls _time(), as threading.py does

class Semaphore_ac(threading.Semaphore):
    """Semaphore whose acquire()/release() take or return num units at once.
    Relies on CPython's internal attributes self._cond and self._value of threading.Semaphore."""

    def acquire(self, num=1, blocking: bool = True, timeout: float = None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value < num:  # not enough units free: wait
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= num
                rc = True
        return rc

    def release(self, num=1):
        """Release num units, incrementing the internal counter by num, and wake the
        waiting threads so each can re-check whether enough units are now free."""
        with self._cond:
            self._value += num
            # notify() would wake only one waiter; if it still lacks units it waits again,
            # while another waiter that could proceed stays asleep
            self._cond.notify_all()

def test1(s):
    s.acquire(2)
    print("Entered thread %s" % threading.current_thread().name, time.ctime())
    time.sleep(2)
    s.release(2)

if __name__ == '__main__':
    # sema = threading.Semaphore(5)
    sema = Semaphore_ac(5)  # 5 units, 2 per thread: at most 2 threads inside at once
    T_SIZE = 10
    tlist = [threading.Thread(target=test1, args=(sema,)) for i in range(T_SIZE)]
    for i in tlist:
        i.start()
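When each thread takes only one unit, the standard `threading.BoundedSemaphore` is enough. Since Python 3.9, `release(n)` can also return several units, although `acquire()` still takes only one. The sketch below records how many threads are inside the guarded section at the same time. The `state` dictionary and the helper lock are illustrative.

```python
import threading
import time

def max_concurrency(limit: int = 3, num_threads: int = 10) -> int:
    sema = threading.BoundedSemaphore(limit)  # releasing above the initial value raises ValueError
    guard = threading.Lock()  # protects the bookkeeping below
    state = {"inside": 0, "peak": 0}

    def worker():
        with sema:  # acquire one unit; it is released when the block exits
            with guard:
                state["inside"] += 1
                state["peak"] = max(state["peak"], state["inside"])
            time.sleep(0.05)  # simulate work while holding the semaphore
            with guard:
                state["inside"] -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]
```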
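event.is_set() (older alias isSet()): returns the event's flag.
event.wait([timeout]): blocks the thread while the flag is False, until set() is called or the timeout expires.
event.set(): sets the flag to True; every thread blocked in wait() is woken and becomes ready, waiting for the OS to schedule it.
event.clear(): resets the flag to False.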
import random
import threading
import time

def test1(event):
    if not event.is_set():
        event.wait()  # block until some test2 thread calls set()
    time.sleep(random.random())
    print("--------------", event.is_set())
    print("This is %s reading the value in test1" % threading.current_thread().name)
    event.clear()  # reset the flag to False

def test2(event):
    time.sleep(2)
    print("This is %s setting the value in test2" % threading.current_thread().name)
    event.set()  # wake every thread waiting on the event

if __name__ == '__main__':
    event = threading.Event()
    T_SIZE = 5
    tlist1 = [threading.Thread(target=test1, args=(event,)) for i in range(T_SIZE)]
    tlist2 = [threading.Thread(target=test2, args=(event,)) for i in range(T_SIZE)]
    for i in tlist1:
        i.start()
    for i in tlist2:
        i.start()
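Here is a smaller, deterministic sketch of the same idea. Several waiters block in `wait()` until one thread calls `set()`, which wakes all of them. `wait(timeout)` returns the flag, so a timeout can be told apart from a real signal. `broadcast` is an illustrative name.

```python
import threading

def broadcast(num_waiters: int = 3):
    event = threading.Event()
    woke = []
    lock = threading.Lock()

    def waiter(i):
        if event.wait(timeout=5):  # True once set() is called, False if the timeout expired
            with lock:
                woke.append(i)

    threads = [threading.Thread(target=waiter, args=(i,)) for i in range(num_waiters)]
    for t in threads:
        t.start()
    event.set()  # wakes every waiter, not just one
    for t in threads:
        t.join()
    return sorted(woke), event.is_set()
```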
import threading
import time

def test1(lock):
    with lock:
        print("This is test1")
        raise ValueError("I went wrong")  # the with statement still releases the lock

def test2(lock):
    time.sleep(2)
    print("-------------")
    with lock:  # does not block forever: test1's lock was released despite the exception
        print("This is test2")
    print("==============")

if __name__ == '__main__':
    lock = threading.Lock()
    T_SIZE = 1
    tlist1 = [threading.Thread(target=test1, args=(lock,)) for i in range(T_SIZE)]
    tlist2 = [threading.Thread(target=test2, args=(lock,)) for i in range(T_SIZE)]
    for i in tlist1:
        i.start()
    for i in tlist2:
        i.start()
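The example above shows that `with lock:` releases the lock even though `test1` raised, so `test2` is not blocked forever. The sketch below checks this directly with `Lock.locked()` and contrasts it with a bare `acquire()` whose matching `release()` is skipped. The function names are illustrative.

```python
import threading

def lock_after_error() -> bool:
    lock = threading.Lock()
    try:
        with lock:
            raise ValueError("something went wrong")  # exception inside the with block
    except ValueError:
        pass
    return lock.locked()  # False: __exit__ released the lock while the exception propagated

def lock_after_error_manual() -> bool:
    lock = threading.Lock()
    try:
        lock.acquire()
        raise ValueError("something went wrong")  # the release() that should follow never runs
    except ValueError:
        pass
    return lock.locked()  # True: the lock leaked, so any later acquire() would block
```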
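import contextlib

@contextlib.contextmanager
def test():
    print("1111")    # runs when the with block is entered
    yield "2"        # the yielded value is bound to f
    print("333333")  # runs when the with block is left

with test() as f:
    print("======", f)

####### Output:
1111
====== 2
333333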
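As mentioned above, writing a custom context manager means defining a class that implements both __enter__ and __exit__, which is clearly not very convenient. Python's contextlib module provides an easier way. It contains the decorator contextmanager and some helper functions; with contextmanager you only need to write a generator function in place of a full context-manager class. Typical usage: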
def locked(lock):
with locked(myLock):
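The `locked` fragment above is truncated. A complete version in the style of the examples in PEP 343 could look like the sketch below. The `try/finally` around `yield` guarantees the release even if the `with` body raises.

```python
import threading
from contextlib import contextmanager

@contextmanager
def locked(lock):
    lock.acquire()  # runs when the with block is entered
    try:
        yield  # the with body runs here
    finally:
        lock.release()  # runs when the block is left, even if the body raised

myLock = threading.Lock()
with locked(myLock):
    inside = myLock.locked()  # True while inside the block
after = myLock.locked()       # False after leaving it

try:
    with locked(myLock):
        raise KeyError("boom")
except KeyError:
    pass
released_after_error = not myLock.locked()  # True: finally released the lock
```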
from contextlib import contextmanager

@contextmanager
def myopen(filename, mode="r"):
    f = open(filename, mode)
    try:
        yield f
    finally:
        f.close()  # closed even if the with body raises

with myopen("test.txt") as f:
    for line in f:
        print(line)
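from contextlib import contextmanager

@contextmanager
def transaction(db):
    db.begin()
    try:
        yield
    except:
        db.rollback()  # any exception in the with body rolls the transaction back
        raise
    else:
        db.commit()  # commit only if the body finished normally

# mydb and sql are placeholders for a database object and a SQL statement
with transaction(mydb):
    mydb.cursor.execute(sql)
    mydb.cursor.execute(sql)
    mydb.cursor.execute(sql)
    mydb.cursor.execute(sql)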
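In the example above, `mydb`, `begin()` and `cursor` are placeholders. Below is a runnable sketch with the same commit-or-rollback structure. It swaps in the standard `sqlite3` module and an in-memory database, purely for illustration. sqlite3 opens a transaction implicitly before an `INSERT`, so this version has no explicit `begin()`.

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    # sqlite3 begins a transaction implicitly before INSERT/UPDATE/DELETE
    try:
        yield conn.cursor()
    except BaseException:
        conn.rollback()  # undo everything done inside the with block
        raise
    else:
        conn.commit()  # only reached if the block finished without an exception

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()

with transaction(conn) as cur:
    cur.execute("INSERT INTO t VALUES (1)")  # committed

try:
    with transaction(conn) as cur:
        cur.execute("INSERT INTO t VALUES (2)")  # rolled back
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rows = [r[0] for r in conn.execute("SELECT x FROM t")]  # only the committed row remains
```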
with mgr1:
    with mgr2:
        ...
            with mgrn:
                pass
from contextlib import contextmanager
from contextlib import nested  # Python 2 only: nested() does not exist in Python 3
from contextlib import closing

@contextmanager
def my_context(name):
    print("enter")
    try:
        yield name
    finally:
        print("exit")

# Use nested() to enter several managers (Python 2 only)
print("--------- entering several managers with nested() -----------")
with nested(my_context("manager 1"), my_context("manager 2"), my_context("manager 3")) as (m1, m2, m3):
    print(m1)
    print(m2)
    print(m3)

# Enter several managers directly in one with statement (Python 2.7 and Python 3)
print("--------- entering several managers with with -----------")
with my_context("manager 1") as m1, my_context("manager 2") as m2, my_context("manager 3") as m3:
    print(m1)
    print(m2)
    print(m3)
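Python 3 has no `contextlib.nested`. To enter a variable number of managers, such as a list built at runtime, use `contextlib.ExitStack`, which exits them in reverse order. The `my_context` below mirrors the one above, but it records events in a list instead of printing them, so the order can be checked.

```python
from contextlib import ExitStack, contextmanager

log = []

@contextmanager
def my_context(name):
    log.append(f"enter {name}")
    try:
        yield name
    finally:
        log.append(f"exit {name}")

names = ["manager 1", "manager 2", "manager 3"]
with ExitStack() as stack:
    # enter_context() calls __enter__ now and schedules the matching __exit__
    values = [stack.enter_context(my_context(n)) for n in names]
    log.append(f"body {values}")
# Managers are exited last-in, first-out: manager 3 first, manager 1 last
```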
class closing(object):
    def __init__(self, thing):
        self.thing = thing
    def __enter__(self):
        return self.thing
    def __exit__(self, *exc_info):
        self.thing.close()
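import sys
from contextlib import closing
from urllib.request import urlopen  # Python 3 location of Python 2's urllib.urlopen

with closing(urlopen('http://www.yahoo.com')) as f:
    for line in f:
        sys.stdout.write(line.decode('utf-8', errors='replace'))  # lines arrive as bytes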
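`closing()` works with any object that has a `close()` method but no `__enter__`/`__exit__` of its own. The sketch below avoids the network by using a hypothetical `Resource` class.

```python
from contextlib import closing

class Resource:
    """Hypothetical object with close() but no context-manager methods."""
    def __init__(self):
        self.closed = False

    def read(self):
        if self.closed:
            raise ValueError("read from closed resource")
        return "data"

    def close(self):
        self.closed = True

res = Resource()
with closing(res) as r:  # __enter__ returns the wrapped object itself
    value = r.read()
# closing.__exit__ has now called res.close()
```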
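Python's select() calls the operating system's I/O interface directly. It monitors sockets, open files and pipes (any file object with a fileno() method) for the moment they become readable or writable, or hit a communication error. select() makes it simple to watch many connections at once. It is also more efficient than writing a long loop that waits on and polls every client connection, because the waiting is done by the operating system's C interface rather than by the Python interpreter. (On Windows, select() works only with sockets.)
select is supported on almost every platform, and this good cross-platform support is one of its strengths. One drawback is the cap on how many file descriptors a single process can monitor: typically 1024 on Linux. The limit can be raised by changing the macro definition or even recompiling the kernel, but doing so lowers efficiency.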
select(rlist, wlist, xlist, timeout=None)
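select() takes and monitors three lists. The first, rlist, is watched for incoming data (readable). The second, wlist, is watched for readiness to send outgoing data (writable). The third, xlist, is watched for exceptional conditions. The fourth argument, timeout, sets how long to wait: 0 makes select poll and return immediately, while omitting it (or passing None) blocks until at least one object is ready. In practice you create two lists, one for inputs and one for outputs, and pass them to select() so the kernel monitors them. select() then returns three lists: the subsets of the three arguments that are ready.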
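# Build two lists. To have the kernel watch, say, 50 connections, we pass it a list (inputs)
# that holds those connections; handing the list to select effectively hands it to the kernel
inputs = [server, ]  # input list: watch everything that may have data to read
outputs = []  # output list: watch everything that has data waiting to be sent
# Pass the lists to select, which has the kernel watch them; it returns three lists
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
# The inputs list is passed again as the third argument because it holds all the connections,
# so exceptional conditions on any of them are reported in exceptional. (On Linux this mainly
# means out-of-band data; a connection closed by the peer usually shows up as readable,
# with recv() returning b''.)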
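You can watch `select.select()` report readiness without writing a server. `socket.socketpair()` returns two connected sockets, and it is available on Windows since Python 3.5. A zero timeout makes `select` poll and return immediately. `readiness_demo` is an illustrative name.

```python
import select
import socket

def readiness_demo():
    a, b = socket.socketpair()
    try:
        # Nothing sent yet: b is not readable, a is writable (its send buffer has room)
        r1, w1, _ = select.select([b], [a], [], 0)
        a.sendall(b"hello")
        # Now b has data waiting, so it is reported as readable
        r2, _, _ = select.select([b], [], [], 1.0)
        data = b.recv(1024) if b in r2 else b""
        return (b in r1, a in w1, b in r2, data)
    finally:
        a.close()
        b.close()
```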
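import queue
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("localhost", 9999))
server.listen(1000)
server.setblocking(False)  # non-blocking mode

msg_dic = dict()  # per-connection queue of data waiting to be sent back

# In non-blocking mode accept() and recv() no longer block (they raise when nothing is ready),
# so at first we only watch the server socket itself, waiting for clients to connect
inputs = [server, ]
outputs = []

while True:
    # exceptional receives the sockets from inputs that hit an exceptional condition
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)  # blocks until something is ready
    for r in readable:  # each r is a socket
        if r is server:  # the server socket is readable: a new client is connecting
            conn, addr = r.accept()  # accept the connection and get the client socket
            print("New connection", addr)
            conn.setblocking(False)
            # To avoid blocking the whole program, don't receive from the client right away;
            # add it to inputs so that select watches it on the next loop iteration
            inputs.append(conn)
            msg_dic[conn] = queue.Queue()  # queue for the data to send back to this client
        else:  # an existing client has sent data
            data = r.recv(1024)
            if not data:  # empty read: the client closed the connection
                if r in outputs:
                    outputs.remove(r)
                inputs.remove(r)
                del msg_dic[r]
                r.close()
                continue
            print("Received:", data)
            msg_dic[r].put(data)  # store the data; it is echoed back later
            # Don't reply right away, so other clients are not held up; watch r for writability
            if r not in outputs:
                outputs.append(r)
            # r.send(data)
            # print("send done....")
    for w in writeable:  # connections with data to send back
        if w not in msg_dic:  # already closed earlier in this iteration
            continue
        print("Entering writeable")
        try:
            data_to_client = msg_dic[w].get_nowait()
        except queue.Empty:
            outputs.remove(w)
            continue
        w.send(data_to_client)  # echo the original data back to the client
        if msg_dic[w].empty():
            outputs.remove(w)  # nothing left, so writeable won't return this connection next time
    for e in exceptional:  # connections in an exceptional state
        if e not in inputs:  # already closed above
            continue
        if e in outputs:  # e is not necessarily in outputs, so check first
            outputs.remove(e)
        inputs.remove(e)  # stop watching the connection
        msg_dic.pop(e, None)  # drop its queue
        e.close()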
import socket
import time

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ip_port = ("localhost", 9999)  # the server above binds to localhost
sk.connect(ip_port)

if __name__ == '__main__':
    while True:
        sk.send(b"123")
        print("Sent 123")
        time.sleep(2)
        try:
            print("Received", sk.recv(1024))  # the server echoes the data back
        except Exception as e:
            print("Nothing received", e)
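Event constant | Meaning |
--- | --- |
EPOLLIN | Available for reading |
EPOLLOUT | Available for writing |
EPOLLPRI | Urgent data available for reading |
EPOLLERR | An error condition occurred on the associated fd |
EPOLLHUP | A hang-up occurred on the associated fd |
EPOLLET | Use edge-triggered (ET) behavior (the default is level-triggered) |
EPOLLONESHOT | One-shot behavior: after one event is pulled out, the fd is internally disabled |
EPOLLRDBAND | Priority data band can be read |
EPOLLWRBAND | Priority data may be written |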
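epoll.fileno(): return the file descriptor number of the control fd.
epoll.fromfd(fd): create an epoll object from the given fd.
epoll.register(fd[, eventmask]): register an fd with the epoll object.
epoll.modify(fd, eventmask): modify the event mask of a registered fd.
epoll.unregister(fd): remove a registered fd from the epoll object.
epoll.poll(timeout=-1[, maxevents=-1])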
Wait for events; timeout (a float) is in seconds.

import queue
import select
import socket

# Create the server socket
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the address to be reused
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# IP address and port ("" = all interfaces)
server_address = ("", 6666)
serversocket.bind(server_address)
# Listen, with a backlog of 10
serversocket.listen(10)
print("Server started, listening on:", server_address)
# Non-blocking server socket
serversocket.setblocking(False)
# Poll timeout in seconds
timeout = 10
# Create the epoll object; the events to watch are registered on it
epoll = select.epoll()
# Register the listening fd for read events
epoll.register(serversocket.fileno(), select.EPOLLIN)
# Messages waiting to be sent to each client: {connection: queue}
message_queues = {}
# Map from file descriptor to socket object: {fd: socket}
fd_to_socket = {serversocket.fileno(): serversocket, }

try:
    while True:
        print("Waiting for active connections......")
        # Poll the registered fds; returns [(fd, event), (...), ...]
        events = epoll.poll(timeout)
        if not events:
            print("epoll timed out with no active connections, polling again......")
            continue
        print("Got", len(events), "new events, processing......")
        for fd, event in events:
            print('fd and event: ', fd, event)
            sock = fd_to_socket[fd]  # not named "socket", which would shadow the module
            # The active socket is the server socket: a new connection
            if sock == serversocket:
                connection, address = serversocket.accept()  # connection is the new socket object
                print("New connection:", address)
                connection.setblocking(False)  # the new connection is non-blocking too
                # Register the new connection's fd for read events
                epoll.register(connection.fileno(), select.EPOLLIN)
                # Remember the fd -> socket mapping
                fd_to_socket[connection.fileno()] = connection
                # One queue per connection for the messages to send back
                message_queues[connection] = queue.Queue()
            # Hang-up event
            elif event & select.EPOLLHUP:
                print('client close')
                epoll.unregister(fd)  # stop watching the client's fd
                sock.close()
                # Drop everything associated with the closed client
                del fd_to_socket[fd]
                del message_queues[sock]
            # Readable event
            elif event & select.EPOLLIN:
                data = sock.recv(1024)
                if data:
                    print("Received:", data, "client:", sock.getpeername())
                    message_queues[sock].put(data)  # queue the data for this client
                    # Switch this fd to write events, so the reply goes out once the socket is writable
                    epoll.modify(fd, select.EPOLLOUT)
                else:
                    # Empty read: the client closed its end. Clean up here, because a peer that closes
                    # usually shows up as EPOLLIN with an empty recv(), not necessarily as EPOLLHUP
                    print('-------client close connect or send null data -----------')
                    epoll.unregister(fd)
                    sock.close()
                    del fd_to_socket[fd]
                    del message_queues[sock]
            # Writable event
            elif event & select.EPOLLOUT:
                try:
                    msg = message_queues[sock].get_nowait()  # next message for this client
                except queue.Empty:
                    print(sock.getpeername(), " queue empty")
                    epoll.modify(fd, select.EPOLLIN)  # nothing left to send: back to read events
                else:
                    print("Sending:", msg, "client:", sock.getpeername())
                    sock.send(msg)
finally:
    epoll.unregister(serversocket.fileno())  # unregister the server fd
    epoll.close()
    serversocket.close()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

# Create the client socket
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Server address and port; the server binds to all interfaces, so connect via localhost
server_address = ('localhost', 6666)
# Connect to the server
clientsocket.connect(server_address)
while True:
    # Read a line from the user
    data = input('please input:')
    # Send it to the server
    clientsocket.sendall(data.encode("utf8"))
    # Receive the echoed data
    server_data = clientsocket.recv(1024)
    print('Client received:', server_data)
# Close the client socket
# clientsocket.close()
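`select.epoll` exists only on Linux. The sketch below shows the level-triggered default from the table above, using a `socketpair()`. While unread data remains, every `poll()` reports the fd again. When the fd is registered with `EPOLLIN | EPOLLET`, readiness is reported once per new arrival. `epoll_demo` is an illustrative name. The function returns `None` on other platforms.

```python
import select
import socket

def epoll_demo(edge_triggered: bool):
    if not hasattr(select, "epoll"):
        return None  # not Linux
    a, b = socket.socketpair()
    ep = select.epoll()
    try:
        mask = select.EPOLLIN | (select.EPOLLET if edge_triggered else 0)
        ep.register(b.fileno(), mask)
        a.sendall(b"x")         # b now has one unread byte
        first = ep.poll(1.0)    # list of (fd, event) pairs
        second = ep.poll(0)     # poll again without reading the byte
        return (bool(first), bool(second))
    finally:
        ep.close()
        a.close()
        b.close()
```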
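The selectors module picks the best mechanism available: epoll on Linux, kqueue on macOS/BSD, and select on Windows.
selectors.DefaultSelector() creates the selector, and a socket is then registered with sel.register(server, selectors.EVENT_READ, accept).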
import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    "Accept a new client connection"
    conn, addr = sock.accept()
    print("accepted", conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)  # register the read callback for the new connection

def read(conn, mask):
    "Receive data from a client and echo it back"
    data = conn.recv(1024)
    if data:
        print("echoing", repr(data), 'to', conn)
        conn.send(data)
    else:  # empty read: the client closed the connection
        print("closing", conn)
        sel.unregister(conn)
        conn.close()

server = socket.socket()
server.bind(('localhost', 9999))
server.listen(500)
server.setblocking(False)
# Register the event: whenever a connection arrives, call accept
# sel.register(server, selectors.EVENT_READ, accept) plays the role of inputs = [server, ]
sel.register(server, selectors.EVENT_READ, accept)

while True:
    # Looks like select, but may be epoll underneath, depending on whether the OS is Windows or Linux.
    # Blocks by default; returns the list of active connections once there are any
    events = sel.select()
    print("events:", events)
    for key, mask in events:  # key.fileobj is the socket, key.data is the callback
        callback = key.data  # e.g. accept
        callback(key.fileobj, mask)  # key.fileobj = the file handle
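You can exercise the callback-in-`key.data` pattern without a network server. The sketch below uses `socket.socketpair()`. `selector_demo`, `on_readable` and the `received` list are illustrative names.

```python
import selectors
import socket

def selector_demo():
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    received = []

    def on_readable(conn, mask):
        received.append(conn.recv(1024))

    sel.register(b, selectors.EVENT_READ, on_readable)  # data = the callback, as in the server above
    a.sendall(b"ping")
    for key, mask in sel.select(timeout=1.0):
        callback = key.data
        callback(key.fileobj, mask)  # key.fileobj is the registered socket b
    sel.unregister(b)
    sel.close()
    a.close()
    b.close()
    return received, type(sel).__name__  # e.g. EpollSelector on Linux
```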
import socket
import sys

messages = [b'This is the message. ',
            b'It will be sent ',
            b'in parts.', ]
server_address = ('localhost', 9999)

# Create TCP/IP sockets (only 1 here; raise the range() to simulate more clients, e.g. 100)
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(1)]

# Connect to the server
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)

for message in messages:
    # Send the message to the server
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message))
        s.send(message)
    # Receive the reply from the server
    for s in socks:
        data = s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print('closing socket', s.getsockname(), file=sys.stderr)
            s.close()
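from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()  # jump into test2
    print(34)

def test2():
    print(56)
    gr1.switch()  # jump back into test1 where it left off
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
# Prints 12, 56, 34. 78 is never printed: when test1 finishes, control returns to gr1's parent
# (the main greenlet), and nothing switches back into gr2.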
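import gevent

def test(time):
    print(1)
    gevent.sleep(time)  # yields to the other greenlets instead of blocking
    print(2)

def test2(time):
    print(3)
    gevent.sleep(time)
    print(4)

if __name__ == '__main__':
    gevent.joinall([
        gevent.spawn(test, 2),
        gevent.spawn(test2, 3),
    ])
# Prints 1 and 3 right away, 2 after about 2 s and 4 after about 3 s (about 3 s in total, not 5)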
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) 2017 - walker <cail1844@gmail.com>
from gevent import monkey
# Patch only the socket module, before the other imports; threading stays unpatched
# so that run3() still uses real OS threads for the comparison
monkey.patch_socket()

import re
import timeit
from threading import Thread

import gevent
import requests

def get_title(url, title_list=None):
    title = ''
    try:
        r = requests.get(url, timeout=5)
        r.encoding = 'utf8'
        html = r.text
        title = re.search(r'<title>(.*?)</title>', html).group(1)
    except (requests.RequestException, AttributeError):  # network error/timeout, or no <title> in the page
        title = ''
    if title and title_list is not None:
        title_list.append(title)
    return title

def get_url():
    baseurl = 'http://www.baidu.com/s?cl=3&tn=baidutop10&fr=top1000&wd=%s'
    url_list = []
    with open('muci.txt', 'r', encoding='utf8') as f:  # one search keyword per line
        for key in f:
            url_list.append(baseurl % key.strip())
    return url_list

url_list = get_url()

def run1():  # sequential requests
    title_list = []
    for url in url_list:
        get_title(url, title_list)
    print('Sync result length:', len(title_list), title_list)
    return title_list

def run2():  # one greenlet per request
    title_list = []
    threads = [gevent.spawn(get_title, url, title_list) for url in url_list]
    gevent.joinall(threads)
    # title_list = [thread.value for thread in threads]
    print('gevent result length:', len(title_list), title_list)
    return title_list

def run3():  # one OS thread per request
    title_list = []
    th = []
    for url in url_list:
        t = Thread(target=get_title, args=(url, title_list))
        th.append(t)
        t.start()
    for t in th:
        t.join()
    print('threading result length:', len(title_list), title_list)
    return title_list

if __name__ == '__main__':
    t1 = timeit.timeit('run1()', setup="from __main__ import run1", number=1)
    print('sync time:', t1)
    t2 = timeit.timeit('run2()', setup="from __main__ import run2", number=1)
    print('gevent time:', t2)
    t3 = timeit.timeit('run3()', setup="from __main__ import run3", number=1)
    print('thread time:', t3)
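event_loop (event loop): think of it as a loop that holds coroutines defined with async; a coroutine only runs once it has been placed in the loop.
coroutine: a coroutine object. Calling a function defined with async def does not run it immediately; it returns a coroutine object, which must be registered with the event loop and is then run by the loop.
task: a coroutine object is a function that can natively suspend itself; a task wraps a coroutine further and records its state (pending, finished, cancelled and so on).
async/await keywords: added in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends the coroutine while it waits on an asynchronous call that would otherwise block.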
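In current Python (3.7+), `asyncio.run()` creates the event loop and closes it when done, and the four concepts fit together as in the minimal sketch below. `do_some_work` mirrors the examples that follow, while `main` is our own name.

```python
import asyncio

async def do_some_work(x):  # calling it returns a coroutine object; nothing runs yet
    await asyncio.sleep(0.01)  # await suspends here and lets the loop run other tasks
    return x * 2

async def main():
    coro = do_some_work(1)                       # a coroutine object
    task = asyncio.create_task(do_some_work(2))  # a task, already scheduled on the running loop
    r1 = await coro
    r2 = await task
    return r1, r2, task.done()

result = asyncio.run(main())  # event loop: created, runs main() to completion, then closed
```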
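import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):  # define a coroutine with the async keyword
    print('Waiting: ', x)

start = now()
coroutine = do_some_work(2)  # create a coroutine object; nothing runs yet
loop = asyncio.new_event_loop()  # create an event loop ("pool"); older code used asyncio.get_event_loop()
loop.run_until_complete(coroutine)  # wraps the coroutine in a task, registers it and runs it
print('TIME: ', now() - start)
loop.close()

####### Output:
Waiting:  2
TIME:  0.0004658699035644531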
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

start = now()
coroutine = do_some_work(2)
loop = asyncio.new_event_loop()
# task = asyncio.ensure_future(coroutine, loop=loop)  # option 1
task = loop.create_task(coroutine)  # option 2
print(task)  # pending: created but not run yet
loop.run_until_complete(task)
print(task)  # finished, with result=None
print('TIME: ', now() - start)

####### Output:
<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
TIME:  0.0003490447998046875
import asyncio

async def test(x):
    return x + 3

def callback(future):  # called with the finished task
    print(future.result())

coroutine = test(5)
loop = asyncio.new_event_loop()
task = loop.create_task(coroutine)
print(task)  # <Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>>
task.add_done_callback(callback)  # run callback(task) once the task is done
loop.run_until_complete(task)  # the callback prints 8; run_until_complete also returns 8
import asyncio
import time

async def test():
    time.sleep(1)  # blocking call: holds up the whole event loop
    print(time.time())

loop = asyncio.new_event_loop()
tasks = [loop.create_task(test()) for _ in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
# run_until_complete() returns (done, pending): a set of the three finished tasks and an empty set()

####### Output (the tasks run one after another, 1 s apart):
1547187398.7611663
1547187399.7611988
1547187400.7632194
import asyncio
import time

async def test():
    await asyncio.sleep(1)  # non-blocking: yields to the event loop while waiting
    print(time.time())

loop = asyncio.new_event_loop()
tasks = [loop.create_task(test()) for _ in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
# All three timestamps are printed at about the same moment, roughly 1 s after the start,
# because the three sleeps overlap instead of running back to back.
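The difference between the two versions can be measured. `time.sleep()` blocks the whole event loop, so three tasks take about three times as long. `asyncio.sleep()` lets the three waits overlap. The sketch below shortens the delay to 0.1 s so it runs quickly; that value is our choice.

```python
import asyncio
import time

async def blocking(delay):
    time.sleep(delay)           # blocks the event loop thread; no other task can run

async def cooperative(delay):
    await asyncio.sleep(delay)  # suspends only this task; the loop runs the others meanwhile

async def run_three(fn, delay=0.1):
    start = time.perf_counter()
    await asyncio.gather(*(fn(delay) for _ in range(3)))
    return time.perf_counter() - start

blocking_time = asyncio.run(run_three(blocking))        # roughly 0.3 s
cooperative_time = asyncio.run(run_three(cooperative))  # roughly 0.1 s
```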
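Native coroutines (defined with async def).
Generator-based coroutines: generators decorated with types.coroutine(), which can then be awaited like coroutine objects.
Objects with an __await__ method that returns an iterator.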
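The sketch below awaits all three kinds of awaitable from one coroutine. The class `Ready` and the function names are illustrative. A bare `yield` inside a generator-based awaitable hands control back to the event loop for one iteration, which is what `asyncio.sleep(0)` does internally.

```python
import asyncio
import types

async def native():          # 1. native coroutine
    return "native"

@types.coroutine
def generator_based():       # 2. generator decorated with types.coroutine
    yield                    # give control back to the event loop once
    return "generator"

class Ready:                 # 3. object whose __await__ returns an iterator
    def __await__(self):
        yield                # suspend once, as above
        return "custom"

async def main():
    return [await native(), await generator_based(), await Ready()]

results = asyncio.run(main())
```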
Usage

Most of aiomultiprocess mimics the standard multiprocessing module whenever possible, while accounting for places that benefit from async functionality.

Executing a coroutine on a child process is as simple as:

import asyncio
from aiohttp import request
from aiomultiprocess import Process

async def put(url, params):
    async with request("PUT", url, params=params) as response:
        pass

async def main():
    p = Process(target=put, args=("https://jreese.sh", ))
    await p

asyncio.run(main())

If you want to get results back from that coroutine, Worker makes that available:

import asyncio
from aiohttp import request
from aiomultiprocess import Worker

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    p = Worker(target=get, args=("https://jreese.sh", ))
    response = await p

asyncio.run(main())

If you want a managed pool of worker processes, then use Pool:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    urls = ["https://jreese.sh", ...]
    async with Pool() as pool:
        result = await pool.map(get, urls)

asyncio.run(main())
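Use loop.run_until_complete(asyncio.wait(tasks)), or alternatively loop.run_until_complete(asyncio.gather(*tasks)). The former takes the list of tasks itself; the latter takes the tasks unpacked as separate arguments. (Since Python 3.11, asyncio.wait() no longer accepts bare coroutine objects, so pass it tasks.)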
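The three helpers also return results in different shapes. `gather()` returns the results in argument order. `wait()` returns two sets of tasks, `(done, pending)`. `as_completed()` yields awaitables in completion order. The sketch below uses made-up delays. `job` and `main` are illustrative names.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    specs = [("slow", 0.05), ("fast", 0.01)]

    gathered = await asyncio.gather(*(job(n, d) for n, d in specs))  # argument order

    tasks = [asyncio.create_task(job(n, d)) for n, d in specs]  # wait() needs tasks on 3.11+
    done, pending = await asyncio.wait(tasks)  # sets of tasks, not results
    waited = sorted(t.result() for t in done)

    completed = []
    for fut in asyncio.as_completed([job(n, d) for n, d in specs]):
        completed.append(await fut)  # completion order: fastest first

    return gathered, waited, len(pending), completed

gathered, waited, num_pending, completed = asyncio.run(main())
```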
import asyncio
import time

import aiohttp

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(response)
            print(time.time())

async def request():
    url = "http://www.baidu.com"
    result = await get(url)

loop = asyncio.new_event_loop()
tasks = [loop.create_task(request()) for _ in range(10000)]
loop.run_until_complete(asyncio.wait(tasks))
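import asyncio
import time

import aiohttp

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(response)
            print(time.time())
            return response.status

async def request():
    url = "http://www.baidu.com"
    tasks = [asyncio.ensure_future(get(url)) for _ in range(1000)]  # wrap get(url), not the URL string
    # The three options are alternatives; here they simply run one after another on the same tasks

    # Option 1: wait() returns (done, pending) sets of futures, not the results themselves
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print('Task ret: ', task.result())

    # Option 2: gather() returns the results directly, in the order of the tasks
    results = await asyncio.gather(*tasks)

    # Option 3: as_completed() yields the results one by one, in completion order
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

loop = asyncio.new_event_loop()
loop.run_until_complete(request())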
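import asyncio
import time

import aiohttp

# Option 1: for nested coroutines (the top-level coroutine creates further tasks):
# cancel every task on the loop, not just the outer one
async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(response)
            print(time.time())

async def request():
    url = "http://www.baidu.com"
    tasks = [asyncio.ensure_future(get(url)) for _ in range(1000)]
    dones, pendings = await asyncio.wait(tasks)

loop = asyncio.new_event_loop()
task = loop.create_task(request())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    # asyncio.Task.all_tasks() was removed in Python 3.9; use asyncio.all_tasks(loop)
    asyncio.gather(*asyncio.all_tasks(loop)).cancel()
    loop.stop()
    loop.run_forever()  # runs one more iteration so the cancellations are processed
finally:
    loop.close()

# Option 2: no nested coroutines; iterate over the tasks and cancel them directly
loop = asyncio.new_event_loop()
tasks = [loop.create_task(get("http://www.baidu.com")) for _ in range(1000)]
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    for task in asyncio.all_tasks(loop):
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()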
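On Python 3.7+, `asyncio.run()` already cancels any tasks still pending when the main coroutine finishes, so much of the manual loop handling above becomes unnecessary. The sketch below shows explicit cancellation of a single task. `worker` is an illustrative name. The worker catches `CancelledError` to clean up and then re-raises it so the task is marked as cancelled.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)  # a long wait that will be interrupted
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")  # cleanup goes here
        raise  # re-raise so the task ends up cancelled

async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)  # let the worker start and reach its await
    task.cancel()  # CancelledError is raised inside worker at its await point
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()

log, was_cancelled = asyncio.run(main())
```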
