python 線程池

python 的線程池模式是手動實現的,並無現成的實現,所以我猜,線程池並不可以直接使代碼清晰起來,更多的是概念性的,由於我以爲直接建立線程更加的簡單粗暴,而且很好的定位錯誤信息。可是,也應該去了解它的實現原理。python

 

import Queue, threading, sys
from threading import Thread
import time
import urllib
 
# working thread
class Worker(Thread):
    worker_count = 0
    timeout = 1
    def __init__( self, workQueue, resultQueue, **kwds):
        Thread.__init__( self, **kwds )
        self.id = Worker.worker_count
        Worker.worker_count += 1
        self.setDaemon( True )
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start( )
 
    def run( self ):
        ''' the get-some-work, do-some-work main loop of worker threads '''
        while True:
            try:
                callable, args, kwds = self.workQueue.get(timeout=Worker.timeout)
                res = callable(*args, **kwds)
                print "worker[%2d]: %s" % (self.id, str(res) )
                self.resultQueue.put( res )
                #time.sleep(Worker.sleep)
            except Queue.Empty:
                break
            except :
                print 'worker[%2d]' % self.id, sys.exc_info()[:2]
                raise
               
class WorkerManager:
    def __init__( self, num_of_workers=10, timeout = 2):
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.workers = []
        self.timeout = timeout
        self._recruitThreads( num_of_workers )
 
    def _recruitThreads( self, num_of_workers ):
        for i in range( num_of_workers ):
            worker = Worker( self.workQueue, self.resultQueue )
            self.workers.append(worker)
 
    def wait_for_complete( self):
        # ...then, wait for each of them to terminate:
        while len(self.workers):
            worker = self.workers.pop()
            worker.join( )
            if worker.isAlive() and not self.workQueue.empty():
                self.workers.append( worker )
        print "All jobs are are completed."
 
    def add_job( self, callable, *args, **kwds ):
        self.workQueue.put( (callable, args, kwds) )
 
    def get_result( self, *args, **kwds ):
        return self.resultQueue.get( *args, **kwds )

  以上是線程管理的建立方法。下面是調用的實例:app

import urllib2
import time
import socket
from datetime import datetime
from thread_pool import *
 
def main():
    url_list = {"sina":"http://www.sina.com.cn",
               "sohu":"http://www.sohu.com",
               "yahoo":"http://www.yahoo.com",
               "xiaonei":"http://www.xiaonei.com",
               "qihoo":"http://www.qihoo.com",
               "laohan":"http://www.laohan.org",
               "eyou":"http://www.eyou.com",
               "chinaren":"http://www.chinaren.com",
               "douban":"http://www.douban.com",
               "163":"http://www.163.com",
               "daqi":"http://www.daqi.com",
               "qq":"http://www.qq.com",
               "baidu_1":"http://www.baidu.com/s?wd=asdfasdf",
               "baidu_2":"http://www.baidu.com/s?wd=dddddddf",
               "google_1":"http://www.baidu.com/s?wd=sadfas",
               "google_2":"http://www.baidu.com/s?wd=sadflasd",
               "hainei":"http://www.hainei.com",
               "microsoft":"http://www.microsoft.com",
               "wlzuojia":"http://www.wlzuojia.com"}
 
    socket.setdefaulttimeout(10)
    print 'start testing'
    wm = WorkerManager(50)
    for url_name in url_list.keys():
        wm.add_job(do_get_con, url_name, url_list[url_name])
    wm.wait_for_complete()
    print 'end testing'
 
def do_get_con(url_name,url_link):
    try:
        fd = urllib2.urlopen(url_link)
        data = fd.read()
        f_hand = open("/tmp/ttt/%s" % url_name,"w")
        f_hand.write(data)
        f_hand.close()
    except Exception,e:
        pass
 
if __name__ == "__main__":
    main()
相關文章
相關標籤/搜索