The Python version is 2.7.10. Two third-party modules are used, bs4 and happybase, both of which can be installed directly with pip.
1. logger uses Python's built-in logging module to set up simple log output.
2. getHtml uses the built-in urllib2 module to fetch a page's HTML. urllib2 can also be configured with a proxy; later on, each thread could be given its own proxy address so that running many threads does not get the scraper blocked (a minimal sketch of this appears right after this list).
3. get_html_class and get_html_id are two nearly identical functions that use the bs4 module to extract the relevant content from a page.
4. getcounty, getvillage and getintro filter the wanted information out of the actual pages; this part has to be tailored to each site's layout.
5. writetohbase uses the happybase module to write the scraped data into the HBase database. Note that HBase must have Thrift1 enabled (e.g. started with hbase-daemon.sh start thrift); happybase is not compatible with Thrift2.
6. div_list splits one large list into n smaller lists according to the number of threads, so that each thread only has to process its own list.
7. The URL in the code covers only Tianjin, 4 threads are started, and each thread scrapes at most 51 records; to scrape everything for real, these settings need to be changed.
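On point 2 above, here is a minimal sketch of per-thread proxy support (this is not part of the original script; the proxy address 127.0.0.1:8888 is just a placeholder):

# Sketch only: route urllib2 requests through a proxy; each thread could build
# its own opener with its own proxy address
import urllib2

def getHtml_with_proxy(url, proxy="127.0.0.1:8888"):
    handler = urllib2.ProxyHandler({"http": proxy})
    opener = urllib2.build_opener(handler)
    return opener.open(url).read()

The full spider script follows.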
#!/usr/local/bin/python
# -*- coding: utf8 -*-

# msvcrt is Windows-only; it is used for the "press any key" pause below
import urllib2,threading,time,logging,msvcrt
from bs4 import BeautifulSoup
import happybase

# Configure logging
def logger(messagetype,message):
    logging.basicConfig(level=logging.INFO,format='%(asctime)s [%(levelname)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename="spider.log")
    if (messagetype=="info"):
        logging.info(message)
    elif (messagetype=="warning"):
        logging.warning(message)

# Base URL of the site; the scraped links are relative and must be prefixed with it
def mainhtml():
    mainhtml="http://www.yigecun.com"
    return mainhtml

# Fetch a page
def getHtml(url):
    page = urllib2.urlopen(url)
    html = page.read()
    return html

# Get all elements of the given tag and class from a page
def get_html_class(url,htmltype,htmlclass):
    html = getHtml(url)
    soup = BeautifulSoup(html,"html.parser",from_encoding="UTF-8")
    divs=soup.find_all(htmltype,class_=htmlclass)
    content=[]
    for div in divs:
        content.append(str(div).decode("utf8"))
    return content

# Get the element of the given tag and id from a page
def get_html_id(url,htmltype,htmlid):
    html = getHtml(url)
    soup = BeautifulSoup(html,"html.parser",from_encoding="UTF-8")
    content=soup.find_all(htmltype,id=htmlid)
    return str(content[0]).decode("utf8")

# County names and URLs
def getcounty(url):
    # Get the province name from the page
    province=get_html_class(url,"li","active")[0]
    province=province.split(">")[1].split("<")[0]

    # List holding each county and its link
    list_county=[]

    # Get the province/city/county entries and their links from the page
    content=get_html_class(url,"div","cunnavtagguoup")
    content[0]=content[0].replace("<div class=\"cunnavtagguoup\">\n","").replace("\n</div>","")
    for item in content[0].split("</div>"):

        # Name of the prefecture-level city
        if (item.split(">")[0]=="<div class=\"cunnavtaga\""):
            city=item.split(">")[1]
            # Prefix the city name with the province name
            city=province+"-"+city

        # Get the county name and its URL and store them in the list
        if (item.split(">")[0]=="<div class=\"cunnavtagb\""):
            # Prefix the county name with the province and city names
            county=city+"-"+item.split(">")[2].replace("</a","")
            path=mainhtml()+item.split("\"")[3]
            list=[]
            list.append(county)
            list.append(path)
            list_county.append(list)

    return list_county

# Village names and URLs
def getvillage(list_county):
    list_village=[]

    content=get_html_class(list_county[1],"div","cunnavtagguoup cunmargintd10")
    content[0]=content[0].replace("<div class=\"cunnavtagguoup cunmargintd10\">\n","").replace("\n</div>","")

    for item in content[0].split("</div>"):

        # Name of the town
        if (item.split(">")[0]=="<div class=\"cunnavtaga\""):
            town=item.split(">")[1]
            # Prefix the town name with the province, city and county names
            town=list_county[0]+"-"+town

        # Get the village name and its URL and store them in the list
        if (item.split(">")[0]=="<div class=\"cunnavtagb\""):
            # Prefix the village name with the province, city, county and town names
            village=town+"-"+item.split(">")[2].replace("</a","")
            path=mainhtml()+item.split("\"")[3]
            list=[]
            list.append(village)
            list.append(path)
            list_village.append(list)
    return list_village

# Village introduction
def getintro(villagelist):
    intro=get_html_id(villagelist[1],"div","cunintr")
    intro=intro.replace("<div id=\"cunintr\">","").replace("</br>","").replace("</div>","").replace("\n","").replace(" ","")
    newvillagelist=[]
    newvillagelist.append(villagelist[0])
    newvillagelist.append(intro)
    return newvillagelist

# Write one village to HBase
def writetohbase(villagelist):

    # Use the village name as the rowkey
    rowkey=villagelist[0]

    # The remaining fields go into columns of the "info" column family
    content=villagelist[1].split("<br>")

    # Initialize every column value: some columns may be absent on the page,
    # and using an unset variable would raise an error
    intro=""
    company=""
    farmprod=""
    resource=""
    unit=""
    other=""

    # The labels below must match the site's text exactly, so they are kept verbatim
    for i in range (0,len(content)):
        if (i==0):
            intro=content[i]
        elif (content[i].split(u":")[0]==u"村內企業"):
            company=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"主要農產品"):
            farmprod=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"村內資源"):
            resource=content[i].split(u":")[1]
        elif (content[i].split(u":")[0]==u"村裏單位"):
            unit=content[i].split(u":")[1]
        else:
            other=content[i]

    connection=happybase.Connection("192.168.161.101")
    table=connection.table("village")
    table.put(rowkey.encode("utf8"),{"info:intro":intro.encode("utf8"),"info:company":company.encode("utf8"),"info:farmprod":farmprod.encode("utf8"),"info:resource":resource.encode("utf8"),"info:unit":unit.encode("utf8"),"info:other":other.encode("utf8")})

# Worker: scrape the data
def work(thread_id,list_village):

    logger("info","(thread_id="+str(thread_id)+") thread started, villages to crawl: "+str(len(list_village)))

    count=0
    errorlist=[]

    for village in list_village:

        # Retry at most three times on error
        error=0

        while (error<3) and (count<=50):
            try:
                newvillagelist=getintro(village)
                writetohbase(newvillagelist)
                # print "(thread_id="+str(thread_id)+")"+newvillagelist[0]+" done!"
                logger("info","(thread_id="+str(thread_id)+")"+newvillagelist[0]+" done!")
                count=count+1
                break
            except:
                error=error+1
                time.sleep(5)

        if (error>=3):
            # Use village[0] here: newvillagelist is not defined if every attempt failed
            errorlist.append(village[0])
            logger("warning","(thread_id="+str(thread_id)+")"+village[0]+" failed!")

    logger("info","(thread_id="+str(thread_id)+") work finished, succeeded: "+str(count)+", failed: "+str(len(errorlist)))

# Like hash partitioning in a database: split one large list into n smaller lists
# by modulo so that several threads can work on them concurrently
def div_list(list,n):
    divlist=[]

    for m in range (0,n):
        divlist.append([])

    for i in range (0,len(list)):
        divlist[i%n].append(list[i])

    result=[]

    for j in range (0,n):
        result.append(divlist[j])
    return result

if (__name__=="__main__"):

    # print "spider begin"
    logger("info","Main program started")

    url="http://www.yigecun.com/lelist/listxian.aspx?id=0D59910890BB01DB"
    num_thread=4

    list=[]

    for list_county in getcounty(url):
        for list_village in getvillage(list_county):
            list.append(list_village)

    # print "Total number of villages to crawl: "+str(len(list))
    logger("info","Total number of villages to crawl: "+str(len(list)))
    print "Press any key to continue"
    msvcrt.getch()

    newlist=[]
    newlist=div_list(list,num_thread)

    for i in range (0,num_thread):
        t=threading.Thread(target=work,args=(i,newlist[i]))
        t.start()
When the program starts:
When it finishes:
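To spot-check what was written, the rows can be read back with happybase (a minimal sketch, assuming the same host 192.168.161.101 and the village table used by the script):

# Sketch: read back a few rows from the "village" table to verify the writes
import happybase

connection = happybase.Connection("192.168.161.101")
table = connection.table("village")

# Print the first 5 rowkeys together with their "info:intro" column
for rowkey, data in table.scan(limit=5):
    print rowkey, data.get("info:intro", "")

connection.close()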
Using Hive to run MapReduce statistics over the HBase table:
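What follows is a sketch of that Hive side (not from the original post): an external Hive table is mapped onto the HBase village table through the HBase storage handler, after which an aggregate query runs as a MapReduce job. The Hive table name village_hbase is just an example.

-- Sketch: map the HBase "village" table into Hive, then aggregate over it
CREATE EXTERNAL TABLE village_hbase (
  rowkey   string,
  intro    string,
  company  string,
  farmprod string,
  resource string,
  unit     string,
  other    string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  "hbase.columns.mapping" = ":key,info:intro,info:company,info:farmprod,info:resource,info:unit,info:other"
)
TBLPROPERTIES ("hbase.table.name" = "village");

-- Count the scraped villages; this runs as a MapReduce job
SELECT count(*) FROM village_hbase;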