Using pycurl

The pycurl library I use is the Python binding for libcurl; its official site is http://pycurl.sourceforge.net. Like the urllib library, pycurl is used to fetch network resources addressed by a domain name. It supports many protocols: FTP, FTPS, HTTP, HTTPS, SCP, SFTP, TFTP, TELNET, DICT, LDAP, LDAPS, FILE, IMAP, SMTP and POP3.
The following code shows the commonly used options:

c = pycurl.Curl()
#url = "http://image.baidu.com/i?tn=baiduimage&ct=201326592&lm=-1&cl=2&nc=1&word="
url = '/duy/d'                                 # target address
c.setopt(pycurl.URL, url)
c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Windows NT 6.1; rv:27.0) Gecko/20100101 Firefox/27.0')  # client user agent
c.setopt(pycurl.REFERER, 'http://www.google.com/search?sourceid=chrome&ie=UTF-8&q=' + rand_str())  # referring page
c.setopt(pycurl.HTTPHEADER, ['Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'])  # extra HTTP headers
c.setopt(pycurl.COOKIE, cookie)                # cookie string in the form "key=value;key=value"
c.setopt(pycurl.VERBOSE, 1)                    # print debug information
c.setopt(pycurl.FOLLOWLOCATION, 1)             # follow 302 redirects automatically
c.setopt(pycurl.MAXREDIRS, 5)                  # at most 5 redirects
c.setopt(pycurl.COOKIEFILE, "cookie_file_name")  # file to read cookies from
c.setopt(pycurl.COOKIEJAR, "cookie_file_name")   # file to save cookies to
c.setopt(pycurl.POST, 1)                       # use POST instead of the default GET
c.setopt(pycurl.POSTFIELDS, urllib.urlencode(post_data))  # POST data, a dict such as {"key": "value"}
c.setopt(c.WRITEFUNCTION, t.body_callback)     # callback that receives the response body, e.g.:
#   def body_callback(self, buf):
#       self.contents = self.contents + buf
c.setopt(pycurl.HEADERFUNCTION, d.body_callback)  # the same kind of callback, for the response headers
c.setopt(pycurl.ENCODING, 'gzip,deflate')      # accepted content encodings (Accept-Encoding)


After the required options are set, call c.perform() to issue the request. The many option flags are documented in detail in the curl documentation and on the official site. A minimal end-to-end request is sketched below, followed by a class of my own that wraps pycurl:
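This is only a sketch: http://example.com is a placeholder URL, and a cStringIO buffer stands in for the hand-written body callback shown above.

import pycurl
import cStringIO

buf = cStringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, 'http://example.com')   # placeholder URL
c.setopt(pycurl.FOLLOWLOCATION, 1)           # follow redirects automatically
c.setopt(c.WRITEFUNCTION, buf.write)         # write the response body into the buffer
c.perform()                                  # issue the request
print c.getinfo(pycurl.HTTP_CODE)            # HTTP status code of the final response
print buf.getvalue()[:200]                   # beginning of the body
c.close()
buf.close()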


class curl_request:
    c=None
    def __init__(self,url,action='get'):
        self.url = url
        self.url_para =None
        self.c = pycurl.Curl()
        print self.url,"     d"
        self.c.setopt(pycurl.URL,self.url)
        self.c.setopt(pycurl.USERAGENT,'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)')
        self.c.setopt(pycurl.REFERER,'http://www.google.com/search?sourceid=chrome&ie=UTF-8&q='+rand_str())
        self.c.setopt(pycurl.COOKIE,'Hm_lvt_5251b1b3df8c7fd322ea256727293cf0=1393221156,1393223230,1393223252,1393223985;_jzqa=1.46109393469532')
        self.c.setopt(pycurl.VERBOSE,1)

        self.c.setopt(pycurl.HEADER,1)
        self.c.setopt(pycurl.FOLLOWLOCATION, 1)
        self.c.setopt(pycurl.MAXREDIRS, 5)
        self.c.setopt(pycurl.COOKIEFILE, 'cookie_file_name.txt')
        self.c.setopt(pycurl.COOKIEJAR, 'cookie_file_name.txt')
        if action == 'post':
            self.c.setopt(pycurl.POST,1)
            self.c.setopt(pycurl.POSTFIELDS, urllib.urlencode({"noe": "noe"}))  # placeholder POST body
        else:
            self.c.setopt(pycurl.HTTPGET,1)

#        c.setopt(c.WRITEFUNCTION, self.write)

#        c.setopt(pycurl.HEADERFUNCTION, d.body_callback)
        self.c.setopt(pycurl.ENCODING, 'gzip,deflate')

    def set_url_para(self,para):
        self.url_para = para
        url = self.url + para
        self.c.setopt(pycurl.URL,url)

    def set_post_para(self,para):
        self.c.setopt(pycurl.POST,1)
        self.c.setopt(pycurl.POSTFIELDS, urllib.urlencode( para))
    def set_cookie(self,cookie):
        self.c.setopt(pycurl.COOKIE,cookie)

    def perform(self,url='',referer=''):
        if url != '':
            self.c.setopt(pycurl.URL,url)
        if referer != '':
            self.c.setopt(pycurl.REFERER,referer)
        self.buf = cStringIO.StringIO()
        self.head = cStringIO.StringIO()
        self.c.setopt(self.c.WRITEFUNCTION, self.buf.write)
        self.c.setopt(pycurl.HEADERFUNCTION, self.head.write)
        try:
            self.c.perform()
        except Exception,e:
            self.c.close()
            self.buf.close()
            self.head.close()
            return
        self.r = self.buf.getvalue()
        self.h = self.head.getvalue()
        self.code = self.c.getinfo(pycurl.HTTP_CODE)
        self.info = self.c.getinfo(pycurl.EFFECTIVE_URL)
        self.cookie = self.c.getinfo(pycurl.INFO_COOKIELIST)

        self.buf.close()
        self.head.close()
    def __del__(self):
        self.c.close()

    def get_body(self):
        return self.r
    def get_head(self):
        return self.h
    def get_code(self):
        return self.code
    def get_info(self):
        return self.info
    def get_cookie(self):
        return self.cookie


For pages that require logging in before they can be accessed, you can set the cookie and the POST data to perform the login. Once the login completes, the session information is saved to the cookie file, and every later request carries that cookie to prove your identity.
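As a rough sketch of such a login flow with the curl_request class above (the login URL and the form field names are made-up placeholders):

login = curl_request('http://example.com/login', action='post')    # hypothetical login page
login.set_post_para({'username': 'user', 'password': 'secret'})    # hypothetical form fields
login.perform()
print login.get_code()            # check the status code of the login request
del login                         # closing the handle flushes its cookies to cookie_file_name.txt

member = curl_request('http://example.com/member')                 # hypothetical page that requires login
member.perform()                  # the saved cookie is sent automatically
print member.get_body()[:200]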

After fetching a page this way, you can parse its content with BeautifulSoup. Its usage is similar to xml2: you can search for elements and also traverse the document tree.
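A tiny example of searching and traversing with BeautifulSoup 3 (the version imported in the code below); the HTML string is only an illustration:

from BeautifulSoup import BeautifulSoup    # BeautifulSoup 3, Python 2

html = '<div class="item"><a href="/a">first</a><a href="/b">second</a></div>'
soup = BeautifulSoup(html)

div = soup.find('div', {'class': 'item'})  # search by tag name and attributes
for a in div.findAll('a'):                 # traverse/search the children
    print a['href'], a.string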

For example, the following code parses the HTML of one particular site and downloads the matching images:

def get_dynamic_mm(buf):
    root_soup = BeautifulSoup(''.join( buf ),fromEncoding="utf-8")
    div = root_soup.find('div',{ "class":"mm_time"})
    if div:
        for divsub in div.div :
            if str(type(divsub)) == "<class 'BeautifulSoup.Tag'>" and divsub['class'] == "girl_info" :
                name = divsub.a.string.strip().replace(" ","")
                page = divsub.a['href']
        os.makedirs("./照片/"+name)
        img_url = div.img['src']
        get_img(img_url,name,name)
        return page
 

def get_img(url,name,path):
    while 1:
        try :
            r = urllib2.urlopen(url)
            print './照片/'+path+'/'+name+'.gif'
            f = open('./照片/'+path+'/'+name+'.gif','ab+')
            f.write(r.read())
            r.close()
            f.close()
            break
        except Exception,e:
            print 'error'
            continue


The BeautifulSoup documentation can also be found on its official site: http://www.crummy.com/software/BeautifulSoup/

With these two third-party libraries it is easy to build web bots that download images, monitor specific content (for example, airfare prices), or post to various forums.

Let me also recommend a fairly accessible book that describes how its author writes crawlers in PHP, what to watch out for when writing one (for example, how long to wait between requests to a site), and what web bots are useful for, such as checking how many dead links a page contains: Webbots, Spiders, and Screen Scrapers, 2nd Edition.

Below is a simple crawler I wrote myself. It crawls links and related content, and writes the pages containing the relevant content into a SQLite file.

It uses a thread pool: the threads are started when the pool is initialized, and each thread loops pulling tasks from the work queue. When a thread gets a task it processes it (crawls the page), until all the work is done and a flag is set to stop every thread. This approach works well. Oddly enough, I had never used threads (or a thread pool) at work before (it was always endless forking; two years of experience, and I really shortchanged my employer). A thread pool saves system resources, lets you tune how fast tasks are processed, and compared with multiple processes it avoids passing data between processes and is less error-prone.

When crawling links, only the href field inside <a href=""></a> tags is extracted.

When crawling a page, it also searches for the keyword; when the keyword is found, the content is put on a queue so that the main thread can write it to the database (using SQLite), as sketched below.
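The handoff between the worker threads and the main thread can be sketched on its own like this (the queue and table names follow the full listing below; this is not the complete crawler):

import Queue
import sqlite3

contentQueue = Queue.Queue()

def worker(url, page, key):
    # worker thread: only pushes matches onto the queue, never touches the database
    if key in page:
        contentQueue.put((url, page))

def write_results(dbfile):
    # main thread: drains the queue and does all SQLite writes itself
    cx = sqlite3.connect(dbfile)
    cu = cx.cursor()
    cu.execute("create table if not exists content "
               "(id INTEGER PRIMARY KEY AUTOINCREMENT, url varchar(100), content varchar(4000))")
    while not contentQueue.empty():
        url, content = contentQueue.get()
        cu.execute("insert into content(url, content) values(?, ?)", (url, content))
    cx.commit()
    cx.close()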


import sys
import os
import re
import urllib
import urllib2
import time
import random
import pycurl
import Queue
import threading
import logging
from BeautifulSoup import BeautifulSoup
import getopt
import sqlite3
from Request import curl_request


global logger
class MyThread(threading.Thread):
    def __init__(self, workQueue, resultQueue, contentQueue, key, timeout=15):
        threading.Thread.__init__(self)
        self.mutex = threading.Lock()
        self.timeout = timeout
        self.setDaemon(True)
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.contentQueue = contentQueue
        self.flag = False
        self.exit_flag = False
        self.key = key
        self.start()  # start the thread only after every attribute used in run() is set
        
    def run(self):
        while True:
            try:
               # if self.mutex.acquire(1): 
                callable, args, kwargs, deep = self.workQueue.get(timeout=self.timeout)
                #self.mutex.release()
                self.flag = True
                res = callable(args,self.resultQueue,self.contentQueue,kwargs,deep,self.key)
                self.flag = False
            except Queue.Empty:
                logger.debug('queue is empty')
                self.flag = False
                if self.exit_flag:
                    logger.info('exit_flag set')
                    break
                continue
            except :
                print sys.exc_info()
                raise
            
class ThreadPool:
    def __init__(self, key, num_of_threads=10):
        self.workQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.contentQueue = Queue.Queue()
        self.threads = []
        self.key = key
        self.__createThreadPool(num_of_threads)
       
    def __createThreadPool(self, num_of_threads):
        for i in range( num_of_threads ):
            thread = MyThread( self.workQueue, self.resultQueue, self.contentQueue, self.key )
            self.threads.append(thread)
            
    def wait_for_complete(self):
        while len(self.threads):
            thread = self.threads.pop()
            if thread.isAlive():
                thread.join()
    def get_flag(self):
        flag = False
        for thread in self.threads:
            if thread.flag:
                flag = True
        return flag
    def get_num(self):
        num = 0
        for thread in self.threads:
            if thread.flag:
                num += 1
        return num
    def set_flag(self):
        for thread in self.threads:
            thread.exit_flag = True
                
    def add_job(self,callable, args,kwargs, deep):
        self.workQueue.put( (callable, args, kwargs, deep) )

def resovle_address(base_url,link):
    base_url = base_url.strip()
    logger.debug('url base is: '+base_url.encode()+' and link is: '+link.encode())
    link = link.strip()
    link = link.replace(';','')
    link = link.replace('\\','')
    link = link.replace('\'','')
    link = link.replace('/./','/')
    bash = base_url.rfind('/')
    if len(link) < 1:
        return None
    if bash != -1 and base_url[:bash+1] != "http://":
        base_url = base_url[:base_url.rfind('/')]
    m = re.search("http|www",link)
    if link[0] == '/' and len(link)>1:
        logger.debug('return url is ' + base_url.encode() + link.encode())
        return base_url + link
    elif m is not None:
        logger.debug('return link is' + link.encode()) 
        return link
    return None
    
        
    
        
def crawl_url( url, resultQueue, contentQueue, sleep, deep, key):
    global logger
    logger.debug('start to crawl the url: '+url.encode()+' and deep is: '+str(deep))
    time.sleep(sleep[0])   # pause a random fraction of a second between requests
    home_url = curl_request(url)
    home_url.perform()
    buf = home_url.get_body()
    if buf is None:
        return 
    root_soup = BeautifulSoup(''.join( buf ),fromEncoding="utf-8")
    body = root_soup.body
    u = body
    logger.info('body is '+str(u))
    m = re.findall("<a.*?>",str(u))
    for sub in m:
        if len(sub) < 1:
            continue
        tag_a = BeautifulSoup(''.join( sub ),fromEncoding="utf-8")
        if tag_a.a is not None and tag_a.a.has_key('href'):
            url_s = tag_a.a['href']
            url_s = resovle_address(url,url_s)
         #   print 'geting url and deep is ',url_s,deep
            if url_s is not None:
                #print 'adding iiiiiiiiiiiiiiiiiii',url_s
                logger.info('geting url :'+url_s.encode()+'deep is :'+str(deep))
                resultQueue.put( (url_s, deep+1) )
    if u is None:
        return
    for k in u:
        if re.search(key,str(k)) is not None:
          #  print str(k)
            contentQueue.put( (str(url), str(k) ))

def Usage():
    print 'myspider.py usage:'
    print '  myspider.py -u url -d depth -f logfile -l loglevel --key keyword --thread num --dbfile file'

def get_rand():
    return random.sample([0.1,0.2,0.3,0.4,0.5],1)
def main(argv):
    global logger
    thread_num=10
    try:
        opts, args = getopt.getopt(argv[1:],'hu:d:t:l:f:i:',['key=','thread=','dbfile='])
    except getopt.GetoptError, err:
        print str(err)
        Usage()
        sys.exit(2)
    for o, a in opts:
        if o in ('-h','--help'):
            Usage()
            sys.exit(1)
        elif o in ('-u',):
            url = a
        elif o in ('-d',):
            scrawl_level = int(a)
        elif o in ('-f',):
            log_file = a
        elif o in ('-l',):
            log_level = int(a)
        elif o in ('--key',):
            key = a
        elif o in ('--thread',):
            thread_num = int(a)
        elif o in ('--dbfile',):
            dbfile = a
        else:
            print 'unhandled option'
            sys.exit(3)

    cu = None
    cx = None
    logger = logging.getLogger()
    hdlr = logging.FileHandler(log_file)
    logger.addHandler(hdlr)
    level = (6-log_level)*10
    logger.setLevel(level)
  #  logger.info("hi")
    if dbfile is not None:
        if os.path.exists(dbfile):      # remove any stale database from a previous run
            os.remove(dbfile)
        cx = sqlite3.connect(dbfile)
        cu=cx.cursor()
        cu.execute("""create table content (id INTEGER PRIMARY KEY AUTOINCREMENT,url varchar(100), content varchar(4000)  )""")
          
    logger.debug('thread num is '+str(thread_num))
    logger.debug('scrawl_level is ' + str(scrawl_level))
    
    
    tp = ThreadPool(key,thread_num)
    tp.add_job(crawl_url, url , get_rand() ,1)
    deep = 1
    time_old = time.time()
    count = 0
    while 1:
        time_new = time.time()
        if time_new - time_old > 10:
            print 'links processed:',count,'links in progress:',tp.get_num(),'links pending:',tp.resultQueue.qsize(),'rows awaiting insert:',tp.contentQueue.qsize()
            time_old = time.time()
        try:
            url,deep= tp.resultQueue.get(timeout=0.5)
            if url is not None and int(deep) <= scrawl_level:
               # print "adding  deep",deep
                logger.info('adding url: '+url.encode()+'and deep is: '+str(deep))
                count += 1
                tp.add_job(crawl_url, url, get_rand(), deep)
        except Queue.Empty:
            if not tp.get_flag() and tp.contentQueue.qsize() == 0 and tp.resultQueue.qsize() == 0:
                print 'work done,exiting'
                tp.set_flag()
                break
        try:
            url,content= tp.contentQueue.get(timeout=0)
            if url is not None:
              #  print 'gettingiiiiiiiiii ',content,url
                cu.execute( "insert into content(url,content) values(?,?)", (str(url), content.decode('utf-8')))
        except Queue.Empty:
            continue
            
        
    if cx is not None:
        cx.commit()
        cx.close()
    tp.wait_for_complete()
    #print tp.workQueue.qsize()
    
if __name__ == '__main__':
    main(sys.argv)
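Based on the options parsed in main(), an invocation might look like this (the URL, keyword and file names are placeholders):

python myspider.py -u http://example.com -d 2 -f spider.log -l 4 --key python --thread 10 --dbfile result.db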