以常見的端口掃描器爲實例linux
端口掃描器的原理很簡單,操做socket來判斷鏈接狀態肯定主機端口的開放狀況。多線程
import socket def scan(port): s = socket.socket() if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() if __name__ == '__main__': map(scan,range(1,65536))
這是一個socket掃描器的基本代碼。socket
可是若是直接運行會等待很長時間都沒有反應,這是由於socket是阻塞的,到等待每一個鏈接超時後纔會進入下一個鏈接。優化
給這段代碼加一個超時spa
s.settimeout(0.1)
完整的代碼以下線程
import socket def scan(port): s = socket.socket() s = settimeont(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() if __name__ == '__main__': map(scan,range(1,65536))
本文的重點不在於掃描器功能部分。而重點在於代碼質量的提高和優化從而提高代碼的運行效率。code
多線程版本:blog
import socket import threading def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() if __name__ == '__main__': threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)] map(lambda x:x.start(),threads)
Run起來,速度確實快了很多,可是拋出了異常:thread.error: can't start new thread隊列
這個進程開啓了65535個線程,有兩種可能,一種是超過最大線程數了,一種是超過最大socket句柄數了。在linux能夠經過ulimit來修改。
若是不修改最大限制,怎麼用多線程不報錯呢?
加個queue,變成生產者-消費者模式,開固定線程。進程
多線程+隊列版本:
import socket import threading from Queue import Queue def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() def worker(): while not q.empty(): port = q.get() try: scan(port) finally: q.task_done() if __name__ == '__main__': q = Queue() map(q.put,xrange(1,65535)) threads = [threading.Thread(target=worker) for i in xrange(500)] map(lambda x:x.start(),threads) q.join()
開500個線程,不停的從隊列中取出任務來進行...
multiprocessing + 隊列版本:
總不能開65535個進程吧?仍是用生產者消費者模式
import socket
import multiprocessing def scan(port): s = socket.socket() s.settimeout(0.1) if s.connect_ex(('localhost', port)) == 0: print port, 'open' s.close() def worker(q): while not q.empty(): port = q.get() try: scan(port) finally: q.task_done() if __name__ == '__main__': q = multiprocessing.JoinableQueue() map(q.put,xrange(1,65535)) jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)] map(lambda x:x.start(),jobs)
注意這裏把隊列做爲一個參數傳入到worker中去,由於是process safe的queue,否則會報錯。
還有用的是JoinableQueue(),顧名思義就是能夠join()的。
gevent的spawn版本:
from gevent import monkey; monkey.patch_all(); import gevent import socket ... if __name__ == '__main__': threads = [gevent.spawn(scan, i) for i in xrange(1,65536)] gevent.joinall(threads)
注意monkey patch必須在被patch的東西以前import,否則會Exception KeyError.好比不能先import threading,再monkey patch.
gevent的Pool版本:
from gevent import monkey; monkey.patch_all(); import socket from gevent.pool import Pool ... if __name__ == '__main__': pool = Pool(500) pool.map(scan,xrange(1,65536)) pool.join()
concurrent.futures版本:
import socket from Queue import Queue from concurrent.futures import ThreadPoolExecutor ... if __name__ == '__main__': q = Queue() map(q.put,xrange(1,65536)) with ThreadPoolExecutor(max_workers=500) as executor: for i in range(500): executor.submit(worker,q)