Python 的線程池模式是手動實現的,並無現成的實現。所以我猜,線程池並不能直接使代碼清晰起來,它更多是概念性的;因爲我認爲直接創建線程更加簡單粗暴,而且能很好地定位錯誤信息。可是,也應該去了解它的實現原理。
import sys
import threading
import time
import urllib
from threading import Thread

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``


# working thread
class Worker(Thread):
    """Daemon thread that pulls jobs off a shared queue and runs them.

    A job is a ``(callable, args, kwds)`` tuple.  The return value of each
    successful job is printed and pushed onto the result queue.  The thread
    starts itself from ``__init__`` and exits once the work queue has stayed
    empty for ``Worker.timeout`` seconds.
    """

    # Number of Worker instances created so far; used to hand out ids.
    # The increment is not locked, so create workers from a single thread.
    worker_count = 0
    # Seconds a worker blocks on an empty work queue before giving up.
    timeout = 1

    def __init__(self, workQueue, resultQueue, **kwds):
        """Register the queues, mark the thread as daemon and start it.

        Args:
            workQueue: queue of ``(callable, args, kwds)`` jobs to consume.
            resultQueue: queue that receives each job's return value.
            **kwds: forwarded unchanged to ``threading.Thread.__init__``.
        """
        Thread.__init__(self, **kwds)
        self.id = Worker.worker_count
        Worker.worker_count += 1
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.start()

    def run(self):
        """The get-some-work, do-some-work main loop of worker threads."""
        while True:
            # Only the queue read may end the loop; keeping it in its own
            # ``try`` stops a Queue.Empty raised inside a job from being
            # mistaken for "no more work".
            try:
                job, args, kwds = self.workQueue.get(timeout=Worker.timeout)
            except Queue.Empty:
                break

            try:
                res = job(*args, **kwds)
            except Exception:
                # Report the failure but keep serving the queue: letting the
                # exception escape would kill this worker and could strand
                # the jobs that are still queued.
                print('worker[%2d] %s' % (self.id, sys.exc_info()[:2]))
                continue

            print("worker[%2d]: %s" % (self.id, str(res)))
            self.resultQueue.put(res)


class WorkerManager:
    """Owns a work queue, a result queue and a fixed set of Worker threads.

    Workers are started as soon as the manager is constructed, and each one
    exits after ``Worker.timeout`` seconds without work, so jobs should be
    queued promptly after construction.
    """

    def __init__(self, num_of_workers=10, timeout=2):
        """Create the queues and start ``num_of_workers`` workers.

        Args:
            num_of_workers: number of Worker threads to start.
            timeout: stored on ``self.timeout``; nothing in this class reads
                it (workers use ``Worker.timeout`` instead).
        """
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.workers = []
        self.timeout = timeout
        self._recruitThreads(num_of_workers)

    def _recruitThreads(self, num_of_workers):
        """Start ``num_of_workers`` workers bound to this manager's queues."""
        for _ in range(num_of_workers):
            self.workers.append(Worker(self.workQueue, self.resultQueue))

    def wait_for_complete(self):
        """Block until every worker thread has exited, then report it.

        ``self.workers`` is drained as a side effect.
        """
        # ...then, wait for each of them to terminate.  join() without a
        # timeout only returns once the thread has ended, so no liveness
        # re-check is needed afterwards.
        while self.workers:
            self.workers.pop().join()
        print("All jobs are are completed.")

    def add_job(self, callable, *args, **kwds):
        """Queue ``callable(*args, **kwds)`` for execution by a worker."""
        self.workQueue.put((callable, args, kwds))

    def get_result(self, *args, **kwds):
        """Return the next job result.

        All arguments are forwarded to ``Queue.get`` on the result queue, so
        ``block`` / ``timeout`` behave exactly as they do there.
        """
        return self.resultQueue.get(*args, **kwds)
以上是線程管理的創建方法。下面是調用的實例:
import socket
import time
from contextlib import closing
from datetime import datetime

try:
    import urllib2  # Python 2
except ImportError:
    import urllib.request as urllib2  # Python 3 home of urlopen()

from thread_pool import *


def main():
    """Download a fixed set of pages concurrently through WorkerManager.

    One ``do_get_con`` job is queued per entry of ``url_list``; the function
    returns after ``wait_for_complete()`` does.
    """
    # Maps the output file name to the URL that is fetched into it.
    url_list = {
        "sina": "http://www.sina.com.cn",
        "sohu": "http://www.sohu.com",
        "yahoo": "http://www.yahoo.com",
        "xiaonei": "http://www.xiaonei.com",
        "qihoo": "http://www.qihoo.com",
        "laohan": "http://www.laohan.org",
        "eyou": "http://www.eyou.com",
        "chinaren": "http://www.chinaren.com",
        "douban": "http://www.douban.com",
        "163": "http://www.163.com",
        "daqi": "http://www.daqi.com",
        "qq": "http://www.qq.com",
        "baidu_1": "http://www.baidu.com/s?wd=asdfasdf",
        "baidu_2": "http://www.baidu.com/s?wd=dddddddf",
        "google_1": "http://www.baidu.com/s?wd=sadfas",
        "google_2": "http://www.baidu.com/s?wd=sadflasd",
        "hainei": "http://www.hainei.com",
        "microsoft": "http://www.microsoft.com",
        "wlzuojia": "http://www.wlzuojia.com",
    }

    # Process-wide default, so a stalled server cannot block a worker forever.
    socket.setdefaulttimeout(10)
    print('start testing')
    wm = WorkerManager(50)
    for url_name, url_link in url_list.items():
        wm.add_job(do_get_con, url_name, url_link)
    wm.wait_for_complete()
    print('end testing')


def do_get_con(url_name, url_link):
    """Fetch ``url_link`` and save the response body to ``/tmp/ttt/<url_name>``.

    The target directory is not created here; it must already exist.
    Failures (network errors, missing directory, ...) are reported on stdout
    instead of being raised, so one bad URL does not affect the other jobs.

    Args:
        url_name: file name to write under ``/tmp/ttt``.
        url_link: URL to download.
    """
    try:
        # closing() guarantees the response is released even if read() fails.
        with closing(urllib2.urlopen(url_link)) as fd:
            data = fd.read()
        # The payload is raw bytes, so write it in binary mode.
        with open("/tmp/ttt/%s" % url_name, "wb") as f_hand:
            f_hand.write(data)
    except Exception as e:
        print("failed to fetch %s (%s): %s" % (url_name, url_link, e))


if __name__ == "__main__":
    main()