基於gevent全國手機號段spider蜘蛛爬蟲

 

首先介紹下python異步執行,python有兩種方法編寫異步代碼:html

一、corutines協程(也稱爲greenlets)python

二、回調git

gevent是greenlets的一種實現方式,能夠經過pip方便的安裝gevent模塊。gevent執行方式其實是代碼塊的交替執行,具體的能夠看下這篇blog,我就不重複造輪子了。github

值得一提的是,gevent封裝了不少接口,其中一個是著名的猴子補丁monkey,sql

from gevent import monkey
monkey.patch_all()

這兩行就能夠在代碼中改變其他包的行爲,讓其從同步阻塞方式變爲異步非阻塞方式,很是的神奇。數據庫

我用gevent的異步非阻塞方式寫了一個手機號段蜘蛛爬蟲,目前一直在服務器穩定的運行,代碼詳見個人github,內有福利。腳本用法:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999],程序基於data/mobile_area.db數據庫作增量更新。json

  1 #!/usr/bin/python
  2 #-*- coding:utf-8 -*-
  3 """手機號段爬蟲:接收用戶命令參數精簡版 for sqlitedb
  4 @version:1.0
  5 @author:Kenny{Kenny.F<mailto:kennyffly@gmail.com>}
  6 @since:2014/05/23
  7 """
  8 import sys
  9 reload(sys)
 10 sys.setdefaultencoding('utf8')
 11 import gevent                         #gevent協程包
 12 import multiprocessing                #多進程
 13 from multiprocessing import Manager
 14 import urllib2
 15 from urllib import unquote,quote
 16 import socket
 17 socket.setdefaulttimeout(20)
 18 import cookielib
 19 import random
 20 import simplejson as json
 21 import os
 22 import time
 23 import sqlite3                        #sqlite數據庫操做
 24 from functools import wraps            #方法工具
 25 from strtodecode import strtodecode    #編碼檢測轉換
 26 
 27 
 28 manager = Manager()                    #多進程共享隊列
 29 lacknumlist = manager.list()
 30 
 31 
 32 def multi_run_wrapper(func):        #多進程map包裹參數
 33     @wraps(func)
 34     def newF(args):
 35         if isinstance(args,list):
 36             return func(*args)
 37         elif isinstance(args,tuple):
 38             return func(*args)
 39         else:
 40             return func(args)
 41     return newF
 42 
 43 
 44 def getRanIp():        #獲得隨機IP
 45     #123.125.40.255 - 123.127.134.56 北京聯通154938條
 46     return "123.{0}.{1}.{2}".format(random.randint(125,127), random.randint(40,134), random.randint(56,255))
 47 
 48 
 49 def _cookiePool(url):        #查看cookie池
 50     cookie = cookielib.CookieJar()
 51     opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
 52     opener.open(url)
 53     for item in cookie:
 54         print 'Name = '+item.name
 55         print 'Value = '+item.value
 56 
 57 
 58 def catchPage(url=''):        #封裝的網頁頁面獲取
 59     if not url:
 60         return False
 61 
 62     with open("./logs/outprint.txt","a") as f:
 63         f.write(url+"\n")
 64 
 65     try:
 66         headers = {
 67             'User-Agent':'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6',
 68             'Referer':'http://www.baidu.com',
 69             "X-Forwarded-For":getRanIp()
 70         }
 71         req = urllib2.Request(
 72             url = url,
 73             headers = headers
 74         )
 75 
 76         html = ''
 77         result = ''
 78         try:
 79             try:
 80                 gevent.Timeout
 81             except:
 82                 result = urllib2.urlopen(req,timeout=20)
 83             else:
 84                 with gevent.Timeout(20, False):
 85                     result = urllib2.urlopen(req)
 86         except urllib2.HTTPError, e:
 87             #For Ptyhon 2.6
 88             try:
 89                 socket.timeout
 90             except:
 91                 print 'The server couldn\'t fulfill the request.'
 92                 print "url:{0} Httperrorcode:{1}".format(url, e.code)
 93             else:
 94                 if isinstance(e.reason, socket.timeout):
 95                     print 'The server couldn\'t fulfill the request.'
 96                     print "url:{0} Httperrorcode:{1}".format(url, e.code)
 97         except urllib2.URLError, e:
 98             print 'We failed to reach a server.'
 99             print "url:{0} Reason:{1}".format(url, e.reason)
100         except socket.timeout, e:
101             #For Python 2.7
102             print 'The server couldn\'t fulfill the request.'
103             print "url:{0} Httperrorcode:{1}".format(url, e)
104         else:
105             if result:
106                 html = result.read()
107         return html
108     except:
109         try:
110             socket.timeout
111         except:
112             print 'The server couldn\'t fulfill the request.'
113             print "url:{0} Httperrorcode:{1}".format(url, 'timeout')
114         else:
115             print 'The server couldn\'t fulfill the request.'
116             print "url:{0} Server someting error".format(url)
117         return False
118 
119 
120 def opensqlitedb():        #從sqlite數據源開始工做
121     db_file = './data/mobile_area.db'
122 
123     if not os.path.exists(db_file):
124         try:
125             cx = sqlite3.connect(db_file)
126             cu = cx.cursor()
127             #建表
128             sql = "create table mobile_area (id integer primary key,\
129                                         mobile_num integer,\
130                                         mobile_area varchar(50) NULL,\
131                                         mobile_type varchar(50) NULL,\
132                                         area_code varchar(50) NULL,\
133                                         post_code varchar(50) NULL)"
134             cu.execute(sql)
135         except:
136             print "can not find sqlite db file\n"
137             with open('./logs/errorlog.txt','a') as f:
138                 f.write("can not find sqlite db file '%s'\n" % str(db_file))
139             return False
140     else:
141         try:
142             cx = sqlite3.connect(db_file)
143             cu = cx.cursor()
144         except:
145             print "can not find sqlite db file\n"
146             with open('./logs/errorlog.txt','a') as f:
147                 f.write("can not find sqlite db file '%s'\n" % str(db_file))
148             return False
149 
150     mobile_err_list,mobile_dict = [],{}
151     limit = 10000
152     offset = 0
153     mobile_num_pre = 0
154     while 1:
155         cu.execute("SELECT * FROM mobile_area ORDER BY mobile_num ASC LIMIT %d OFFSET %d " % (limit, offset))
156         rs = cu.fetchall()
157         if not rs:
158             break
159         else:
160             offset = offset + limit
161             for i in xrange(0,len(rs)):
162                 id = rs[i][0]
163                 mobile_num = int(rs[i][1])
164                 mobile_area = rs[i][2]
165                 mobile_type = rs[i][3]
166                 area_code = rs[i][4]
167                 post_code = rs[i][5]
168 
169                 if len(mobile_area) > 100 or (not mobile_area)  or (not mobile_num) or len(mobile_type) > 100 or len(area_code) > 100 or len(post_code) > 100 or len(str(mobile_num)) > 7:
170                     print "error id:%d" % id
171                     continue
172 
173                 #正確的號碼入字典
174                 mobile_dict[str(mobile_num)] = True
175 
176     print "get data from sqlite works down!\n"
177     return mobile_dict
178 
179 
180 @multi_run_wrapper
181 def getNumPage(segnum='', num='', url=''):        #獲取號碼頁詳細數據
182     if not segnum:
183         return False
184     if not num:
185         return False
186     if not url:
187         return False
188 
189     gevent.sleep(random.randint(10,22)*0.81)    #今後處協程併發
190 
191     db_file = './data/mobile_area.db'
192 
193     html = catchPage(url)
194     if not html:
195         print "catch %s num page error!" % num
196         print "url:%s\n" % (url)
197         with open("./logs/errornum.txt", "a") as f:
198             f.write(segnum+','+num+','+url+"\n")
199         return False
200 
201     #json數據
202     try:
203         page_temp_dict = json.loads(unquote(html))
204     except:
205         print segnum+','+num+','+url+",result error convert to dict\n"
206         with open('./logs/errorlog.txt','a') as f:
207             f.write(segnum+','+num+','+url+",result error convert to dict\n")
208         return False
209     else:
210         try:
211             cx = sqlite3.connect(db_file)
212             cu = cx.cursor()
213         except:
214             print "can not find sqlite db file\n"
215             with open('./logs/errorlog.txt','a') as f:
216                 f.write("can not find sqlite db file '%s'\n" % str(db_file))
217             return False
218 
219         insdata = {}
220         #mobile_num
221         if page_temp_dict.get('Mobile', False):
222             insdata['mobile_num'] = int(page_temp_dict['Mobile'])
223         else:
224             with open('./logs/errorlog.txt','a') as f:
225                 f.write(segnum+','+num+','+url+",No matching data\n")
226             return False    #無號碼
227         #mobile_area
228         if page_temp_dict.get('Province', False):
229             if page_temp_dict['Province'] == u'未知':
230                 with open('./logs/errorlog.txt','a') as f:
231                     f.write(segnum+','+num+','+url+",province is weizhi\n")
232                 return False    #無地區
233             if page_temp_dict.get('City', False):
234                 insdata['mobile_area'] = strtodecode(page_temp_dict['Province']+' '+page_temp_dict['City'])
235             else:
236                 insdata['mobile_area'] = strtodecode(page_temp_dict['Province']+' '+page_temp_dict['Province'])
237         else:
238             with open('./logs/errorlog.txt','a') as f:
239                 f.write(segnum+','+num+','+url+",No matching province\n")
240             return False    #無地區
241         #mobile_type
242         if page_temp_dict.get('Corp', False):
243             if page_temp_dict.get('Card', False):
244                 insdata['mobile_type'] = strtodecode(page_temp_dict['Corp']+' '+page_temp_dict['Card'])
245             else:
246                 insdata['mobile_type'] = strtodecode(page_temp_dict['Corp'])
247         #area_code
248         if page_temp_dict.get('AreaCode', False):
249             insdata['area_code'] = strtodecode(page_temp_dict['AreaCode'])
250         #post_code
251         if page_temp_dict.get('PostCode', False):
252             insdata['post_code'] = strtodecode(page_temp_dict['PostCode'])
253 
254         if insdata:
255             sql = "insert into mobile_area values (?,?,?,?,?,?)"
256             cu.execute(sql, (None,insdata['mobile_num'],insdata['mobile_area'],insdata['mobile_type'],insdata['area_code'],insdata['post_code']))
257 
258             try:
259                 cx.commit()        #執行insert
260             except:
261                 with open('./logs/errorlog.txt','a') as f:
262                     f.write(segnum+','+num+','+url+",insert sqlitdb faild\n")
263                 return False
264             else:
265                 return True
266 
267 def getneednum(url='', step=10):        #獲取全部未記錄的號碼信息數據
268     if not lacknumlist:
269         return False
270     if not url:
271         return False
272     if not step:
273         print "step can not be null"
274         return False
275     if not isinstance(step,int):
276         print "step should be numeric"
277         return False
278     if step < 0:
279         print "step should be > 0"
280         return False
281 
282     offset = 0
283     limit = int(step)
284     len_max = len(lacknumlist)
285     breaktag = False
286     while 1:
287         if breaktag:
288             break
289 
290         threads = []
291         for i in xrange(offset,(limit+offset)):
292             try:
293                 num = lacknumlist[i]
294             except:
295                 breaktag = True
296                 break
297             else:
298                 furl = url()
299                 threads.append( gevent.spawn(getNumPage, (num[0:3], num, furl+num)) )        #協程併發
300 
301         try:
302             gevent.joinall(threads)
303             print "%d-%d is end\n" % (offset+1,limit+offset)
304         except Exception as e:
305             print "Gevent catch error\n"
306 
307         offset = offset + limit
308         time.sleep(random.randint(5,80)*0.9)
309 
310     i = 1                                     #處理網絡異常號碼數據10次
311     while i <= 10:
312         if not os.path.exists("./logs/errornum.txt"):
313             break
314         j = 1
315         threads = []
316         with open("./logs/errornum.txt","r") as f:
317             while 1:
318                 if (j >= step) and threads:
319                     try:
320                         gevent.joinall(threads)
321                     except Exception as e:
322                         print "turn%d-%d Gevent catch error\n" % (i,j)
323                     time.sleep(random.randint(5,80)*0.9)
324                     threads = []
325                     j = 0
326                 line = f.readline()
327                 if line:
328                     errnum_str = line.strip()
329                     errnum_truple = errnum_str.split(',')
330                     threads.append(gevent.spawn(getNumPage, (errnum_truple[0], errnum_truple[1], errnum_truple[2])))
331                 else:
332                     if threads:
333                         try:
334                             gevent.joinall(threads)
335                         except Exception as e:
336                             print "turn%d-%d Gevent catch error\n" % (i,j)
337                     break
338                 j += 1
339 
340         if i < 10:
341             with open("./logs/errornum.txt","w") as f:        #清除文件內容
342                 pass
343         i = i + 1
344 
345 
346 def setneednum(num='', mobile_dict={}):        #設置獲得全部未補全的號碼
347     if not num:
348         return False
349 
350     if len(str(num))==3:
351         start_num = int(num+'0000')
352         end_num = int(num+'9999')
353     else:
354         num_list = num.split('-')
355         start_num = int(num_list[0])
356         end_num = int(num_list[1])
357 
358     i = start_num
359     while i <= end_num:
360         if not mobile_dict.get(str(i),False):        #查找沒有的號碼
361             lacknumlist.append(str(i))
362         i += 1
363     # print "%s num works down\n" % num
364 
365 
366 def setsegnum(segnumlist=[], mobile_dict={}):        #根據號段起併發進程
367     if not segnumlist:
368         return False
369 
370     record = []
371     for seg in xrange(0, len(segnumlist)):
372         segnum = segnumlist[seg].strip()
373         if len(str(segnum)) == 3:        #指定的單個號段:137
374             try:
375                 int(segnum)
376             except:
377                 print "%s is illegal argument\n" % str(segnum)
378                 continue
379             else:
380                 process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict))
381                 process.start()
382                 record.append(process)
383         elif len(str(segnum)) == 7:        #具體指定的單個號碼:1391234
384             if not mobile_dict.get(str(segnum),False):
385                 lacknumlist.append(str(segnum)) #sqlite沒有的號碼
386         else:
387             segparam_list = segnum.split('-')
388             try:
389                 int(segparam_list[0])
390             except:
391                 print "%s is illegal argument\n" % str(segnum)
392                 continue
393             else:
394                 try:
395                     segparam_list[1]
396                 except:
397                     print "%s is illegal argument\n" % str(segnum)
398                     continue
399                 else:
400                     if segparam_list[0][:3] == segparam_list[1][:3] :        #指定號碼範圍:1380000-1389999
401                         process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict))
402                         process.start()
403                         record.append(process)
404                     else:
405                         print "%s is illegal argument\n" % str(segnum)
406                         continue
407     for process in record:
408         process.join()
409 
410     print "all SegNum prepare works down!\n"
411 
412 
413 def callback_url_showji():        #返回showji網的api地址
414     showji = 'http://api.showji.com/Locating/www.showji.c.o.m.aspx?output=json'
415     return "{0}&timestamp={1}&m=".format(showji, int(time.time()))
416 
417 
418 def main(param=''):        #主方法
419     with open("./logs/errornum.txt","w") as f:        #清除零時文件內容
420         pass
421     with open("./logs/outprint.txt","w") as f:
422         pass
423 
424     if not param:
425         print "no argument!"
426         return False
427 
428     # segnumlist = [\
429     #             # '134','135','136','137','138','139','147','150','151','152','157','158','159','182','183','187','188',\
430     #             # '130','131','132','136','145','185','186',\
431     #             # '133','153','180','189',\
432     #             # '147','155','156','170','176','177','178','181','184'\
433     #             ]
434 
435     segnumlist = str(param).split(',')
436 
437     #從sqlite庫查已有的
438     mobile_dict = opensqlitedb()
439 
440     #算哪些是尚未的
441     setsegnum(segnumlist, mobile_dict)
442     if lacknumlist:
443         tempstr = ''
444         for i in xrange(0,len(lacknumlist)):
445             tempstr += str(lacknumlist[i])+"\n"
446         with open("./logs/needmobilelist.txt","w") as f:
447             f.write(tempstr)
448 
449     #補沒有的
450     getneednum(callback_url_showji)
451 
452     print "all works end!"
453 
454 
455 if __name__ == "__main__":
456     from optparse import OptionParser
457     USAGE = "usage:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]"
458     parser = OptionParser(USAGE)
459     parser.add_option("-s", dest="s")
460     opt,args = parser.parse_args()
461     judopt = lambda x:x.s
462 
463     if not opt.s:
464         print USAGE
465         sys.exit(1)
466 
467     if not judopt(opt):
468         print USAGE
469         sys.exit(1)
470 
471     if opt.s:
472         content = opt.s
473 
474     main(content)

若是你看的仔細必定會發現我在加了這樣兩行:api

import socket
socket.setdefaulttimeout(20)

這是爲了兼容python2.6如下版本urllib2的timeout沒法正常生效。並且在gevent異步非阻塞方式下urllib2的阻塞方式須要改用gevent.Timeout()替代。服務器

相關文章
相關標籤/搜索