Python網絡編程之線程,進程

 

  

  一. 線程:python

      基本使用git

      線程鎖github

      線程池緩存

      隊列(生產者消費者模型)安全

  二. 進程:服務器

       基本使用網絡

       進程鎖多線程

                進程池併發

                進程數據共享app

   

    三. 協程:

      gevent

      greenlet

    四. 緩存:

      memcache

      

  

  

  (一)線程:

       全部的線程都運行於一個進程中,一個進程中能夠執行多個線程。多個線程共享進程內的資源。因此能夠將線程能夠當作是共享同一虛擬內存以及其餘屬性的進程。

       Threading用於提供線程相關的操做,線程是應用程序中工做的最小單元。

       Thread(target=None, name=None, args=(), kwargs={}) : 建立一個新的線程實例

target:可調用對象,線程啓動時,run()將調用此對象,默認爲None

name: 線程名

args: 傳遞給target的參數元組

kwargs: 傳遞給target的關鍵字參數字典

Thread的實例:

t.start: #線程準備就緒,等待CPU調度
t.run: #線程被cpu調度後自動執行線程對象的run方法
t.join([timeout]): #等待直到線程終止或者出現超時爲止。
t.is_alive(): #若是線程是活動的。返回True,不然返回False
t.name: #線程名稱
t.daemon: #線程的布爾型後臺標誌,必須在調用start()方法以前設置這個標誌

 

###以線程的形式建立和啓動一個函數:

 1 import threading
 2 import time
 3 
 4 def clock(interval):
 5     while True:
 6         print("The time is %s" % time.ctime())
 7         time.sleep(interval)
 8 
 9 t = threading.Thread(target=clock,args=(5,))
10 #t.daemon = True
11 t.start()
12 
13 The time is Sat Jul 23 02:08:58 2016
14 The time is Sat Jul 23 02:09:03 2016
15 The time is Sat Jul 23 02:09:08 2016

###將同一個線程定義爲一個類:

 1 import threading
 2 import time
 3 
 4 class ClockThread(threading.Thread):
 5     def __init__(self,interval):
 6         threading.Thread.__init__(self)
 7         self.interval = interval
 8     def run(self):
 9         while True:
10             print("The time is %s" % time.ctime())
11             time.sleep(self.interval)
12 t = ClockThread(5)
13 t.start()
14 
15 
16 The time is Sat Jul 23 02:15:48 2016
17 The time is Sat Jul 23 02:15:53 2016
18 The time is Sat Jul 23 02:15:58 2016

 

  Timer對象:

      定時器,在某個時間執行某個函數

    格式: 

      Timer(interval, func [,args [, kwargs]])

    對象:

     p.start(): 啓動定時器

     p.cancel(): 若是函數還沒有執行,取消定時器

 1 from threading import Timer
 2  
 3  
 4 def hello():
 5     print("hello, world")
 6  
 7 t = Timer(3, hello)
 8 t.start()  #3s後執行函數顯示"hello,word"
 9 
10 hello, world

  信號量與有邊界的信號量(semaphore):

     互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,每次調用acquire()方法時此計數器減1,每次調用release()方法時此計數器加1,若是計數器爲0,acquire方法將會阻塞,直到其餘線程調用release方法爲止。好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

  Semaphore([value]) :建立一個信號量,value爲初始值,省略時,默認爲1

    p.acquire([blocking]):獲取信號量

    p.release() :經過將內部計數器值加1來釋放一個信號量。

  BoundedSemaphore([value]): 建立一個新的信號機,與Semaphore的工做方式徹底相同,可是release()操做的次數不能超過acquire()操做次數

 

注: 信號機與互斥鎖的差異在於:

      信號機可用於發射信號,能夠從不一樣線程調用以上兩個方法。

 1 import threading,time
 2 
 3 def run(n):
 4     semaphore.acquire()
 5     time.sleep(1)
 6     print("run the thread: %s" %n)
 7     semaphore.release()
 8 
 9 if __name__ == '__main__':
10 
11     num= 0
12     semaphore  = threading.BoundedSemaphore(5) #最多容許5個線程同時運行
13     for i in range(10):
14         t = threading.Thread(target=run,args=(i,))
15         t.start()
16 
17 
18 run the thread: 0
19 run the thread: 4
20 run the thread: 3
21 run the thread: 2
22 run the thread: 1
23 run the thread: 5
24 run the thread: 9
25 run the thread: 8
26 run the thread: 7
27 run the thread: 6

  事件(Event):

    用於在線程之間通訊。一個線程發出「事件」信號,一個或多個其它線程等待,Event實例管理者一個內部標誌,可使用set()方法將它置爲True,clear()置爲Flase, wait()方法將阻塞直到標誌位True.

    Event()

e.is_set(): 當內部標誌位Ture時才返回True

e.set(): 將內部標誌設置爲True。等待它變爲True的全部線程都將被喚醒

e.clear(): 將內部標誌重置爲False

e.wait([timeout]): 阻塞直到內部標誌位True。

 1 import threading
 2  
 3  
 4 def do(event):
 5     print 'start'
 6     event.wait()
 7     print 'execute'
 8  
 9  
10 event_obj = threading.Event()
11 for i in range(5):
12     t = threading.Thread(target=do, args=(event_obj,))
13     t.start()
14  
15 event_obj.clear()
16 inp = raw_input('input:')
17 if inp == 'true':
18     event_obj.set()
19 
20 
21 start
22 start
23 start
24 start
25 start
26 input:true
27 execute
28 execute
29 execute
30 execute
31 execute

  條件(Condition):

    使得線程等待,只有知足某條件時,才釋放n個線程

    Condition([lock]) :建立一個條件變量,lock爲可選的Lock或RLock實例,爲指定則建立新的RLock實例供條件變量使用。

  c.acquire(*args): 獲取底層鎖定

  c.release(): 釋放底層鎖定

  c.wait([timeout]): 等待直到得到通知或出現超時爲止

  c.notify([n]) : 喚醒一個或多個等待此條件變量的線程。

  c.notify_all(): 喚醒全部等待此條件的線程。

 1 import threading
 2 
 3 def condition_func():
 4 
 5     ret = False
 6     inp = input('>>>')
 7     if inp == '1':
 8         ret = True
 9 
10     return ret
11 
12 
13 def run(n):
14     con.acquire()
15     con.wait_for(condition_func)
16     print("run the thread: %s" %n)
17     con.release()
18 
19 if __name__ == '__main__':
20 
21     con = threading.Condition()
22     for i in range(10):
23         t = threading.Thread(target=run, args=(i,))
24         t.start()
25 
26 >>>1
27 run the thread: 0
28 >>>1
29 run the thread: 1
30 >>>1
31 run the thread: 2
32 >>>1
33 run the thread: 3

  線程池:

    線程池是一個存放不少線程的單位,同時還有一個對應的任務隊列。整個執行過程其實就是使用線程池中已有有限的線程把任務 隊列中的任務作完。

 

 1 import queue,threading,time
 2 
 3 class ThreadPool:
 4     def __init__(self,maxsize = 5):
 5         self.maxsize = maxsize
 6         self._q = queue.Queue(maxsize)
 7         for i in range(maxsize):
 8             self._q.put(threading.Thread)
 9 
10     def get_thread(self):
11         return self._q.get()
12 
13     def add_thread(self):
14         self._q.put(threading.Thread)
15 
16 pool = ThreadPool(5)
17 
18 def task(arg, p):
19     print(arg)
20     time.sleep(1)
21     p.add_thread()
22 
23 for i in range(10):
24     #threading.Thread類
25     t = pool.get_thread()
26     obj = t(target = task, args = (i,pool))
27     obj.start()
28 
29 from threading import Timer
30 def hello():
31     print("hello,word")
32 t = Timer(3,hello)
33 t.start()
34 
35 
36 0
37 1
38 2
39 3
40 4
41 5
42 6
43 7
44 8
45 9
46 1
47 run the thread: 1
48 run the thread: 0
49 2
50 run the thread: 3
51 run the thread: 2
52 hello,word
53 3
54 run the thread: 4
55 4
56 5
57 6
58 7
59 8
60 9
61 10
View Code

  

  隊列:

    隊列是線程間最經常使用的交換數據的形式。queue模塊是提供隊列操做的模塊,實現了各類多生產者,多使用者隊列,可用於執行多個線程之間安全地交換信息。

  queue模塊定義了3種不一樣的隊列類:

    1. Queue([maxsize]):  FIFO(先進先出)隊列。maxsize爲可放入項目的最大量。不設置或者爲0時,隊列無窮大。

    2. LifoQueue([maxsize]): LIFO(後進先出)隊列。也叫棧。

    3. PriorityQueue([maxsize]): 優先級隊列,項目按優先級從低到高排列,格式爲(priority, data)形式的元組, priority爲一個數字。

 1  實例以下:
 2   
 3  1 q.qsize(): #返回隊列的正確大小
 4  2 q.empty(): #若是隊列爲空,則返回True
 5  3 q.full():#若是隊列已滿,返回True
 6  4 q.put(item [, block [, timeout): #將item放入隊列. block,調用者將被阻塞直到隊列中有可用的空閒位置便可。
 7  5 q.put_nowait(item): #與q.put沒什麼差異
 8  6 q.get([block [, timeout]]):3 從隊列中刪除一項,而後返回這個項目
 9  7 q.get_nowait():#至關於get(0)
10 8 q.task_done(): 隊列中數據的使用者用來指示對於項目的處理意見結束。
11 9 q.join(): 阻塞直到隊列中的全部項目均被刪除和處理爲止。

案例: 

(先進先出)

 1 import queue
 2 q = queue.Queue(2)
 3 print(q.empty())
 4 q.put(11)
 5 q.put(22)
 6 print(q.empty())
 7 print(q.qsize())
 8 print(q.get())
 9 print(q.get())
10 q.put(33,block=False)
11 q.put(33,block=False,timeout=2)
12 print(q.get(timeout=2))
13 
14 q = queue.Queue(5)
15 
16 q.put(123)
17 q.put(456)
18 print(q.get())
19 q.task_done()
20 print(q.get())
21 q.task_done()
22 q.join()
1 #queue.LifoQueue, #後進先出隊列
2 
3 q = queue.LifoQueue()
4 q.put(123)
5 q.put(456)
6 print(q.get())
1 # queue.PriorityQueue,優先級隊列
2 
3 # q = queue.PriorityQueue()
4 # q.put((8, 'hong'))
5 # q.put((2, 345))
6 # q.put((3, 678))
7 # print(q.get())
1 # queue.deque,雙向對隊
2 
3 # q = queue.deque()
4 # q.append(123)
5 # q.append(333)
6 # q.appendleft(555)
7 #
8 # print(q.pop())
9 # print(q.popleft())

  生產者與消費者模型:

     生產者的工做是產生一塊數據,放到buffer中,如此循環。與此同時,消費者在消耗這些數據(例如從buffer中把它們移除),每次一塊。這裏的關鍵詞是「同時」。因此生產者和消費者是併發運行的,咱們須要對生產者和消費者作線程分離。    

 1 import queue
 2 import threading
 3 import time
 4 q = queue.Queue()
 5 
 6 def productor(arg):
 7     """
 8     買票
 9     :param arg:
10     :return:
11     """
12     q.put(str(arg) + '- 買票')
13 
14 for i in range(20):
15     t = threading.Thread(target=productor,args=(i,))
16     t.start()

   

  (二)進程:

        進程是程序的一次執行,每一個進程都有本身的地址空間,內存,數據棧。建立進程的時候,內核會爲進程分配必定的資源,並在進程存活的時候不斷進行調整,好比內存,進程建立的時候會佔有一部份內存。進程結束的時候資源會釋放出來,來讓其餘資源使用。咱們能夠把進程理解爲一種容器,容器內的資源可多可少,可是隻能進程間通訊,不能共享信息。

               談到進程則要用到的就是multiprocessing模塊,這個模塊的全部功能基本都是在進程上的。

       定義一個類運行一個進程:

       process([,target [,name [,args [,kwargs]]]])

target: 當進程啓動時執行的可調用對象

name: 爲進程執行描述性名稱的字符串

args: 位置參數,元組

kwargs: 位置參數,字典

經過這個構造函數簡單構造了一個process進程。

  進程(process)實例:

p.is_alive() #若是p仍然運行,返回True
p.join([timeout]) #等待進程p終止,timeout是可選的超時期限。進程可被鏈接無數次,但鏈接自身時則會報錯
p.run()# 啓動進程時運行的方法,可調用target。
p.start() #啓動進程,表明進程的子進程,並調用p.run()函數
p.terminate()#強制終止進程。進程p被當即終止,而不會進行清理,慎用。

單進程實例:

import multiprocessing
import time

def clock(interval):
    while True:
        print("The time is %s" % time.ctime())
        time.sleep(interval)
if __name__ == '__main__':
    p = multiprocessing.Process(target=clock, args=(5,))
    p.start()

The time is Fri Jul 22 17:15:45 2016
The time is Fri Jul 22 17:15:50 2016
The time is Fri Jul 22 17:15:55 2016
將上面的進程定義爲繼承自Process的類,目的爲爲了實現跨平臺的可移植性,必須有主程序建立進程。

 1 import multiprocessing
 2 import time
 3 
 4 class ClockProcess(multiprocessing.Process):
 5     def __init__(self, interval):
 6         multiprocessing.Process.__init__(self)
 7         self.interval = interval
 8 
 9     def run(self):
10         while True:
11             print("The time is %s" % time.ctime())
12             time.sleep(self.interval)
13 if __name__ == '__main__':
14     p = ClockProcess(5)
15     p.start()
16 
17 The time is Fri Jul 22 17:25:08 2016
18 The time is Fri Jul 22 17:25:13 2016
19 The time is Fri Jul 22 17:25:18 2016

     進程鎖:

     當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突。

 1 import multiprocessing
 2 import sys
 3 
 4 def worker_with(lock, f):
 5     with lock:
 6         fs = open(f,"a+")
 7         fs.write('Lock acquired via \n')
 8         fs.close()
 9 
10 def worker_no_with(lock, f):
11     lock.acquire()
12     try:
13         fs = open(f,"a+")
14         fs.write('Lock acquired directly\n')
15         fs.close()
16     finally:
17         lock.release()
18 
19 if __name__ == "__main__":
20 
21     f = "file.txt"
22 
23     lock = multiprocessing.Lock()
24     w = multiprocessing.Process(target=worker_with, args=(lock, f))
25     nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
26 
27     w.start()
28     nw.start()
29 
30     w.join()
31     nw.join()
32 
33 
34 #cat file.txt
35 
36 Lock acquired directly
37 Lock acquired via 

注:若是兩個進程沒有使用lock來同步,則他們對同一個文件的寫操做可能會出現混亂。

 

  進程池:

     進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

      建立一個進程池:

        Pool([numprocess [,initializer [, initargs]]])

numprocess: 要建立的進程數

initlalizer: 每一個工做進程啓動時要執行的可調用對象,默認爲None

initargs: 傳遞給initlalizer的元組

    Pool的實例:

p.apply(func, [, args [, kwargs]])#在一個池工做進程中執行函數(**args, **kwargs),而後返回結果,不能再池中並行執行,可以使用apply_async
p.apply_async(func, [, args [, kwargs [,callback]]])#在一個池工做進程中異步執行函數(**args, **kwargs),而後返回結果,傳遞給callback。
p.terminate()#當即終止
p.close()# 關閉進程池
p.join()# 等待全部工做進程退出

案例:

 1 from multiprocessing import Pool
 2 import time
 3 
 4 def f1(arg):
 5     time.sleep(3)
 6     print(arg)
 7 
 8 if __name__ == '__main__':
 9     pool = Pool(5) #併發執行5個函數
10 
11     for i in range(15):
12         #pool.apply(func=f1,args=(i,))#不能併發的執行函數
13         pool.apply_async(func=f1,args=(i,))#可併發執行函數
14 
15     pool.close() #全部的任務執行完畢
16     time.sleep(3)
17     #pool.terminate()#當即終止
18     pool.join()

   進程數據共享:

        一般進程之間是徹底孤立的,使用數據共享,能夠訪問多個進程。

   實現進程數據共享有兩種方法:

 1 #方法一,Array
 2 
 3 from multiprocessing import Process
 4 from multiprocessing import Array
 5 
 6 def foo(i,arg):
 7     arg[i] = i + 100
 8     for item in arg:
 9         print(item)
10     print('============')
11 
12 if __name__ == '__main__':
13     li = Array('i',10)
14     for i in range(10):
15         p = Process(target=foo,args=(i,li,))
16         p.start()
 1 #方法二:manage.dict()共享數據
 2 
 3 from multiprocessing import Process
 4 from multiprocessing import Manager
 5 #
 6 def foo(i,arg):
 7     arg[i] = i + 100
 8     print(arg.values())
 9 
10 if __name__ == '__main__':
11     obj = Manager()
12     li = obj.dict()
13     for i in range(10):
14         p = Process(target=foo,args=(i,li,))
15         p.start()
16     import time
17     time.sleep(1)

  線程鎖(Lock, RLock):

    因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖 - 同一時刻容許一個線程執行操做。

    Lock():建立新的Lock對象,初始化狀態爲非鎖定

    lock.acquire([blocking]): 獲取鎖定

    lock.release(): 釋放鎖定

 1 import threading,time
 2 
 3 def run(n):
 4     semaphore.acquire()
 5     time.sleep(1)
 6     print("run the thread: %s" %n)
 7     semaphore.release()
 8 
 9 if __name__ == '__main__':
10 
11     num= 0
12     semaphore  = threading.BoundedSemaphore(2) #最多容許5個線程同時運行
13     for i in range(5):
14         t = threading.Thread(target=run,args=(i,))
15         t.start()
16 
17 
18 1
19 run the thread: 1
20 run the thread: 0
21 2
22 run the thread: 3
23 run the thread: 2
24 3
25 run the thread: 4
26 4
27 5
28 6
29 7
30 8
31 9
32 10

 

  (三)協程:

       協程咱們能夠當作是一種用戶空間的線程,利用一個線程,分解一個線程成爲多個「微線程」  

        Python經過yield提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。

        gevent是第三方庫,經過greenlet實現協程,其基本思想是:  

         當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

 1 from gevent import monkey; monkey.patch_socket()
 2 import gevent
 3 
 4 def f(n):
 5     for i in range(n):
 6         print gevent.getcurrent(), i
 7 
 8 g1 = gevent.spawn(f, 5)
 9 g2 = gevent.spawn(f, 5)
10 g3 = gevent.spawn(f, 5)
11 g1.join()
12 g2.join()
13 g3.join()
14 
15 
16 <Greenlet at 0x10e49f550: f(5)> 0
17 <Greenlet at 0x10e49f550: f(5)> 1
18 <Greenlet at 0x10e49f550: f(5)> 2
19 <Greenlet at 0x10e49f550: f(5)> 3
20 <Greenlet at 0x10e49f550: f(5)> 4
21 <Greenlet at 0x10e49f910: f(5)> 0
22 <Greenlet at 0x10e49f910: f(5)> 1
23 <Greenlet at 0x10e49f910: f(5)> 2
24 <Greenlet at 0x10e49f910: f(5)> 3
25 <Greenlet at 0x10e49f910: f(5)> 4
26 <Greenlet at 0x10e49f4b0: f(5)> 0
27 <Greenlet at 0x10e49f4b0: f(5)> 1
28 <Greenlet at 0x10e49f4b0: f(5)> 2
29 <Greenlet at 0x10e49f4b0: f(5)> 3
30 <Greenlet at 0x10e49f4b0: f(5)> 4

能夠看到,3個greenlet是依次運行而不是交替運行。

要讓greenlet交替運行,能夠經過gevent.sleep()交出控制權:

def f(n):
    for i in range(n):
        print gevent.getcurrent(), i
        gevent.sleep(0)


<Greenlet at 0x10cd58550: f(5)> 0
<Greenlet at 0x10cd58910: f(5)> 0
<Greenlet at 0x10cd584b0: f(5)> 0
<Greenlet at 0x10cd58550: f(5)> 1
<Greenlet at 0x10cd584b0: f(5)> 1
<Greenlet at 0x10cd58910: f(5)> 1
<Greenlet at 0x10cd58550: f(5)> 2
<Greenlet at 0x10cd58910: f(5)> 2
<Greenlet at 0x10cd584b0: f(5)> 2
<Greenlet at 0x10cd58550: f(5)> 3
<Greenlet at 0x10cd584b0: f(5)> 3
<Greenlet at 0x10cd58910: f(5)> 3
<Greenlet at 0x10cd58550: f(5)> 4
<Greenlet at 0x10cd58910: f(5)> 4
<Greenlet at 0x10cd584b0: f(5)> 4

3個greenlet交替運行,

把循環次數改成500000,讓它們的運行時間長一點,而後在操做系統的進程管理器中看,線程數只有1個。

固然,實際代碼裏,咱們不會用gevent.sleep()去切換協程,而是在執行到IO操做時,gevent自動切換,代碼以下:

 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 import urllib2
 4 
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urllib2.urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10 
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])
16 
17 
18 GET: https://www.python.org/
19 GET: https://www.yahoo.com/
20 GET: https://github.com/
21 45661 bytes received from https://www.python.org/.
22 14823 bytes received from https://github.com/.
23 304034 bytes received from https://www.yahoo.com/.

從結果看,3個網絡操做是併發執行的,並且結束順序不一樣,但只有一個線程。

 

  (四)緩存

      memcache:

        下載: wget http://ftp.tummy.com/pub/python-memcached/old-releases/python-memcached-1.54.tar.gz(本身更新最新版)

        解壓縮:tar -zxvf python-memcached-1.54.tar.gz

        安裝: python setup.py install

        啓動:memcached --10    -u root -l 127.0.0.1  -p 11511 -256 -/tmp/memcached.pid

 
參數說明:
     - d 是啓動一個守護進程
     - m 是分配給Memcache使用的內存數量,單位是MB
     - u 是運行Memcache的用戶
     - l 是監聽的服務器IP地址
     - p 是設置Memcache監聽的端口,最好是 1024 以上的端口
     - c 選項是最大運行的併發鏈接數,默認是 1024 ,按照你服務器的負載量來設定
     - P 是設置保存Memcache的pid文件

代碼:

import memcache

class MemcachedClient():
    ''' python memcached 客戶端操做示例 '''

    def __init__(self, hostList):
        self.__mc = memcache.Client(hostList);

    def set(self, key, value):
        result = self.__mc.set("name", "hongfei")
        return result

    def get(self, key):
        name = self.__mc.get("name")
        return name

    def delete(self, key):
        result = self.__mc.delete("name")
        return result

if __name__ == '__main__':
    mc = MemcachedClient(["127.0.0.1:11511", "127.0.0.1:11512"])
    key = "name"
    result = mc.set(key, "hongfei")
    print("set的結果:", result)
    name = mc.get(key)
    print ("get的結果:", name)
    result = mc.delete(key)
    print ("delete的結果:", result)

set的結果: True
get的結果: hongfei
delete的結果: 1

 

 很抱歉,時間有點倉促,寫的不是很細,有點亂,之後慢慢補充整理,謝謝查看。 

相關文章
相關標籤/搜索