python3爬蟲-爬取58同城上全部城市的租房信息

from fake_useragent import UserAgent
from lxml import etree
import requests, os
import time, re, datetime
import base64, json, pymysql
from fontTools.ttLib import TTFont

ua = UserAgent()


class CustomException(Exception):

    def __init__(self, status, msg):
        self.status = status
        self.msg = msg


class City_58:
    '''
    58同城的爬蟲類,目前就寫這兩個
    出租房url: https://cd.58.com/chuzu/         cd表明成都縮寫
    二手房url: https://cd.58.com/ershoufang/
    '''

    font_dict = {
        "glyph00001": "0",
        "glyph00002": "1",
        "glyph00003": "2",
        "glyph00004": "3",
        "glyph00005": "4",
        "glyph00006": "5",
        "glyph00007": "6",
        "glyph00008": "7",
        "glyph00009": "8",
        "glyph00010": "9",
    }
    conn = None

    def __init__(self):
        self.session = requests.Session()
        self.session.headers = {
            "user-agent": ua.random
        }
        self.__init__all_city()

    def __init__all_city(self):
        '''獲取全部城市的名字及縮寫的對應關係'''
        api = "https://www.58.com/changecity.html"
        headers = self.session.headers.copy()
        response = self.session.get(api, headers=headers)
        html = response.text
        res = re.findall("cityList = (.*?)</script>", html, re.S)[0]
        res = re.sub("\s", "", res)
        dic = json.loads(res)
        for k, v in dic.items():
            for k1, v1 in v.items():
                dic[k][k1] = v1.split("|")[0]
        city_dict = {}

        def traverse_dict(dic: dict):
            for k, v in dic.items():
                if k == "海外" or k == "其餘":
                    continue
                if isinstance(v, dict):
                    traverse_dict(v)
                city_dict[k] = v

        traverse_dict(dic)

        other_city = re.findall("independentCityList = (.*?)var", html, re.S)[0]
        res = re.sub("\s", "", other_city)
        other_city_dic = json.loads(res)

        for k, v in other_city_dic.items():
            other_city_dic[k] = v.split("|")[0]

        city_dict.update(other_city_dic)
        self.all_city_dict = city_dict

    def spider_zufang(self, city: str = "成都", is_get_all: bool = True):
        '''爬取租房信息的爬蟲方法'''
        assert self.all_city_dict is not None, "獲取全部城市信息失敗 !"
        format_city = self.all_city_dict.pop(city, None)
        assert format_city is not None, "{}該城市不在爬取城市以內".format(city)
        while True:
            self.city = city
            # self.file = open("./house_info.json", "a", encoding="utf-8")
            start_url = self.__init_zufang(format_city)

            # 思路是什麼,首先進入區域的租房頁面,在該頁面中先提取出相應的title,好比經紀人,我的房源等等...
            # 咱們須要構建出相應的url就能夠了
            # start_url的格式爲 https://cd.58.com/chuzu/  咱們須要轉爲這樣的格式 https://cd.58.com/jintang/hezu/
            # 咱們訪問轉化後的地址,再拿去到相應的連接,好比經紀人,我的房源等連接
            # 拿到該連接之後,這就是這個分類裏的第一頁url,咱們再對這個連接發生請求,
            # 拿到響應體,這裏能夠寫一個while循環,由於咱們不知道有多少頁,其實也能夠知道有多少頁,就是在這個響應體中可拿到
            # 個人思路就是寫一個while循環,判斷是否有下一頁,有的繼續,沒有的話直接break

            for url_info_list in self.__get_url(start_url):
                # 這裏的話,最好進行判斷一下,由於每一個title(值我的房源,品牌公寓等..)不同的話,可能爬取的策略也不太同樣
                title = url_info_list[1]
                if title in ["我的房源", "安選房源", "經紀人", "熱租房源"] or "出租" in title:
                    self.__spiders_v1(url_info_list)
                    # pass
                elif title == "品牌公寓":
                    self.__spiders_v2(url_info_list)
                    pass
                elif title == "房屋求租":
                    # 房屋求租不太想寫,數據也不是不少
                    pass
                else:
                    # 這種狀況不在範圍內,直接pass掉
                    continue
            if not is_get_all:
                return
            try:
                city = list(self.all_city_dict.keys()).pop()
                format_city = self.all_city_dict.pop(city)
            except IndexError:
                print('全國出租房信息,爬取完畢')
                return

    def spider_ershoufang(self, city: str = "cd"):
        '''爬取二手房信息的爬蟲方法'''
        pass

    def __spiders_v1(self, url_info_list):
        "負責處理我的房源,安選房源等等頁面的方法"
        url = url_info_list[2]
        page_num = 1
        while True:
            time.sleep(2)
            print("正在爬取{}-{}--第{}頁數據".format(url_info_list[0], url_info_list[1], page_num))
            response = self.__get_html_source(url)
            # 從html源碼中獲取到想要的數據
            for house_info_list in self.__deal_with_html_source_v1(response):
                self.__save_to_mysql(house_info_list, url_info_list)
            # 判斷是否還有下一頁
            next_page_url = self.__is_exist_next_page(response)
            if not next_page_url:
                print("{}-{}爬取完畢".format(url_info_list[0], url_info_list[1]))
                return
            url = next_page_url
            page_num += 1

    def __spiders_v2(self, url_info_list):
        '''處理品牌公寓的爬蟲信息'''
        base_url = url_info_list[2]
        format_url = self.__format_url_v2(base_url)
        page_num = 1
        params = None
        while True:
            print("正在爬取{}--第{}頁數據...".format(url_info_list[1], page_num))
            time.sleep(2)
            url = format_url.format(page_num)
            response = self.__get_html_source(url, params)
            # 獲取到有用的數據  deal_with_html_source_v2
            for house_info_list in self.__deal_with_html_source_v2(response):
                # self.__save_to_file_v2(house_info_list)
                self.__save_to_mysql(house_info_list)

            # 獲取到下一頁的encryptData
            encryptData = self.__get_html_encryptData(response)

            # 判斷是否還有下一頁,經過<div class="tip">信息不足,爲您推薦附近房源</div>
            if not self.__is_exist_next_page_v2(response):
                print("{}爬取完畢".format(url_info_list[1]))
                return
            page_num += 1
            params = {
                "encryptData": encryptData or "",
                "segment": "true"
            }

    def __save_to_file_v2(self, house_info_list):
        '''
        :param house_info_list: 關於房子的信息的列表
        :param url_info_list: [區域,類型(我的房源,經紀人等等...),url]
        :return:
        '''

        print("房間圖片地址>>:", file=self.file)
        print(json.dumps(house_info_list[0], ensure_ascii=False), file=self.file)
        print("房間描述>>:", file=self.file)
        print(json.dumps(house_info_list[1], ensure_ascii=False), file=self.file)
        print("房間詳情>>:", file=self.file)
        print(json.dumps(house_info_list[2], ensure_ascii=False), file=self.file)
        print("房間地理位置>>:", file=self.file)
        print(json.dumps(house_info_list[3], ensure_ascii=False), file=self.file)
        print("獲取房間的標籤>>:", file=self.file)
        print(json.dumps(house_info_list[4], ensure_ascii=False), file=self.file)
        print("獲取房間的價格>>:", file=self.file)
        print(json.dumps(house_info_list[5], ensure_ascii=False), file=self.file)
        print(file=self.file)

    def __save_to_mysql(self, house_info_list, url_info_list=None):
        '''保存到數據庫'''
        if not self.conn:
            self.conn = pymysql.connect(host="127.0.0.1",
                                        port=3306,
                                        user="root",
                                        password="root",
                                        db="city_58")
            self.conn.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
        if not url_info_list:
            sql = "insert into zu_house_copy (house_img_url,house_title,house_details,house_address,house_tags,hoouse_price,house_type,city) values (%s,%s,%s,%s,%s,%s,%s,%s)"
            house_info_list.append("品牌公寓")
        else:
            sql = "insert into zu_house_copy (house_img_url,house_title,house_details,house_address,house_tags,hoouse_price,area,house_type,city) values (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
            house_info_list.append(url_info_list[0])
            house_info_list.append(url_info_list[1])
        house_info_list.append(self.city)
        row = self.conn.cursor.execute(sql, house_info_list)
        if not row:
            print("插入失敗")
        else:
            self.conn.commit()

    def __deal_with_html_source_v1(self, response):
        html = response.text
        self.__get_font_file(html)
        html = self.__format_html_source(html)
        for house_info_list in self.__parse_html_v1(html):
            yield house_info_list

    def __deal_with_html_source_v2(self, response):

        html = response.text
        # 源碼裏的關於數字0123456789都是進行處理過的,咱們須要先獲取到字體文件
        # 咱們先獲取到字體文件而且保存
        self.__get_font_file(html)

        # 對源碼中的字體進行處理,獲得瀏覽器顯示的數據
        html = self.__format_html_source(html)

        # 開始從頁面中提取出想要的數據
        for house_info_list in self.__parse_html_v2(html):
            yield house_info_list

    def __parse_html_v1(self, html):
        xml = etree.HTML(html)

        li_xpath_list = xml.xpath("//ul[@class='listUl']/li[@logr]")

        for li_xpath in li_xpath_list:
            house_info_list = []
            try:
                house_img_url = li_xpath.xpath("div[@class='img_list']/a/img/@lazy_src")[0]
            except IndexError:
                house_img_url = li_xpath.xpath("div[@class='img_list']/a/img/@src")[0]
            house_info_list.append(house_img_url)
            # 房間描述
            house_title = re.sub("\s", "", li_xpath.xpath("div[@class='des']/h2/a/text()")[0])
            house_info_list.append(house_title)
            # 房間詳情
            house_details = re.sub("\s", "",
                                   li_xpath.xpath("div[@class='des']/p[@class='room strongbox']/text()")[0].strip())
            house_info_list.append(house_details)
            # 房間地理位置
            house_address = re.sub("\s", "",
                                   li_xpath.xpath("div[@class='des']/p[@class='add']")[0].xpath("string(.)"))
            house_info_list.append(house_address)
            # 獲取房間的標籤
            house_tags = "暫無標籤"
            house_info_list.append(house_tags)
            # 獲取房間的價格
            hoouse_price = re.sub("\s", "",
                                  li_xpath.xpath("div[@class='listliright']/div[@class='money']")[0].xpath("string(.)"))
            house_info_list.append(hoouse_price)

            yield house_info_list

    def __parse_html_v2(self, html):
        '''解析頁面,拿到數據'''
        xml = etree.HTML(html)
        li_xpath_list = xml.xpath("//ul[@class='list']/li")
        for li_xpath in li_xpath_list:
            house_info_list = []
            # 房間圖片地址,這裏只獲取了一張,我在想要不要獲取多張
            # 先空着。。。。。。。。。。。。。
            house_img_url = li_xpath.xpath("a/div[@class='img']/img/@lazy_src")[0]
            house_info_list.append(house_img_url)
            # 房間描述
            house_title = li_xpath.xpath("a/div[@class='des strongbox']/h2/text()")[0].strip()
            house_info_list.append(house_title)
            # 房間詳情
            house_details = re.sub("\s", "", li_xpath.xpath("a/div[@class='des strongbox']/p[@class='room']/text()")[0])
            # house_details = li_xpath.xpath("a/div[@class='des strongbox']/p[@class='room']/text()")[0]
            house_info_list.append(house_details)
            # 房間地理位置
            house_address = re.sub("\s", "", li_xpath.xpath(
                "a/div[@class='des strongbox']/p[@class='dist']")[0].xpath("string(.)")) or "暫無地址"
            # house_address = li_xpath.xpath( "a/div[@class='des strongbox']/p[@class='dist']/text()")[0]
            house_info_list.append(house_address)
            # 獲取房間的標籤
            house_tags = ",".join(li_xpath.xpath("a/div[@class='des strongbox']/p[@class='spec']/span/text()"))
            house_info_list.append(house_tags)
            # 獲取房間的價格
            hoouse_price = re.sub("\s", "", li_xpath.xpath("a/div[@class='money']/span[@class='strongbox']")[0].xpath(
                "string(.)")) or "暫無價格"
            house_info_list.append(hoouse_price)

            yield house_info_list

    def __get_font_file(self, html):
        '''從源碼中獲取到字體文件,而且轉爲保存,轉爲TTFont對象'''
        try:
            b64 = re.findall(r"base64,(.*?)\'", html, re.S)[0]
            res = base64.b64decode(b64)
            with open("./online_font.ttf", "wb") as f:
                f.write(res)
            self.online_font = TTFont("./online_font.ttf")
            self.online_font.saveXML("./online.xml")
        except IndexError:
            return

    def __format_html_source(self, html):
        assert self.online_font, "必須建立字體對象"
        assert os.path.exists("./online.xml"), "請先獲取到字體文件。"

        with open("./online.xml", "rb") as f:
            file_data = f.read()

        online_uni_list = self.online_font.getGlyphOrder()[1:]
        file_selector = etree.HTML(file_data)
        for uni2 in online_uni_list:
            code = file_selector.xpath("//cmap//map[@name='{}']/@code".format(uni2))[0]
            dd = "&#x" + code[2:].lower() + ";"
            if dd in html:
                html = html.replace(dd, self.font_dict[uni2])
        return html

    def __format_url_v2(self, url):
        '''
        :param url: https://cd.58.com/pinpaigongyu/?from=58_pc_zf_list_ppgy_tab_ppgy
        :return: https://cd.58.com/pinpaigongyu/pn/{}/?from=58_pc_zf_list_ppgy_tab_ppgy
        '''
        a = url.split("?")
        a[0] = a[0] + "pn/{}"
        format_url = "?".join(a)
        return format_url

    def __is_exist_next_page_v2(self, response):
        xml = self.__response_to_xml(response)
        try:
            _ = xml.xpath("//div[@class='tip']")[0]
            return False
        except IndexError:
            return True

    def __get_html_encryptData(self, response):
        html = response.text
        encryptData = re.findall(r"encryptData\":\"(.*?)\"", html, re.S)[0]
        return encryptData

    def __get_url(self, start_url: str):
        url_set = set()
        for area, v in self.area_dict.items():
            url = self.__conversion_url(start_url, v)
            response = self.__get_html_source(url)
            title_dict = self.__get_title_info(response)
            for title_name, v in title_dict.items():
                # 對於求租、品牌公寓這個url,它是重複的,在這裏進行判斷判斷就行了
                if v in url_set:
                    continue
                else:
                    url_set.add(v)
                    yield [area, title_name, v]

    def __conversion_url(self, url: str, area: str):
        '''
        :param url:  https://cd.58.com/chuzu/
        :param area:
        :return: https://cd.58.com/區域縮寫/chuzu/
        '''
        lis = url.split("/")
        lis.insert(3, area)
        return "/".join(lis)

    def __init_zufang(self, format_city):
        '''首先將所須要的數據的獲取到'''
        start_url = "https://{}.58.com/chuzu/".format(format_city)
        headers = self.session.headers.copy()
        response = self.session.get(url=start_url, headers=headers)
        self.__get_area_info(response)
        return start_url

    def __get_html_source(self, url, params=None):
        '''經過get方式獲取到網頁的源碼'''
        time.sleep(1)
        headers = self.session.headers.copy()
        try:
            if not params:
                params = {}
            response = self.session.get(url=url, headers=headers, params=params)
            return response
        except Exception as e:
            with open("./url_log_error.txt", "a", encoding="utf-8") as f:
                f.write(str(datetime.datetime.now()) + "\n")
                f.write(str(e) + "\n")
                f.write("error_url>>:{}".format(url) + "\n")

    def __response_to_xml(self, response):
        try:
            xml = etree.HTML(response.text)
            return xml
        except AttributeError:
            raise CustomException(10000, "response對象轉換爲xml失敗,錯誤的連接地址爲>>:{}".format(response))

    def __is_exist_next_page(self, response):
        '''判斷是否存在下一頁,存在拿到下一頁的連接,不存在返回False'''
        xml = self.__response_to_xml(response)
        try:
            next_page_url = xml.xpath("//a[@class='next']/@href")[0]
            return next_page_url
        except IndexError:
            return False

    def __get_area_info(self, response):
        '''獲取到當前城市的區域'''
        xml = self.__response_to_xml(response)
        a_xpath_list = xml.xpath("//dl[@class='secitem secitem_fist']//a[not(@class)]")
        area_key_list = []
        area_value_list = []
        for a_xpath in a_xpath_list:
            area_key_list.append(a_xpath.xpath("text()")[0])
            area_value_list.append(re.findall("com/(.*?)/", a_xpath.xpath("@href")[0])[0])
        assert len(area_key_list) == len(area_value_list), "數據不完整"

        self.area_dict = {k: v for k, v in zip(area_key_list, area_value_list)}

    def __get_title_info(self, response):
        '''獲取房屋的分類,好比我的房源,合租房,經紀人,熱選房源...'''
        "listTitle"
        xml = self.__response_to_xml(response)
        a_xpath_list = xml.xpath("//div[@class='listTitle']//a[not(@class)]")
        title_key_list = []
        title_value_list = []
        for a_xpath in a_xpath_list:
            title_key_list.append(a_xpath.xpath("span/text()")[0])
            title_value_list.append(a_xpath.xpath("@href")[0])
        assert len(title_key_list) == len(title_value_list), "數據不完整"
        return {k: v for k, v in zip(title_key_list, title_value_list)}


if __name__ == '__main__':
    city_58 = City_58()
    city_58.spider_zufang("重慶")

 

附上數據庫爬取的結果html

相關文章
相關標籤/搜索