一. 線程:python
基本使用git
線程鎖github
線程池緩存
隊列(生產者消費者模型)安全
二. 進程:服務器
基本使用網絡
進程鎖多線程
進程池併發
進程數據共享app
三. 協程:
gevent
greenlet
四. 緩存:
memcache
(一)線程:
全部的線程都運行於一個進程中,一個進程中能夠執行多個線程。多個線程共享進程內的資源。因此能夠將線程能夠當作是共享同一虛擬內存以及其餘屬性的進程。
Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。
Thread(target=None, name=None, args=(), kwargs={}) : 建立一個新的線程實例
target:可調用對象,線程啓動時,run()將調用此對象,默認爲None
name: 線程名
args: 傳遞給target的參數元組
kwargs: 傳遞給target的關鍵字參數字典
Thread的實例: t.start: #線程準備就緒,等待CPU調度 t.run: #線程被cpu調度後自動執行線程對象的run方法 t.join([timeout]): #等待直到線程終止或者出現超時爲止。 t.is_alive(): #若是線程是活動的。返回True,不然返回False t.name: #線程名稱 t.daemon: #線程的布爾型後臺標誌,必須在調用start()方法以前設置這個標誌
###以線程的形式建立和啓動一個函數:
1 import threading 2 import time 3 4 def clock(interval): 5 while True: 6 print("The time is %s" % time.ctime()) 7 time.sleep(interval) 8 9 t = threading.Thread(target=clock,args=(5,)) 10 #t.daemon = True 11 t.start() 12 13 The time is Sat Jul 23 02:08:58 2016 14 The time is Sat Jul 23 02:09:03 2016 15 The time is Sat Jul 23 02:09:08 2016
###將同一個線程定義爲一個類:
1 import threading 2 import time 3 4 class ClockThread(threading.Thread): 5 def __init__(self,interval): 6 threading.Thread.__init__(self) 7 self.interval = interval 8 def run(self): 9 while True: 10 print("The time is %s" % time.ctime()) 11 time.sleep(self.interval) 12 t = ClockThread(5) 13 t.start() 14 15 16 The time is Sat Jul 23 02:15:48 2016 17 The time is Sat Jul 23 02:15:53 2016 18 The time is Sat Jul 23 02:15:58 2016
Timer對象:
定時器,在某個時間執行某個函數
格式:
Timer(interval, func [,args [, kwargs]])
對象:
p.start(): 啓動定時器
p.cancel(): 若是函數還沒有執行,取消定時器
1 from threading import Timer 2 3 4 def hello(): 5 print("hello, world") 6 7 t = Timer(3, hello) 8 t.start() #3s後執行函數顯示"hello,word" 9 10 hello, world
信號量與有邊界的信號量(semaphore):
互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,每次調用acquire()方法時此計數器減1,每次調用release()方法時此計數器加1,若是計數器爲0,acquire方法將會阻塞,直到其餘線程調用release方法爲止。好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
Semaphore([value]) :建立一個信號量,value爲初始值,省略時,默認爲1
p.acquire([blocking]):獲取信號量
p.release() :經過將內部計數器值加1來釋放一個信號量。
BoundedSemaphore([value]): 建立一個新的信號機,與Semaphore的工做方式徹底相同,可是release()操做的次數不能超過acquire()操做次數
注: 信號機與互斥鎖的差異在於:
信號機可用於發射信號,能夠從不一樣線程調用以上兩個方法。
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print("run the thread: %s" %n) 7 semaphore.release() 8 9 if __name__ == '__main__': 10 11 num= 0 12 semaphore = threading.BoundedSemaphore(5) #最多容許5個線程同時運行 13 for i in range(10): 14 t = threading.Thread(target=run,args=(i,)) 15 t.start() 16 17 18 run the thread: 0 19 run the thread: 4 20 run the thread: 3 21 run the thread: 2 22 run the thread: 1 23 run the thread: 5 24 run the thread: 9 25 run the thread: 8 26 run the thread: 7 27 run the thread: 6
事件(Event):
用於在線程之間通訊。一個線程發出「事件」信號,一個或多個其它線程等待,Event實例管理者一個內部標誌,可使用set()方法將它置爲True,clear()置爲Flase, wait()方法將阻塞直到標誌位True.
Event()
e.is_set(): 當內部標誌位Ture時才返回True
e.set(): 將內部標誌設置爲True。等待它變爲True的全部線程都將被喚醒
e.clear(): 將內部標誌重置爲False
e.wait([timeout]): 阻塞直到內部標誌位True。
1 import threading 2 3 4 def do(event): 5 print 'start' 6 event.wait() 7 print 'execute' 8 9 10 event_obj = threading.Event() 11 for i in range(5): 12 t = threading.Thread(target=do, args=(event_obj,)) 13 t.start() 14 15 event_obj.clear() 16 inp = raw_input('input:') 17 if inp == 'true': 18 event_obj.set() 19 20 21 start 22 start 23 start 24 start 25 start 26 input:true 27 execute 28 execute 29 execute 30 execute 31 execute
條件(Condition):
使得線程等待,只有知足某條件時,才釋放n個線程
Condition([lock]) :建立一個條件變量,lock爲可選的Lock或RLock實例,爲指定則建立新的RLock實例供條件變量使用。
c.acquire(*args): 獲取底層鎖定
c.release(): 釋放底層鎖定
c.wait([timeout]): 等待直到得到通知或出現超時爲止
c.notify([n]) : 喚醒一個或多個等待此條件變量的線程。
c.notify_all(): 喚醒全部等待此條件的線程。
1 import threading 2 3 def condition_func(): 4 5 ret = False 6 inp = input('>>>') 7 if inp == '1': 8 ret = True 9 10 return ret 11 12 13 def run(n): 14 con.acquire() 15 con.wait_for(condition_func) 16 print("run the thread: %s" %n) 17 con.release() 18 19 if __name__ == '__main__': 20 21 con = threading.Condition() 22 for i in range(10): 23 t = threading.Thread(target=run, args=(i,)) 24 t.start() 25 26 >>>1 27 run the thread: 0 28 >>>1 29 run the thread: 1 30 >>>1 31 run the thread: 2 32 >>>1 33 run the thread: 3
線程池:
線程池是一個存放不少線程的單位,同時還有一個對應的任務隊列。整個執行過程其實就是使用線程池中已有有限的線程把任務 隊列中的任務作完。
1 import queue,threading,time 2 3 class ThreadPool: 4 def __init__(self,maxsize = 5): 5 self.maxsize = maxsize 6 self._q = queue.Queue(maxsize) 7 for i in range(maxsize): 8 self._q.put(threading.Thread) 9 10 def get_thread(self): 11 return self._q.get() 12 13 def add_thread(self): 14 self._q.put(threading.Thread) 15 16 pool = ThreadPool(5) 17 18 def task(arg, p): 19 print(arg) 20 time.sleep(1) 21 p.add_thread() 22 23 for i in range(10): 24 #threading.Thread類 25 t = pool.get_thread() 26 obj = t(target = task, args = (i,pool)) 27 obj.start() 28 29 from threading import Timer 30 def hello(): 31 print("hello,word") 32 t = Timer(3,hello) 33 t.start() 34 35 36 0 37 1 38 2 39 3 40 4 41 5 42 6 43 7 44 8 45 9 46 1 47 run the thread: 1 48 run the thread: 0 49 2 50 run the thread: 3 51 run the thread: 2 52 hello,word 53 3 54 run the thread: 4 55 4 56 5 57 6 58 7 59 8 60 9 61 10
隊列:
隊列是線程間最經常使用的交換數據的形式。queue模塊是提供隊列操做的模塊,實現了各類多生產者,多使用者隊列,可用於執行多個線程之間安全地交換信息。
queue模塊定義了3種不一樣的隊列類:
1. Queue([maxsize]): FIFO(先進先出)隊列。maxsize爲可放入項目的最大量。不設置或者爲0時,隊列無窮大。
2. LifoQueue([maxsize]): LIFO(後進先出)隊列。也叫棧。
3. PriorityQueue([maxsize]): 優先級隊列,項目按優先級從低到高排列,格式爲(priority, data)形式的元組, priority爲一個數字。
1 實例以下: 2 3 1 q.qsize(): #返回隊列的正確大小 4 2 q.empty(): #若是隊列爲空,則返回True 5 3 q.full():#若是隊列已滿,返回True 6 4 q.put(item [, block [, timeout): #將item放入隊列. block,調用者將被阻塞直到隊列中有可用的空閒位置便可。 7 5 q.put_nowait(item): #與q.put沒什麼差異 8 6 q.get([block [, timeout]]):3 從隊列中刪除一項,而後返回這個項目 9 7 q.get_nowait():#至關於get(0) 10 8 q.task_done(): 隊列中數據的使用者用來指示對於項目的處理意見結束。 11 9 q.join(): 阻塞直到隊列中的全部項目均被刪除和處理爲止。
案例:
(先進先出)
1 import queue 2 q = queue.Queue(2) 3 print(q.empty()) 4 q.put(11) 5 q.put(22) 6 print(q.empty()) 7 print(q.qsize()) 8 print(q.get()) 9 print(q.get()) 10 q.put(33,block=False) 11 q.put(33,block=False,timeout=2) 12 print(q.get(timeout=2)) 13 14 q = queue.Queue(5) 15 16 q.put(123) 17 q.put(456) 18 print(q.get()) 19 q.task_done() 20 print(q.get()) 21 q.task_done() 22 q.join()
1 #queue.LifoQueue, #後進先出隊列 2 3 q = queue.LifoQueue() 4 q.put(123) 5 q.put(456) 6 print(q.get())
1 # queue.PriorityQueue,優先級隊列 2 3 # q = queue.PriorityQueue() 4 # q.put((8, 'hong')) 5 # q.put((2, 345)) 6 # q.put((3, 678)) 7 # print(q.get())
1 # queue.deque,雙向對隊 2 3 # q = queue.deque() 4 # q.append(123) 5 # q.append(333) 6 # q.appendleft(555) 7 # 8 # print(q.pop()) 9 # print(q.popleft())
生產者與消費者模型:
生產者的工做是產生一塊數據,放到buffer中,如此循環。與此同時,消費者在消耗這些數據(例如從buffer中把它們移除),每次一塊。這裏的關鍵詞是「同時」。因此生產者和消費者是併發運行的,咱們須要對生產者和消費者作線程分離。
1 import queue 2 import threading 3 import time 4 q = queue.Queue() 5 6 def productor(arg): 7 """ 8 買票 9 :param arg: 10 :return: 11 """ 12 q.put(str(arg) + '- 買票') 13 14 for i in range(20): 15 t = threading.Thread(target=productor,args=(i,)) 16 t.start()
(二)進程:
進程是程序的一次執行,每一個進程都有本身的地址空間,內存,數據棧。建立進程的時候,內核會爲進程分配必定的資源,並在進程存活的時候不斷進行調整,好比內存,進程建立的時候會佔有一部份內存。進程結束的時候資源會釋放出來,來讓其餘資源使用。咱們能夠把進程理解爲一種容器,容器內的資源可多可少,可是隻能進程間通訊,不能共享信息。
談到進程則要用到的就是multiprocessing模塊,這個模塊的全部功能基本都是在進程上的。
定義一個類運行一個進程:
process([,target [,name [,args [,kwargs]]]])
target: 當進程啓動時執行的可調用對象
name: 爲進程執行描述性名稱的字符串
args: 位置參數,元組
kwargs: 位置參數,字典
經過這個構造函數簡單構造了一個process進程。
進程(process)實例:
p.is_alive() #若是p仍然運行,返回True p.join([timeout]) #等待進程p終止,timeout是可選的超時期限。進程可被鏈接無數次,但鏈接自身時則會報錯 p.run()# 啓動進程時運行的方法,可調用target。 p.start() #啓動進程,表明進程的子進程,並調用p.run()函數 p.terminate()#強制終止進程。進程p被當即終止,而不會進行清理,慎用。
單進程實例:
import multiprocessing import time def clock(interval): while True: print("The time is %s" % time.ctime()) time.sleep(interval) if __name__ == '__main__': p = multiprocessing.Process(target=clock, args=(5,)) p.start() The time is Fri Jul 22 17:15:45 2016 The time is Fri Jul 22 17:15:50 2016 The time is Fri Jul 22 17:15:55 2016
將上面的進程定義爲繼承自Process的類,目的爲爲了實現跨平臺的可移植性,必須有主程序建立進程。
1 import multiprocessing 2 import time 3 4 class ClockProcess(multiprocessing.Process): 5 def __init__(self, interval): 6 multiprocessing.Process.__init__(self) 7 self.interval = interval 8 9 def run(self): 10 while True: 11 print("The time is %s" % time.ctime()) 12 time.sleep(self.interval) 13 if __name__ == '__main__': 14 p = ClockProcess(5) 15 p.start() 16 17 The time is Fri Jul 22 17:25:08 2016 18 The time is Fri Jul 22 17:25:13 2016 19 The time is Fri Jul 22 17:25:18 2016
進程鎖:
當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突。
1 import multiprocessing 2 import sys 3 4 def worker_with(lock, f): 5 with lock: 6 fs = open(f,"a+") 7 fs.write('Lock acquired via \n') 8 fs.close() 9 10 def worker_no_with(lock, f): 11 lock.acquire() 12 try: 13 fs = open(f,"a+") 14 fs.write('Lock acquired directly\n') 15 fs.close() 16 finally: 17 lock.release() 18 19 if __name__ == "__main__": 20 21 f = "file.txt" 22 23 lock = multiprocessing.Lock() 24 w = multiprocessing.Process(target=worker_with, args=(lock, f)) 25 nw = multiprocessing.Process(target=worker_no_with, args=(lock, f)) 26 27 w.start() 28 nw.start() 29 30 w.join() 31 nw.join() 32 33 34 #cat file.txt 35 36 Lock acquired directly 37 Lock acquired via
注:若是兩個進程沒有使用lock來同步,則他們對同一個文件的寫操做可能會出現混亂。
進程池:
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
建立一個進程池:
Pool([numprocess [,initializer [, initargs]]])
numprocess: 要建立的進程數
initlalizer: 每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs: 傳遞給initlalizer的元組
Pool的實例:
p.apply(func, [, args [, kwargs]])#在一個池工做進程中執行函數(**args, **kwargs),而後返回結果,不能再池中並行執行,可以使用apply_async p.apply_async(func, [, args [, kwargs [,callback]]])#在一個池工做進程中異步執行函數(**args, **kwargs),而後返回結果,傳遞給callback。 p.terminate()#當即終止 p.close()# 關閉進程池 p.join()# 等待全部工做進程退出
案例:
1 from multiprocessing import Pool 2 import time 3 4 def f1(arg): 5 time.sleep(3) 6 print(arg) 7 8 if __name__ == '__main__': 9 pool = Pool(5) #併發執行5個函數 10 11 for i in range(15): 12 #pool.apply(func=f1,args=(i,))#不能併發的執行函數 13 pool.apply_async(func=f1,args=(i,))#可併發執行函數 14 15 pool.close() #全部的任務執行完畢 16 time.sleep(3) 17 #pool.terminate()#當即終止 18 pool.join()
進程數據共享:
一般進程之間是徹底孤立的,使用數據共享,能夠訪問多個進程。
實現進程數據共享有兩種方法:
1 #方法一,Array 2 3 from multiprocessing import Process 4 from multiprocessing import Array 5 6 def foo(i,arg): 7 arg[i] = i + 100 8 for item in arg: 9 print(item) 10 print('============') 11 12 if __name__ == '__main__': 13 li = Array('i',10) 14 for i in range(10): 15 p = Process(target=foo,args=(i,li,)) 16 p.start()
1 #方法二:manage.dict()共享數據 2 3 from multiprocessing import Process 4 from multiprocessing import Manager 5 # 6 def foo(i,arg): 7 arg[i] = i + 100 8 print(arg.values()) 9 10 if __name__ == '__main__': 11 obj = Manager() 12 li = obj.dict() 13 for i in range(10): 14 p = Process(target=foo,args=(i,li,)) 15 p.start() 16 import time 17 time.sleep(1)
線程鎖(Lock, RLock):
因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖 - 同一時刻容許一個線程執行操做。
Lock():建立新的Lock對象,初始化狀態爲非鎖定
lock.acquire([blocking]): 獲取鎖定
lock.release(): 釋放鎖定
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print("run the thread: %s" %n) 7 semaphore.release() 8 9 if __name__ == '__main__': 10 11 num= 0 12 semaphore = threading.BoundedSemaphore(2) #最多容許5個線程同時運行 13 for i in range(5): 14 t = threading.Thread(target=run,args=(i,)) 15 t.start() 16 17 18 1 19 run the thread: 1 20 run the thread: 0 21 2 22 run the thread: 3 23 run the thread: 2 24 3 25 run the thread: 4 26 4 27 5 28 6 29 7 30 8 31 9 32 10
(三)協程:
協程咱們能夠當作是一種用戶空間的線程,利用一個線程,分解一個線程成爲多個「微線程」
Python經過yield
提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。
gevent是第三方庫,經過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:
1 from gevent import monkey; monkey.patch_socket() 2 import gevent 3 4 def f(n): 5 for i in range(n): 6 print gevent.getcurrent(), i 7 8 g1 = gevent.spawn(f, 5) 9 g2 = gevent.spawn(f, 5) 10 g3 = gevent.spawn(f, 5) 11 g1.join() 12 g2.join() 13 g3.join() 14 15 16 <Greenlet at 0x10e49f550: f(5)> 0 17 <Greenlet at 0x10e49f550: f(5)> 1 18 <Greenlet at 0x10e49f550: f(5)> 2 19 <Greenlet at 0x10e49f550: f(5)> 3 20 <Greenlet at 0x10e49f550: f(5)> 4 21 <Greenlet at 0x10e49f910: f(5)> 0 22 <Greenlet at 0x10e49f910: f(5)> 1 23 <Greenlet at 0x10e49f910: f(5)> 2 24 <Greenlet at 0x10e49f910: f(5)> 3 25 <Greenlet at 0x10e49f910: f(5)> 4 26 <Greenlet at 0x10e49f4b0: f(5)> 0 27 <Greenlet at 0x10e49f4b0: f(5)> 1 28 <Greenlet at 0x10e49f4b0: f(5)> 2 29 <Greenlet at 0x10e49f4b0: f(5)> 3 30 <Greenlet at 0x10e49f4b0: f(5)> 4
能夠看到,3個greenlet是依次運行而不是交替運行。
要讓greenlet交替運行,能夠經過gevent.sleep()
交出控制權:
def f(n): for i in range(n): print gevent.getcurrent(), i gevent.sleep(0) <Greenlet at 0x10cd58550: f(5)> 0 <Greenlet at 0x10cd58910: f(5)> 0 <Greenlet at 0x10cd584b0: f(5)> 0 <Greenlet at 0x10cd58550: f(5)> 1 <Greenlet at 0x10cd584b0: f(5)> 1 <Greenlet at 0x10cd58910: f(5)> 1 <Greenlet at 0x10cd58550: f(5)> 2 <Greenlet at 0x10cd58910: f(5)> 2 <Greenlet at 0x10cd584b0: f(5)> 2 <Greenlet at 0x10cd58550: f(5)> 3 <Greenlet at 0x10cd584b0: f(5)> 3 <Greenlet at 0x10cd58910: f(5)> 3 <Greenlet at 0x10cd58550: f(5)> 4 <Greenlet at 0x10cd58910: f(5)> 4 <Greenlet at 0x10cd584b0: f(5)> 4
3個greenlet交替運行,
把循環次數改成500000,讓它們的運行時間長一點,而後在操做系統的進程管理器中看,線程數只有1個。
固然,實際代碼裏,咱們不會用gevent.sleep()
去切換協程,而是在執行到IO操做時,gevent自動切換,代碼以下:
1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 import urllib2 4 5 def f(url): 6 print('GET: %s' % url) 7 resp = urllib2.urlopen(url) 8 data = resp.read() 9 print('%d bytes received from %s.' % (len(data), url)) 10 11 gevent.joinall([ 12 gevent.spawn(f, 'https://www.python.org/'), 13 gevent.spawn(f, 'https://www.yahoo.com/'), 14 gevent.spawn(f, 'https://github.com/'), 15 ]) 16 17 18 GET: https://www.python.org/ 19 GET: https://www.yahoo.com/ 20 GET: https://github.com/ 21 45661 bytes received from https://www.python.org/. 22 14823 bytes received from https://github.com/. 23 304034 bytes received from https://www.yahoo.com/.
從結果看,3個網絡操做是併發執行的,並且結束順序不一樣,但只有一個線程。
(四)緩存
memcache:
下載: wget http://ftp.tummy.com/pub/python-memcached/old-releases/python-memcached-1.54.tar.gz(本身更新最新版)
解壓縮:tar -zxvf python-memcached-1.54.tar.gz
安裝: python setup.py install
啓動:memcached
-
d
-
m
10
-
u root
-
l 127.0.0.1
-
p 11511
-
c
256
-
P
/
tmp
/
memcached.pid
參數說明:
-
d 是啓動一個守護進程
-
m 是分配給Memcache使用的內存數量,單位是MB
-
u 是運行Memcache的用戶
-
l 是監聽的服務器IP地址
-
p 是設置Memcache監聽的端口,最好是
1024
以上的端口
-
c 選項是最大運行的併發鏈接數,默認是
1024
,按照你服務器的負載量來設定
-
P 是設置保存Memcache的pid文件
代碼:
import memcache class MemcachedClient(): ''' python memcached 客戶端操做示例 ''' def __init__(self, hostList): self.__mc = memcache.Client(hostList); def set(self, key, value): result = self.__mc.set("name", "hongfei") return result def get(self, key): name = self.__mc.get("name") return name def delete(self, key): result = self.__mc.delete("name") return result if __name__ == '__main__': mc = MemcachedClient(["127.0.0.1:11511", "127.0.0.1:11512"]) key = "name" result = mc.set(key, "hongfei") print("set的結果:", result) name = mc.get(key) print ("get的結果:", name) result = mc.delete(key) print ("delete的結果:", result) set的結果: True get的結果: hongfei delete的結果: 1
很抱歉,時間有點倉促,寫的不是很細,有點亂,之後慢慢補充整理,謝謝查看。