學習PYTHON之路, DAY 10 進程、線程、協程篇

線程

線程是應用程序中工做的最小單元。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。python

直接調用多線程

import threading import time def show(arg): time.sleep(1) print 'thread'+str(arg) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'

繼承式調用併發

import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self):#定義每一個線程要運行的函數
 
        print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()

更多方法:app

    • start            線程準備就緒,等待CPU調度
    • setName      爲線程設置名稱
    • getName      獲取線程名稱
    • setDaemon   設置爲後臺線程或前臺線程(默認)
                         若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止
                          若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
    • join              逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
    • run              線程被cpu調度後自動執行線程對象的run方法
 1 import time  2 import threading  3  
 4  
 5 def run(n):  6  
 7     print('[%s]------running----\n' % n)  8     time.sleep(2)  9     print('--done--') 10  
11 def main(): 12     for i in range(5): 13         t = threading.Thread(target=run,args=[i,]) 14  t.start() 15         t.join(1) 16         print('starting thread', t.getName()) 17  
18  
19 m = threading.Thread(target=main,args=[]) 20 m.setDaemon(True) #將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務
21 m.start() 22 m.join(timeout=2) 23 print("---main thread done----")
守護線程

線程鎖(Lock、RLock)

因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖 - 同一時刻容許一個線程執行操做。dom

import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()

信號量(Semaphore)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。ide

 1 import threading,time  2  
 3 def run(n):  4  semaphore.acquire()  5     time.sleep(1)  6     print("run the thread: %s" %n)  7  semaphore.release()  8  
 9 if __name__ == '__main__': 10  
11     num= 0 12     semaphore  = threading.BoundedSemaphore(5) #最多容許5個線程同時運行
13     for i in range(20): 14         t = threading.Thread(target=run,args=(i,)) 15         t.start()

事件(event)

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。函數

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。測試

  • clear:將「Flag」設置爲False
  • set:將「Flag」設置爲True
import threading def do(event): print 'start' event.wait() print 'execute' event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = raw_input('input:') if inp == 'true': event_obj.set()

條件(Condition)

使得線程等待,只有知足某條件時,才釋放n個線程ui

def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()

隊列

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

Queue.qsize()spa

 

Queue.empty() #return True if empty  

 

 

Queue.full() # return True if full 

 

 

Queue.put(itemblock=Truetimeout=None)Queue.put_nowait(item)Equivalent to put(item, False).

 

 

Queue.get(block=Truetimeout=None)Queue.get_nowait()

 

Equivalent to get(False)

Queue.task_done()

 1 import time,random
 2 import queue,threading
 3 q = queue.Queue()
 4 def Producer(name):
 5   count = 0
 6   while count <20:
 7     time.sleep(random.randrange(3))
 8     q.put(count)
 9     print('Producer %s has produced %s baozi..' %(name, count))
10     count +=1
11 def Consumer(name):
12   count = 0
13   while count <20:
14     time.sleep(random.randrange(4))
15     if not q.empty():
16         data = q.get()
17         print(data)
18         print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
19     else:
20         print("-----no baozi anymore----")
21     count +=1
22 p1 = threading.Thread(target=Producer, args=('A',))
23 c1 = threading.Thread(target=Consumer, args=('B',))
24 p1.start()
25 c1.start()

線程池

 1 import queue
 2 import threading
 3 import time
 4 
 5 
 6 class ThreadPool:
 7     def __init__(self, maxsize=5):
 8         self.maxsize = maxsize
 9         self._q = queue.Queue(maxsize)
10         for i in range(maxsize):
11             self._q.put(threading.Thread)
12         # 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】
13     def get_thread(self):
14         return self._q.get()
15 
16     def add_thread(self):
17         self._q.put(threading.Thread)
18 
19 pool = ThreadPool(5)
20 
21 def task(arg,p):
22     print(arg)
23     time.sleep(1)
24     p.add_thread()
25 
26 for i in range(100):
27     # threading.Thread類
28     t = pool.get_thread()
29     obj = t(target=task,args=(i,pool,))
30     obj.start()
簡單版
  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 # Author:Alex Li
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 class ThreadPool(object):
 12 
 13     def __init__(self, max_num, max_task_num = None):
 14         if max_task_num:
 15             self.q = queue.Queue(max_task_num)
 16         else:
 17             self.q = queue.Queue()
 18         self.max_num = max_num
 19         self.cancel = False
 20         self.terminal = False
 21         self.generate_list = []
 22         self.free_list = []
 23 
 24     def run(self, func, args, callback=None):
 25         """
 26         線程池執行一個任務
 27         :param func: 任務函數
 28         :param args: 任務函數所需參數
 29         :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
 30         :return: 若是線程池已經終止,則返回True不然None
 31         """
 32         if self.cancel:
 33             return
 34         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 35             self.generate_thread()
 36         w = (func, args, callback,)
 37         self.q.put(w)
 38 
 39     def generate_thread(self):
 40         """
 41         建立一個線程
 42         """
 43         t = threading.Thread(target=self.call)
 44         t.start()
 45 
 46     def call(self):
 47         """
 48         循環去獲取任務函數並執行任務函數
 49         """
 50         current_thread = threading.currentThread
 51         self.generate_list.append(current_thread)
 52 
 53         event = self.q.get()
 54         while event != StopEvent:
 55 
 56             func, arguments, callback = event
 57             try:
 58                 result = func(*arguments)
 59                 success = True
 60             except Exception as e:
 61                 success = False
 62                 result = None
 63 
 64             if callback is not None:
 65                 try:
 66                     callback(success, result)
 67                 except Exception as e:
 68                     pass
 69 
 70             with self.worker_state(self.free_list, current_thread):
 71                 if self.terminal:
 72                     event = StopEvent
 73                 else:
 74                     event = self.q.get()
 75         else:
 76 
 77             self.generate_list.remove(current_thread)
 78 
 79     def close(self):
 80         """
 81         執行完全部的任務後,全部線程中止
 82         """
 83         self.cancel = True
 84         full_size = len(self.generate_list)
 85         while full_size:
 86             self.q.put(StopEvent)
 87             full_size -= 1
 88 
 89     def terminate(self):
 90         """
 91         不管是否還有任務,終止線程
 92         """
 93         self.terminal = True
 94 
 95         while self.generate_list:
 96             self.q.put(StopEvent)
 97 
 98         self.q.empty()
 99 
100     @contextlib.contextmanager
101     def worker_state(self, state_list, worker_thread):
102         """
103         用於記錄線程中正在等待的線程數
104         """
105         state_list.append(worker_thread)
106         try:
107             yield
108         finally:
109             state_list.remove(worker_thread)
110 
111 
112 pool = ThreadPool(5)
113 
114 def callback(status, result):
115     # status, execute action status
116     # result, execute action return value
117     pass
118 
119 def action(i):
120     print(i)
121 
122 for i in range(300):
123     ret = pool.run(action, (i,), callback)
124 
125 # time.sleep(5)
126 # print(len(pool.generate_list), len(pool.free_list))
127 # print(len(pool.generate_list), len(pool.free_list))
複雜版

 上下文管理

 1 import contextlib
 2 
 3 @contextlib.contextmanager #加了這個裝飾器,能夠用with
 4 def work(free_list, worker_thread):
 5     free_list.append(worker_thread)
 6     try:
 7         yield
 8     finally:
 9         free_list.remove(worker_thread)
10 
11 free_list = []
12 worker_thread = '1'
13 with work(free_list, worker_thread):
14     print(123)

運行順序

多進程

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__': #在windos進程只能作測試,必定要寫這句
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

進程間通信

Queues

使用方法跟threading裏的queue差很少

from multiprocessing import Process, Queue

def f(i,q):
    print(i,q.get())

if __name__ == '__main__':
    q = Queue()

    q.put("h1")
    q.put("h2")
    q.put("h3")

    for i in range(10):
        p = Process(target=f, args=(i,q,))
        p.start()

 Managers

 1 from multiprocessing import Process, Manager
 2  
 3 def f(d, l):
 4     d[1] = '1'
 5     d['2'] = 2
 6     d[0.25] = None
 7     l.append(1)
 8     print(l)
 9  
10 if __name__ == '__main__':
11     with Manager() as manager:
12         d = manager.dict()
13  
14         l = manager.list(range(5))
15         p_list = []
16         for i in range(10):
17             p = Process(target=f, args=(d, l))
18             p.start()
19             p_list.append(p)
20         for res in p_list:
21             res.join()
22  
23         print(d)
24         print(l)

 

協程

協程一個標準定義:

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 一個協程遇到IO操做自動切換到其它協程

greenlet

 

 1 from greenlet import greenlet
 2  
 3  
 4 def test1():
 5     print 12
 6     gr2.switch()
 7     print 34
 8     gr2.switch()
 9  
10  
11 def test2():
12     print 56
13     gr1.switch()
14     print 78
15  
16 gr1 = greenlet(test1)
17 gr2 = greenlet(test2)
18 gr1.switch()

 

gevent

 1 import gevent
 2  
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7  
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12  
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])
相關文章
相關標籤/搜索