首先介紹下python異步執行,python有兩種方法編寫異步代碼:html
一、corutines協程(也稱爲greenlets)python
二、回調git
gevent是greenlets的一種實現方式,能夠經過pip方便的安裝gevent模塊。gevent執行方式其實是代碼塊的交替執行,具體的能夠看下這篇blog,我就不重複造輪子了。github
值得一提的是,gevent封裝了不少接口,其中一個是著名的猴子補丁monkey,sql
from gevent import monkey monkey.patch_all()
這兩行就能夠在代碼中改變其他包的行爲,讓其從同步阻塞方式變爲異步非阻塞方式,很是的神奇。數據庫
我用gevent的異步非阻塞方式寫了一個手機號段蜘蛛爬蟲,目前一直在服務器穩定的運行,代碼詳見個人github,內有福利。腳本用法:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999],程序基於data/mobile_area.db數據庫作增量更新。json
1 #!/usr/bin/python 2 #-*- coding:utf-8 -*- 3 """手機號段爬蟲:接收用戶命令參數精簡版 for sqlitedb 4 @version:1.0 5 @author:Kenny{Kenny.F<mailto:kennyffly@gmail.com>} 6 @since:2014/05/23 7 """ 8 import sys 9 reload(sys) 10 sys.setdefaultencoding('utf8') 11 import gevent #gevent協程包 12 import multiprocessing #多進程 13 from multiprocessing import Manager 14 import urllib2 15 from urllib import unquote,quote 16 import socket 17 socket.setdefaulttimeout(20) 18 import cookielib 19 import random 20 import simplejson as json 21 import os 22 import time 23 import sqlite3 #sqlite數據庫操做 24 from functools import wraps #方法工具 25 from strtodecode import strtodecode #編碼檢測轉換 26 27 28 manager = Manager() #多進程共享隊列 29 lacknumlist = manager.list() 30 31 32 def multi_run_wrapper(func): #多進程map包裹參數 33 @wraps(func) 34 def newF(args): 35 if isinstance(args,list): 36 return func(*args) 37 elif isinstance(args,tuple): 38 return func(*args) 39 else: 40 return func(args) 41 return newF 42 43 44 def getRanIp(): #獲得隨機IP 45 #123.125.40.255 - 123.127.134.56 北京聯通154938條 46 return "123.{0}.{1}.{2}".format(random.randint(125,127), random.randint(40,134), random.randint(56,255)) 47 48 49 def _cookiePool(url): #查看cookie池 50 cookie = cookielib.CookieJar() 51 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie)) 52 opener.open(url) 53 for item in cookie: 54 print 'Name = '+item.name 55 print 'Value = '+item.value 56 57 58 def catchPage(url=''): #封裝的網頁頁面獲取 59 if not url: 60 return False 61 62 with open("./logs/outprint.txt","a") as f: 63 f.write(url+"\n") 64 65 try: 66 headers = { 67 'User-Agent':'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6', 68 'Referer':'http://www.baidu.com', 69 "X-Forwarded-For":getRanIp() 70 } 71 req = urllib2.Request( 72 url = url, 73 headers = headers 74 ) 75 76 html = '' 77 result = '' 78 try: 79 try: 80 gevent.Timeout 81 except: 82 result = urllib2.urlopen(req,timeout=20) 83 else: 84 with gevent.Timeout(20, False): 85 result = urllib2.urlopen(req) 86 except urllib2.HTTPError, e: 87 #For Ptyhon 2.6 88 try: 89 socket.timeout 90 except: 91 print 'The server couldn\'t fulfill the request.' 92 print "url:{0} Httperrorcode:{1}".format(url, e.code) 93 else: 94 if isinstance(e.reason, socket.timeout): 95 print 'The server couldn\'t fulfill the request.' 96 print "url:{0} Httperrorcode:{1}".format(url, e.code) 97 except urllib2.URLError, e: 98 print 'We failed to reach a server.' 99 print "url:{0} Reason:{1}".format(url, e.reason) 100 except socket.timeout, e: 101 #For Python 2.7 102 print 'The server couldn\'t fulfill the request.' 103 print "url:{0} Httperrorcode:{1}".format(url, e) 104 else: 105 if result: 106 html = result.read() 107 return html 108 except: 109 try: 110 socket.timeout 111 except: 112 print 'The server couldn\'t fulfill the request.' 113 print "url:{0} Httperrorcode:{1}".format(url, 'timeout') 114 else: 115 print 'The server couldn\'t fulfill the request.' 116 print "url:{0} Server someting error".format(url) 117 return False 118 119 120 def opensqlitedb(): #從sqlite數據源開始工做 121 db_file = './data/mobile_area.db' 122 123 if not os.path.exists(db_file): 124 try: 125 cx = sqlite3.connect(db_file) 126 cu = cx.cursor() 127 #建表 128 sql = "create table mobile_area (id integer primary key,\ 129 mobile_num integer,\ 130 mobile_area varchar(50) NULL,\ 131 mobile_type varchar(50) NULL,\ 132 area_code varchar(50) NULL,\ 133 post_code varchar(50) NULL)" 134 cu.execute(sql) 135 except: 136 print "can not find sqlite db file\n" 137 with open('./logs/errorlog.txt','a') as f: 138 f.write("can not find sqlite db file '%s'\n" % str(db_file)) 139 return False 140 else: 141 try: 142 cx = sqlite3.connect(db_file) 143 cu = cx.cursor() 144 except: 145 print "can not find sqlite db file\n" 146 with open('./logs/errorlog.txt','a') as f: 147 f.write("can not find sqlite db file '%s'\n" % str(db_file)) 148 return False 149 150 mobile_err_list,mobile_dict = [],{} 151 limit = 10000 152 offset = 0 153 mobile_num_pre = 0 154 while 1: 155 cu.execute("SELECT * FROM mobile_area ORDER BY mobile_num ASC LIMIT %d OFFSET %d " % (limit, offset)) 156 rs = cu.fetchall() 157 if not rs: 158 break 159 else: 160 offset = offset + limit 161 for i in xrange(0,len(rs)): 162 id = rs[i][0] 163 mobile_num = int(rs[i][1]) 164 mobile_area = rs[i][2] 165 mobile_type = rs[i][3] 166 area_code = rs[i][4] 167 post_code = rs[i][5] 168 169 if len(mobile_area) > 100 or (not mobile_area) or (not mobile_num) or len(mobile_type) > 100 or len(area_code) > 100 or len(post_code) > 100 or len(str(mobile_num)) > 7: 170 print "error id:%d" % id 171 continue 172 173 #正確的號碼入字典 174 mobile_dict[str(mobile_num)] = True 175 176 print "get data from sqlite works down!\n" 177 return mobile_dict 178 179 180 @multi_run_wrapper 181 def getNumPage(segnum='', num='', url=''): #獲取號碼頁詳細數據 182 if not segnum: 183 return False 184 if not num: 185 return False 186 if not url: 187 return False 188 189 gevent.sleep(random.randint(10,22)*0.81) #今後處協程併發 190 191 db_file = './data/mobile_area.db' 192 193 html = catchPage(url) 194 if not html: 195 print "catch %s num page error!" % num 196 print "url:%s\n" % (url) 197 with open("./logs/errornum.txt", "a") as f: 198 f.write(segnum+','+num+','+url+"\n") 199 return False 200 201 #json數據 202 try: 203 page_temp_dict = json.loads(unquote(html)) 204 except: 205 print segnum+','+num+','+url+",result error convert to dict\n" 206 with open('./logs/errorlog.txt','a') as f: 207 f.write(segnum+','+num+','+url+",result error convert to dict\n") 208 return False 209 else: 210 try: 211 cx = sqlite3.connect(db_file) 212 cu = cx.cursor() 213 except: 214 print "can not find sqlite db file\n" 215 with open('./logs/errorlog.txt','a') as f: 216 f.write("can not find sqlite db file '%s'\n" % str(db_file)) 217 return False 218 219 insdata = {} 220 #mobile_num 221 if page_temp_dict.get('Mobile', False): 222 insdata['mobile_num'] = int(page_temp_dict['Mobile']) 223 else: 224 with open('./logs/errorlog.txt','a') as f: 225 f.write(segnum+','+num+','+url+",No matching data\n") 226 return False #無號碼 227 #mobile_area 228 if page_temp_dict.get('Province', False): 229 if page_temp_dict['Province'] == u'未知': 230 with open('./logs/errorlog.txt','a') as f: 231 f.write(segnum+','+num+','+url+",province is weizhi\n") 232 return False #無地區 233 if page_temp_dict.get('City', False): 234 insdata['mobile_area'] = strtodecode(page_temp_dict['Province']+' '+page_temp_dict['City']) 235 else: 236 insdata['mobile_area'] = strtodecode(page_temp_dict['Province']+' '+page_temp_dict['Province']) 237 else: 238 with open('./logs/errorlog.txt','a') as f: 239 f.write(segnum+','+num+','+url+",No matching province\n") 240 return False #無地區 241 #mobile_type 242 if page_temp_dict.get('Corp', False): 243 if page_temp_dict.get('Card', False): 244 insdata['mobile_type'] = strtodecode(page_temp_dict['Corp']+' '+page_temp_dict['Card']) 245 else: 246 insdata['mobile_type'] = strtodecode(page_temp_dict['Corp']) 247 #area_code 248 if page_temp_dict.get('AreaCode', False): 249 insdata['area_code'] = strtodecode(page_temp_dict['AreaCode']) 250 #post_code 251 if page_temp_dict.get('PostCode', False): 252 insdata['post_code'] = strtodecode(page_temp_dict['PostCode']) 253 254 if insdata: 255 sql = "insert into mobile_area values (?,?,?,?,?,?)" 256 cu.execute(sql, (None,insdata['mobile_num'],insdata['mobile_area'],insdata['mobile_type'],insdata['area_code'],insdata['post_code'])) 257 258 try: 259 cx.commit() #執行insert 260 except: 261 with open('./logs/errorlog.txt','a') as f: 262 f.write(segnum+','+num+','+url+",insert sqlitdb faild\n") 263 return False 264 else: 265 return True 266 267 def getneednum(url='', step=10): #獲取全部未記錄的號碼信息數據 268 if not lacknumlist: 269 return False 270 if not url: 271 return False 272 if not step: 273 print "step can not be null" 274 return False 275 if not isinstance(step,int): 276 print "step should be numeric" 277 return False 278 if step < 0: 279 print "step should be > 0" 280 return False 281 282 offset = 0 283 limit = int(step) 284 len_max = len(lacknumlist) 285 breaktag = False 286 while 1: 287 if breaktag: 288 break 289 290 threads = [] 291 for i in xrange(offset,(limit+offset)): 292 try: 293 num = lacknumlist[i] 294 except: 295 breaktag = True 296 break 297 else: 298 furl = url() 299 threads.append( gevent.spawn(getNumPage, (num[0:3], num, furl+num)) ) #協程併發 300 301 try: 302 gevent.joinall(threads) 303 print "%d-%d is end\n" % (offset+1,limit+offset) 304 except Exception as e: 305 print "Gevent catch error\n" 306 307 offset = offset + limit 308 time.sleep(random.randint(5,80)*0.9) 309 310 i = 1 #處理網絡異常號碼數據10次 311 while i <= 10: 312 if not os.path.exists("./logs/errornum.txt"): 313 break 314 j = 1 315 threads = [] 316 with open("./logs/errornum.txt","r") as f: 317 while 1: 318 if (j >= step) and threads: 319 try: 320 gevent.joinall(threads) 321 except Exception as e: 322 print "turn%d-%d Gevent catch error\n" % (i,j) 323 time.sleep(random.randint(5,80)*0.9) 324 threads = [] 325 j = 0 326 line = f.readline() 327 if line: 328 errnum_str = line.strip() 329 errnum_truple = errnum_str.split(',') 330 threads.append(gevent.spawn(getNumPage, (errnum_truple[0], errnum_truple[1], errnum_truple[2]))) 331 else: 332 if threads: 333 try: 334 gevent.joinall(threads) 335 except Exception as e: 336 print "turn%d-%d Gevent catch error\n" % (i,j) 337 break 338 j += 1 339 340 if i < 10: 341 with open("./logs/errornum.txt","w") as f: #清除文件內容 342 pass 343 i = i + 1 344 345 346 def setneednum(num='', mobile_dict={}): #設置獲得全部未補全的號碼 347 if not num: 348 return False 349 350 if len(str(num))==3: 351 start_num = int(num+'0000') 352 end_num = int(num+'9999') 353 else: 354 num_list = num.split('-') 355 start_num = int(num_list[0]) 356 end_num = int(num_list[1]) 357 358 i = start_num 359 while i <= end_num: 360 if not mobile_dict.get(str(i),False): #查找沒有的號碼 361 lacknumlist.append(str(i)) 362 i += 1 363 # print "%s num works down\n" % num 364 365 366 def setsegnum(segnumlist=[], mobile_dict={}): #根據號段起併發進程 367 if not segnumlist: 368 return False 369 370 record = [] 371 for seg in xrange(0, len(segnumlist)): 372 segnum = segnumlist[seg].strip() 373 if len(str(segnum)) == 3: #指定的單個號段:137 374 try: 375 int(segnum) 376 except: 377 print "%s is illegal argument\n" % str(segnum) 378 continue 379 else: 380 process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict)) 381 process.start() 382 record.append(process) 383 elif len(str(segnum)) == 7: #具體指定的單個號碼:1391234 384 if not mobile_dict.get(str(segnum),False): 385 lacknumlist.append(str(segnum)) #sqlite沒有的號碼 386 else: 387 segparam_list = segnum.split('-') 388 try: 389 int(segparam_list[0]) 390 except: 391 print "%s is illegal argument\n" % str(segnum) 392 continue 393 else: 394 try: 395 segparam_list[1] 396 except: 397 print "%s is illegal argument\n" % str(segnum) 398 continue 399 else: 400 if segparam_list[0][:3] == segparam_list[1][:3] : #指定號碼範圍:1380000-1389999 401 process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict)) 402 process.start() 403 record.append(process) 404 else: 405 print "%s is illegal argument\n" % str(segnum) 406 continue 407 for process in record: 408 process.join() 409 410 print "all SegNum prepare works down!\n" 411 412 413 def callback_url_showji(): #返回showji網的api地址 414 showji = 'http://api.showji.com/Locating/www.showji.c.o.m.aspx?output=json' 415 return "{0}×tamp={1}&m=".format(showji, int(time.time())) 416 417 418 def main(param=''): #主方法 419 with open("./logs/errornum.txt","w") as f: #清除零時文件內容 420 pass 421 with open("./logs/outprint.txt","w") as f: 422 pass 423 424 if not param: 425 print "no argument!" 426 return False 427 428 # segnumlist = [\ 429 # # '134','135','136','137','138','139','147','150','151','152','157','158','159','182','183','187','188',\ 430 # # '130','131','132','136','145','185','186',\ 431 # # '133','153','180','189',\ 432 # # '147','155','156','170','176','177','178','181','184'\ 433 # ] 434 435 segnumlist = str(param).split(',') 436 437 #從sqlite庫查已有的 438 mobile_dict = opensqlitedb() 439 440 #算哪些是尚未的 441 setsegnum(segnumlist, mobile_dict) 442 if lacknumlist: 443 tempstr = '' 444 for i in xrange(0,len(lacknumlist)): 445 tempstr += str(lacknumlist[i])+"\n" 446 with open("./logs/needmobilelist.txt","w") as f: 447 f.write(tempstr) 448 449 #補沒有的 450 getneednum(callback_url_showji) 451 452 print "all works end!" 453 454 455 if __name__ == "__main__": 456 from optparse import OptionParser 457 USAGE = "usage:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]" 458 parser = OptionParser(USAGE) 459 parser.add_option("-s", dest="s") 460 opt,args = parser.parse_args() 461 judopt = lambda x:x.s 462 463 if not opt.s: 464 print USAGE 465 sys.exit(1) 466 467 if not judopt(opt): 468 print USAGE 469 sys.exit(1) 470 471 if opt.s: 472 content = opt.s 473 474 main(content)
若是你看的仔細必定會發現我在加了這樣兩行:api
import socket socket.setdefaulttimeout(20)
這是爲了兼容python2.6如下版本urllib2的timeout沒法正常生效。並且在gevent異步非阻塞方式下urllib2的阻塞方式須要改用gevent.Timeout()替代。服務器