(操做系統採用了多道技術後,能夠控制進程的切換,或者說進程之間去爭搶cpu的執行權限。這種切換不只會在一個進程遇到io時進行,一個進程佔用cpu時間過長也會切換,或者說被操做系統奪走 cpu的執行權限)
1 一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器) 2 進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。 3 線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和 4 程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。 5 進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調 6 度的一個獨立單位. 7 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程 8 本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是 9 它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源. 10 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行.
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
翻譯:不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行
hreading 模塊創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。
import threading import time def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例 t1.start() #啓動線程 t2.start() #啓動另外一個線程 print(t1.getName()) #獲取線程名 print(t2.getName())
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定義每一個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start() print("ending......")
t1.join() 在子線程t1完成運行以前,這個子線程的父線程將一直被阻塞
1 import threading,time 2 3 def eat(money): #t1 4 print(time.ctime(), ' start to eat, need %s¥' %(money)) 5 time.sleep(3) 6 print(time.ctime(), ' eating..') 7 time.sleep(3) 8 print(time.ctime(), ' end to eat agin') 9 10 def drink(money): #t2 11 print(time.ctime(), ' start to drink, need %s¥' %(money)) 12 time.sleep(5) 13 print(time.ctime(), ' end to drink') 14 15 16 if __name__ == '__main__': 17 t1 = threading.Thread(target=eat, args=(15,)) #建立線程對象 18 # print(type(t1)) 19 t2 = threading.Thread(target=drink, args=(1,)) 20 21 # t1.start() #將t1加入主線程,t1不用cpu時就暫時保存t1的運行狀態,繼續往下執行t2,待t1須要cpu時再次執行 22 # t2.start() 23 24 t1.start() #將t1加入主線程,t1不用cpu時就暫時保存t1的運行狀態,繼續往下執行 25 t1.join() #繼續執行t1至t1結束 26 print('這是有關線程的一些演示01') 27 t2.start() 28 print('這是有關線程的一些演示02') 29 30 print('t1是不是活動的',t1.isAlive()) 31 print('t1的線程名', t1.getName())
import threading from time import ctime,sleep import time def ListenMusic(name): print ("Begin listening to %s. %s" %(name,ctime())) sleep(3) print("end listening %s"%ctime()) def RecordBlog(title): print ("Begin recording the %s! %s" %(title,ctime())) sleep(5) print('end recording %s'%ctime()) threads = [] t1 = threading.Thread(target=ListenMusic,args=('水手',)) t2 = threading.Thread(target=RecordBlog,args=('python線程',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': # t1.setDaemon(True) # t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:必定在start以前設置 t.start() # t.join() # t1.join() # t1.setDaemon(True) # t2.join()########考慮這三種join位置下的結果? print ("all over %s" %ctime())
# run(): 線程被cpu調度後自動執行線程對象的run方法 # start():啓動線程活動。 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每一個線程中都獲取這個全局變量 6 #num-=1 7 8 temp=num 9 #print('--get num:',num ) 10 time.sleep(0.1) 11 num =temp-1 #對此公共變量進行-1操做 12 13 num = 100 #設定一個共享變量 14 thread_list = [] 15 for i in range(100): 16 t = threading.Thread(target=addNum) 17 t.start() 18 thread_list.append(t) 19 20 for t in thread_list: #等待全部線程執行完畢 21 t.join() 22 23 print('final num:', num )
1 R=threading.Lock() #獲取鎖的對象 2 3 #### 4 def sub(): 5 global num 6 R.acquire() #得到一把鎖 7 temp=num-1 8 time.sleep(0.1) 9 num=temp 10 R.release() #釋放鎖
1 import threading,time 2 3 class myThread(threading.Thread): 4 def funA(self): 5 lock_A.acquire() 6 print(self.name, 'get lock_A', time.ctime()) 7 time.sleep(2) 8 lock_B.acquire() 9 print(self.name, 'get lock_B', time.ctime()) 10 time.sleep(2) 11 lock_B.release() 12 lock_A.release() 13 14 def funB(self): 15 lock_B.acquire() 16 print(self.name, 'get lock_B', time.ctime()) 17 time.sleep(2) 18 lock_A.acquire() 19 print(self.name, 'get lock_A', time.ctime()) 20 time.sleep(2) 21 lock_A.release() 22 lock_B.release() 23 24 def run(self): 25 self.funA() 26 self.funB() 27 28 th_li = [] 29 lock_A = threading.Lock() 30 lock_B = threading.Lock() 31 32 if __name__ == '__main__': 33 for i in range(4): 34 th_li.append(myThread()) 35 for t in th_li: 36 t.start() 37 # 此時會產生死鎖,一個線程獲得了鎖B,並等待鎖A的釋放,另外一個線程獲得了鎖A,並等待鎖B的釋放, 38 # 這個時候雙方都不會主動釋放,形成死鎖 39 # for t in th_li: 40 # t.join()
1 # 遞歸鎖 2 import threading,time 3 4 class myThread(threading.Thread): 5 def funA(self): 6 lock.acquire() 7 print(self.name, 'get lock_A', time.ctime()) 8 time.sleep(1) 9 lock.acquire() 10 print(self.name, 'get lock_B', time.ctime()) 11 time.sleep(1) 12 lock.release() 13 lock.release() 14 15 def funB(self): 16 lock.acquire() 17 print(self.name, 'get lock_B', time.ctime()) 18 time.sleep(1) 19 lock.acquire() 20 print(self.name, 'get lock_A', time.ctime()) 21 time.sleep(1) 22 lock.release() 23 lock.release() 24 25 def run(self): 26 self.funA() 27 self.funB() 28 29 30 lock = threading.RLock() #遞歸鎖 31 th_li = [] 32 if __name__ == '__main__': 33 for i in range(4): 34 th_li.append(myThread()) 35 for t in th_li: 36 t.start()
An event is a simple synchronization object;the event represents an internal flag,
and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
# a server thread can set or reset it
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
import time import threading class Account: def __init__(self, _id, balance): self.id = _id self.balance = balance self.lock = threading.RLock() def withdraw(self, amount): with self.lock: self.balance -= amount def deposit(self, amount): with self.lock: self.balance += amount def drawcash(self, amount):#lock.acquire中嵌套lock.acquire的場景 with self.lock: interest=0.05 count=amount+amount*interest self.withdraw(count) def transfer(_from, to, amount): #鎖不能夠加在這裏 由於其餘的其它線程執行的其它方法在不加鎖的狀況下數據一樣是不安全的 _from.withdraw(amount) to.deposit(amount) alex = Account('alex',1000) yuan = Account('yuan',1000) t1=threading.Thread(target = transfer, args = (alex,yuan, 100)) t1.start() t2=threading.Thread(target = transfer, args = (yuan,alex, 200)) t2.start() t1.join() t2.join() print('>>>',alex.balance) print('>>>',yuan.balance)
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) #設置線程併發數爲5 thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
1 import threading,time 2 3 li=[1,2,3,4,5] 4 5 def pri(): 6 while li: 7 a=li[-1] 8 print(a) 9 time.sleep(1) 10 try: 11 li.remove(a) 12 except Exception as e: 13 print('----',a,e) 14 15 t1=threading.Thread(target=pri,args=()) 16 t1.start() 17 t2=threading.Thread(target=pri,args=()) 18 t2.start()
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
1 建立一個「隊列」對象 2 import Queue 3 q = Queue.Queue(maxsize = 10) 4 Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 5 6 將一個值放入隊列中 7 q.put(10) 8 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 9 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 10 11 將一個值從隊列中取出 12 q.get() 13 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True, 14 get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 15 16 Python Queue模塊有三種隊列及構造函數: 17 一、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 18 二、LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) 19 三、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 20 21 此包中的經常使用方法(q = Queue.Queue()): 22 q.qsize() 返回隊列的大小 23 q.empty() 若是隊列爲空,返回True,反之False 24 q.full() 若是隊列滿了,返回True,反之False 25 q.full 與 maxsize 大小對應 26 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 27 q.get_nowait() 至關q.get(False) 28 非阻塞 q.put(item) 寫入隊列,timeout等待時間 29 q.put_nowait(item) 至關q.put(item, False) 30 q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 31 q.join() 實際上意味着等到隊列爲空,再執行別的操做
import queue #先進後出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #優先級 # q=queue.PriorityQueue() # q.put([5,100]) # q.put([7,200]) # q.put([3,"hello"]) # q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data)
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7 count = 0 8 while count <10: 9 print("making........") 10 time.sleep(random.randrange(3)) 11 q.put(count) 12 print('Producer %s has produced %s baozi..' %(name, count)) 13 count +=1 14 #q.task_done() 15 #q.join() 16 print("ok......") 17 def Consumer(name): 18 count = 0 19 while count <10: 20 time.sleep(random.randrange(4)) 21 if not q.empty(): 22 data = q.get() 23 #q.task_done() 24 #q.join() 25 print(data) 26 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 27 else: 28 print("-----no baozi anymore----") 29 count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A',)) 32 c1 = threading.Thread(target=Consumer, args=('B',)) 33 # c2 = threading.Thread(target=Consumer, args=('C',)) 34 # c3 = threading.Thread(target=Consumer, args=('D',)) 35 p1.start() 36 c1.start() 37 # c2.start() 38 # c3.start()
is a package that supports spawning processes using an API similar to the threading module. The ultiprocessing
package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.multiprocessing
multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
1 from multiprocessing import Process 2 import time 3 def f(name): 4 time.sleep(1) 5 print('hello', name,time.ctime()) 6 7 if __name__ == '__main__': 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=('alvin',)) 11 p_list.append(p) 12 p.start() 13 for i in p_list: 14 p.join() 15 print('end')
1 from multiprocessing import Process 2 import time 3 4 class MyProcess(Process): 5 def __init__(self): 6 super(MyProcess, self).__init__() 7 #self.name = name 8 9 def run(self): 10 time.sleep(1) 11 print ('hello', self.name,time.ctime()) 12 13 14 if __name__ == '__main__': 15 p_list=[] 16 for i in range(3): 17 p = MyProcess() 18 p.start() 19 p_list.append(p) 20 21 for p in p_list: 22 p.join() 23 24 print('end')
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process import os import time def info(title): print("title:",title) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main process line') time.sleep(1) print("------------------") p = Process(target=info, args=('yuan',)) p.start() p.join()
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
1 import time 2 from multiprocessing import Process 3 4 def foo(i): 5 time.sleep(1) 6 print (p.is_alive(),i,p.pid) 7 time.sleep(1) 8 9 if __name__ == '__main__': 10 p_list=[] 11 for i in range(10): 12 p = Process(target=foo, args=(i,)) 13 #p.daemon=True 14 p_list.append(p) 15 16 for p in p_list: 17 p.start() 18 # for p in p_list: 19 # p.join() 20 21 print('main process end')
1 from multiprocessing import Process, Queue 2 import queue 3 4 def f(q,n): 5 #q.put([123, 456, 'hello']) 6 q.put(n*n+1) 7 print("son process",id(q)) 8 9 if __name__ == '__main__': 10 q = Queue() #try: q=queue.Queue() 11 print("main process",id(q)) 12 13 for i in range(3): 14 p = Process(target=f, args=(q,i)) 15 p.start() 16 17 print(q.get()) 18 print(q.get()) 19 print(q.get())
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() print("q_ID2:",id(child_conn)) if __name__ == '__main__': parent_conn, child_conn = Pipe() print("q_ID1:",id(child_conn)) p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("兒子你好!") p.join()
The two connection objects returned by Pipe()
represent the two ends of the pipe. Each connection object has send()
and recv()
methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example:
from multiprocessing import Process, Manager def f(d, l,n): d[n] = '1' d['2'] = 2 d[0.25] = None l.append(n) #print(l) print("son process:",id(d),id(l)) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) print("main process:",id(d),id(l)) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
Without using the lock output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock def f(l, i): l.acquire() print('hello world %s' % i) l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
from multiprocessing import Process,Pool import time,os def Foo(i): time.sleep(1) print(i) return i+100 def Bar(arg): print(os.getpid()) print(os.getppid()) print('logger:',arg) pool = Pool(5) Bar(1) print("----------------") for i in range(10): #pool.apply(func=Foo, args=(i,)) #pool.apply_async(func=Foo, args=(i,)) pool.apply_async(func=Foo, args=(i,),callback=Bar) pool.close() pool.join() print('end')
優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
1 import time 2 import queue 3 4 def consumer(name): 5 print("--->ready to eat baozi...") 6 while True: 7 new_baozi = yield 8 print("[%s] is eating baozi %s" % (name,new_baozi)) 9 #time.sleep(1) 10 11 def producer(): 12 13 r = con.__next__() 14 r = con2.__next__() 15 n = 0 16 while 1: 17 time.sleep(1) 18 print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) 19 con.send(n) 20 con2.send(n+1) 21 22 n +=2 23 24 25 if __name__ == '__main__': 26 con = consumer("c1") 27 con2 = consumer("c2") 28 p = producer()
1 from greenlet import greenlet 2 3 4 def test1(): 5 print(12) 6 gr2.switch() 7 print(34) 8 gr2.switch() 9 10 11 def test2(): 12 print(56) 13 gr1.switch() 14 print(78) 15 16 17 gr1 = greenlet(test1) 18 gr2 = greenlet(test2) 19 gr1.switch()
協程: 碰見IO操做會自動切換進行下一個須要cpu的操做
1 import gevent 2 3 import requests,time 4 5 6 start=time.time() 7 8 def f(url): 9 print('GET: %s' % url) 10 resp =requests.get(url) 11 data = resp.text 12 print('%d bytes received from %s.' % (len(data), url)) 13 14 gevent.joinall([ 15 16 gevent.spawn(f, 'https://www.python.org/'), 17 gevent.spawn(f, 'https://www.yahoo.com/'), 18 gevent.spawn(f, 'https://www.baidu.com/'), 19 gevent.spawn(f, 'https://www.sina.com.cn/'), 20 21 ]) 22 23 # f('https://www.python.org/') 24 # 25 # f('https://www.yahoo.com/') 26 # 27 # f('https://baidu.com/') 28 # 29 # f('https://www.sina.com.cn/') 30 31 print("cost time:",time.time()-start)
如何打開一個文件,並寫入"hello world"
filename="my.txt" mode="w" f=open(filename,mode) f.write("hello world") f.close()
writer=open(filename,mode) try: writer.write("hello world") finally: writer.close()
with open(filename,mode) as writer: writer.write("hello world")
1 class echo(): 2 def output(self): 3 print "hello world" 4 def __enter__(self): 5 print "enter" 6 return self #能夠返回任何但願返回的東西 7 def __exit__(self,exception_type,value,trackback): 8 print "exit" 9 if exception_type==ValueError: 10 return True 11 else: 12 return Flase 13 14 >>>with echo as e: 15 e.output() 16 17 輸出: 18 enter 19 hello world 20 exit
def __exit__(self,exc_type,exc_value,exc_tb)
1 from contextlib import contextmanager 2 @contextmanager 3 def make_context(): 4 print 'enter' 5 try: 6 yield "ok" 7 except RuntimeError,err: 8 print 'error',err 9 finally: 10 print 'exit' 11 12 >>>with make_context() as value: 13 print value 14 15 輸出爲: 16 enter 17 ok 18 exit
@contextlib.contextmanager def loudLock(): print 'Locking' lock.acquire() yield print 'Releasing' lock.release() with loudLock(): print 'Lock is locked: %s' % lock.locked() print 'Doing something that needs locking' #Output: #Locking #Lock is locked: True #Doing something that needs locking #Releasing
with open(filename,mode) as reader:
with open(filename1,mode1) as writer:
with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
在python 2.7及之後,被一種新的語法取代:
with open(filename,mode) as reader,open(filename1,mode1) as writer: