Python之路【第八篇】python實現線程池

線程池概念

什麼是線程池?
諸如web服務器、數據庫服務器、文件服務器和郵件服務器等許多服務器應用都面向處理來自某些遠程來源的大量短小的任務。
構建服務器應用程序的一個過於簡單的模型是:每當一個請求到達就建立一個新的服務對象,而後在新的服務對象中爲請求服務。
但當有大量請求併發訪問時,服務器不斷的建立和銷燬對象的開銷很大。
因此提升服務器效率的一個手段就是儘量減小建立和銷燬對象的次數,特別是一些很耗資源的對象建立和銷燬,這樣就引入了「池」的概念,
「池」的概念使得人們能夠定製必定量的資源,而後對這些資源進行復用,而不是頻繁的建立和銷燬。

線程池是預先建立線程的一種技術
這些線程都是處於睡眠狀態,即均爲啓動,不消耗CPU,而只是佔用較小的內存空間。
當請求到來以後,緩衝池給此次請求分配一個空閒線程,把請求傳入此線程中運行,進行處理。
當預先建立的線程都處於運行狀態,即預製線程不夠,線程池能夠自由建立必定數量的新線程,用於處理更多的請求。
當系統比較閒的時候,也能夠經過移除一部分一直處於停用狀態的線程。

線程池的注意事項
雖然線程池是構建多線程應用程序的強大機制,但使用它並非沒有風險的。在使用線程池時需注意線程池大小與性能的關係,注意併發風險、死鎖、資源不足和線程泄漏等問題。
一、線程池大小。多線程應用並不是線程越多越好,須要根據系統運行的軟硬件環境以及應用自己的特色決定線程池的大小。html

通常來講,若是代碼結構合理的話,線程數目與CPU 數量相適合便可。
若是線程運行時可能出現阻塞現象,可相應增長池的大小;若有必要可採用自適應算法來動態調整線程池的大小,以提升CPU 的有效利用率和系統的總體性能。
二、併發錯誤。多線程應用要特別注意併發錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現象的發生。
三、線程泄漏。這是線程池應用中一個嚴重的問題,當任務執行完畢而線程沒能返回池中就會發生線程泄漏現象。python

線程池要點

線程池要點:web

線程池要點:
1、經過判斷等待的任務數量和線程池中的最大值,取最小值來判斷開啓多少線程來工做
好比:
任務數是3,進程池最大20  ,那麼我們只須要開啓3個線程就好了。
任務數是500,進程池是20,那麼我們只開20個線程就能夠了。
取最小值

2、實現線程池正在運行,有一個查看的功能,查看一下如今線程裏面活躍的線程是多少等待的是多少?

線程總共是多少,等待中多少,正在運行中多少
做用:
方便查看當前線程池狀態
能獲取到這個以後就能夠當線程一直處於空閒狀態

查看狀態用:上下文管理來作,很是nice的一點

三、關閉線程

簡單線程池實現算法

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import Queue
import threading
import time

'''
這個簡單的例子的想法是經過:
一、利用Queue特性,在Queue裏建立多個線程對象
二、那我執行代碼的時候,去queue裏去拿線程!
若是線程池裏有可用的,直接拿。
若是線程池裏沒有可用,那就等。
三、線程執行完畢,歸還給線程池
'''

class ThreadPool(object): #建立線程池類
    def __init__(self,max_thread=20):#構造方法,設置最大的線程數爲20
        self.queue = Queue.Queue(max_thread) #建立一個隊列
        for i in xrange(max_thread):#循環把線程對象加入到隊列中
            self.queue.put(threading.Thread)
            #把線程的類名放進去,執行完這個Queue

    def get_thread(self):#定義方法從隊列裏獲取線程
        return self.queue.get()

    def add_thread(self):#定義方法在隊列裏添加線程
        self.queue.put(threading.Thread)

pool = ThreadPool(10)

def func(arg,p):
    print arg
    time.sleep(2)
    p.add_thread() #當前線程執行完了,我在隊列里加一個線程!

for i in xrange(300):
    thread = pool.get_thread() #線程池10個線程,每一次循環拿走一個!默認queue.get(),若是隊列裏沒有數據就會等待。
    t = thread(target=func,args=(i,pool))
    t.start()


'''
self.queue.put(threading.Thread) 添加的是類不是對象,在內存中若是相同的類只佔一分內存空間
而且若是這裏存儲的是對象的話每次都的新增都得在內存中開闢一段內存空間

還有若是是對象的話:下面的這個語句就不能這麼調用了!
for i in xrange(300):
    thread = pool.get_thread()
    t = thread(target=func,args=(i,pool))
    t.start()
    經過查看源碼能夠知道,在thread的構造函數中:self.__args = args  self.__target = target  都是私有字段那麼調用就應該這麼寫

for i in xrange(300):
    ret = pool.get_thread()
    ret._Thread__target = func
    ret._Thread__args = (i,pool)
    ret.start()
'''
simple_pool.py

複雜線程池須要知道的知識點數據庫

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'

import Queue
obj = object() #object也是一個類,我建立了一個對象obj

q = Queue.Queue()
for i in range(10):
    print id(obj)#看蘿蔔號
    q.put(obj)
'''
這個隊列裏有10個蘿蔔(蘿蔔=obj),可是這10個蘿蔔只是個投影。
咱們在for循環的時候put到隊列裏,obj有變化嗎?是否有新開闢空間?顯然沒有
'''
knowledge_point_1.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import contextlib
import threading
import time
import random

doing = []
def number(l2):
    while True:
        print len(l2)
        time.sleep(1)

t = threading.Thread(target=number,args=(doing,))  #開啓一個線程,每一秒打印列表,當前工做中的線程數量
t.start()


#添加管理上下文的裝飾器
@contextlib.contextmanager
def show(li,iterm):
    li.append(iterm)
    yield
    '''
    yield凍結此次操做,就出去了,with就會捕捉到,而後就會執行with下的代碼塊,當with下的代碼塊
    執行完畢後就會回來繼續執行yield下面沒有執行的代碼塊!
    而後就執行完畢了
    若是with代碼塊中的很是耗時,那麼doing的長度是否是一直是1,說明他沒執行完呢?咱們就能夠獲取到正在執行的數量,當他with執行完畢後
    執行yield的後續的代碼塊。把他移除後就爲0了!
    '''
    li.remove(iterm)


def task(arg):
    with show(doing,1):#經過with管理上下文進行切換
        print len(doing)
        time.sleep(10) #等待10秒這裏可使用random模塊來操做~

for i in range(20): #開啓20個線程執行
    temp = threading.Thread(target=task,args=(i,))
    temp.start()

'''
做用:咱們要記錄正在工做的的列表
好比正在工做的線程我把加入到doing這個列表中,若是工做完成的把它從doing列表中移除。
經過這個機制,就能夠獲取如今正在執行的線程都有多少
'''
knowledge_point_2.py

線程池實現

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
from Queue import Queue
import contextlib
import threading

WorkerStop = object()


class ThreadPool:
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, maxthreads=20, name=None):
        self.q = Queue(0) #這裏建立一個隊列,若是是0的話表示不限制,如今這個隊列裏放的是任務
        self.max = maxthreads #定義最大線程數
        self.name = name
        self.waiters = []#這兩個是用來計數的
        self.working = []#這兩個是用來技術的

    def start(self):
        #self.max 最大線程數
        #q.qisze(),任務個數
        needSize = self.q.qsize()
        while self.workers < min(self.max, needSize):#min(10,20)取最小值
            #wokers默認爲0  【workers = 0】
            '''
            舉例來講:
            while self.workers < min(self.max, needSize):
            這個循環,好比最大線程爲20,我們的任務個數爲10,取最小值爲10
            每次循環開1個線程,而且workers自增1,那麼循環10次後,開了10個線程了workers = 10 ,那麼workers就不小於10了
            就不開線程了,我線程開到最大了,大家這10個線程去消耗這10個任務去吧
            而且這裏不阻塞,建立完線程就去執行了!
            每個線程都去執行_worker方法去了
            '''
            self.startAWorker()

    def startAWorker(self):
        self.workers += 1
        newThread = self.threadFactory(target=self._worker, name='shuaige') #建立一個線程並去執行_worker方法
        newThread.start()

    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)


    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        ct = self.currentThread()
        o = self.q.get() #去隊列裏取任務,若是有任務就O就會有值,每一個任務是個元組,有方法,有參數
        while o is not WorkerStop:
            with self._workerState(self.working, ct):  #上下文切換
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    if onResult is None:
                        pass
                    else:
                        pass

                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        #context.call(ctx, log.err)
                        pass

                del onResult, result

            with self._workerState(self.waiters, ct): #當線程工做完閒暇的時候,在去取任務執行
                o = self.q.get()

    def stop(self): #定義關閉線程方法
        while self.workers: #循環workers值
            self.q.put(WorkerStop) #在隊列中增長一個信號~
            self.workers -= 1 #workers值-1 直到全部線程關閉


def show(arg):
    import time
    time.sleep(1)
    print arg


pool = ThreadPool(10)

#建立500個任務,隊列裏添加了500個任務
#每一個任務都是一個元組(方法名,動態參數,動態參數,默認爲NoNe)
for i in range(100):
    pool.callInThread(show, i)

pool.start()  #隊列添加完成以後,開啓線程讓線程一個一個去隊列裏去拿

pool.stop() #當上面的任務都執行完以後,線程中都在等待着在隊列裏去數據呢!
'''
咱們要關閉全部的線程,執行stop方法,首先workers這個值是當前的線程數量,咱們給線程發送一個信號「WorkerStop」
在線程的工做裏:        while o is not WorkerStop:   若是線程獲取到這個值就不執行了,而後這個線程while循環就中止了,等待
python的垃圾回收機制,回收。

而後在self.workers -= 1 ,那麼全部的線程收到這個信號以後就會中止!!!
over~
'''

 

更多請參考:http://www.cnblogs.com/wupeiqi/articles/4839959.html服務器

相關文章
相關標籤/搜索