For hardware and other reasons I needed to move roughly 1.7 million Weibo images, about 2 TB in total, into MySQL. I had previously kept all my Weibo data in MongoDB, a non-relational database, so I was unfamiliar with MySQL in every way, stepped into countless pits, and went back and forth for three days before it was done.
PS: (I sell very large Weibo datasets and travel-site review datasets on an ongoing basis, and take custom crawling requests. Message YuboonaZhang@Yahoo.com. You are also welcome to join the social media data discussion group: 99918768)
Storing the data starts with designing the database. I designed three tables:
Blog table: [id, userid, blog_text, lat, lng, created_time, reserve]   pkey: id
Picture table: [md5, pic_url, pic_bin, exif, reserve]   pkey: md5
Relationship table: [id, md5, reserve]   pkey: (id, md5)   fkey: (id → blog.id), (md5 → pics.md5)
Creating the tables was mostly fine, except that the types of pic_bin and blog_text caused real trouble. I first declared pic_bin as BLOB, only to find at runtime that a BLOB column holds at most 64 KB, nowhere near enough for Weibo images; switching to MEDIUMBLOB (up to 16 MB) basically covered the requirement. Then came blog_text, and my first big pit.
At first I naturally declared blog_text as TEXT, but once the job ran, some rows refused to insert and threw errors. Sifting through them showed the culprit was emoji in some posts... After a lot of digging I learned that MySQL's utf8 charset stores at most three bytes per character, while emoji need four, so the encoding has to be changed to utf8mb4. But editing the MySQL config file on my Mac threw all sorts of bizarre errors, and in a fit of frustration I changed TEXT to BLOB, which just worked. Besides, my local machine is a Mac that has to hop through a remote Windows box to reach the MySQL instance on a Synology NAS... changing the local config would have accomplished nothing anyway.
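For reference, here is a minimal DDL sketch of the three tables. Apart from pic_bin and blog_text, the column types are my assumptions from the description above, not taken from the original post:

CREATE TABLE blog (
    id           VARCHAR(20) NOT NULL,   -- Weibo status idstr
    userid       VARCHAR(20),
    blog_text    BLOB,                   -- BLOB sidesteps the emoji problem; TEXT also works once the server speaks utf8mb4
    lat          DOUBLE,
    lng          DOUBLE,
    created_time VARCHAR(40),
    reserve      VARCHAR(255),
    PRIMARY KEY (id)
);

CREATE TABLE pics (
    md5     CHAR(32) NOT NULL,           -- hex MD5 of the image binary
    pic_url VARCHAR(255),
    pic_bin MEDIUMBLOB,                  -- up to 16 MB per image
    exif    TEXT,
    reserve VARCHAR(255),
    PRIMARY KEY (md5)
);

CREATE TABLE relationship (
    id      VARCHAR(20) NOT NULL,
    md5     CHAR(32) NOT NULL,
    reserve VARCHAR(255),
    PRIMARY KEY (id, md5),
    FOREIGN KEY (id)  REFERENCES blog (id),
    FOREIGN KEY (md5) REFERENCES pics (md5)
);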
Then came a huge pit!!! Since I am on Python 3, reading an image yields bytes whose repr starts with b'. That b' is exactly what broke things: when the SQL statement was built by string concatenation, the quote after the b paired up with the statement's own quote, leaving the rest of the binary outside any quotes and erroring out! And bytes, unlike str, can't simply have their characters escaped; I tried many approaches and none worked. In the end I gave up and base64-encoded the binary into a string before storing it, decoding it back whenever the image is needed.
pic_bin = str(base64.b64encode(pic_bin))[2:-1]
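The slice above strips the b' and the trailing ' from the repr; calling decode() does the same thing more cleanly, and a parameterized query would dodge the quoting problem entirely. A sketch, where 'example.jpg' is a placeholder file:

import base64

with open('example.jpg', 'rb') as f:   # placeholder image file
    pic_bin = f.read()

encoded = base64.b64encode(pic_bin).decode('ascii')  # bytes -> str without repr tricks
assert base64.b64decode(encoded) == pic_bin          # decoding recovers the raw bytes

# Alternative that needs no base64 at all: pass the bytes as a parameter and
# let pymysql escape them, instead of formatting them into the SQL string:
#   cursor.execute("INSERT IGNORE INTO pics (md5, pic_bin) VALUES (%s, %s)",
#                  (md5_hex, pic_bin))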
Because I run multiple Python processes and push about 8 GB an hour, and the image payloads are large, the packets sent to MySQL exceed its default limit and the job dies with "MySQL server has gone away". The fix is to raise these parameters in the config file:
max_allowed_packet = 600M
wait_timeout = 60000
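These belong under the [mysqld] section of the server config file (my.cnf, or my.ini on Windows; the path varies by install). A sketch with the values above:

[mysqld]
max_allowed_packet = 600M
wait_timeout = 60000

After restarting the server, SHOW VARIABLES LIKE 'max_allowed_packet'; confirms the new value took effect.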
Even so, the program kept hitting this error from time to time. I hunted for the cause, tried everything, read pile after pile of material, and it still failed. Eventually I stopped caring why: if the connection is lost, just reconnect. conn.ping(True) checks whether the MySQL connection is still alive and reopens it if not. That finally solved the problem.
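In isolation the pattern looks like this (a sketch; the helper name and credentials are placeholders, and judge_conn in the full script below does the same job):

import pymysql

conn = pymysql.connect(host='127.0.0.1', user='root', passwd='secret', db='weibo')  # placeholder credentials

def execute_alive(sql, args=None):  # hypothetical helper
    # ping(reconnect=True) transparently reopens the connection if the server
    # has dropped it (e.g. after wait_timeout expired); it only raises when
    # the reconnect itself fails
    conn.ping(reconnect=True)
    with conn.cursor() as cur:
        cur.execute(sql, args)
    conn.commit()

The complete script follows.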
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by Baoyi on 2017/10/16
from multiprocessing.pool import Pool

import pymysql
import requests
import json
import exifread
from io import BytesIO
import configparser
import hashlib
import logging
import base64

# configure logging
logging.basicConfig(level=logging.WARNING,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S',
                    filename='weibo.log',
                    filemode='w')

cf = configparser.ConfigParser()
cf.read("ConfigParser.conf")

# read the mysql config
db_host = cf.get("mysql", "db_host")
db_port = cf.getint("mysql", "db_port")
db_user = cf.get("mysql", "db_user")
db_pass = cf.get("mysql", "db_pass")
db = cf.get("mysql", "db")

# create the connection
conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, port=db_port, charset='utf8')

# get a cursor
cursor = conn.cursor()

# build the insert statements
insert_blog_sql = (
    "INSERT IGNORE INTO blog(userid, id, blog_text, lat, lng, created_time) "
    "VALUES('{uid}', '{id}','{blog_text}','{lat}','{lng}','{created_time}')"
)
insert_pic_sql = (
    "INSERT IGNORE INTO pics(pic_url, pic_bin, md5, exif) VALUES ('{pic_url}','{pic_bin}','{md5}','{exif}')"
)
insert_relationship_sql = (
    "INSERT IGNORE INTO relationship(id, md5) VALUES ('{id}','{md5}')"
)

uid = []
with open('./data/final_id.txt', 'r') as f:
    for i in f.readlines():
        uid.append(i.strip('\r\n'))


# fetch the full-size image binary
def handle_pic(pic_url):
    large_pic_url = pic_url.replace('thumbnail', 'large')
    large_bin = requests.get(large_pic_url)
    return large_bin.content


def get_poiid_info(uid):
    try:
        url = 'https://api.weibo.com/2/statuses/user_timeline.json'
        load = {
            'access_token': 'xxxxxxxxxx',
            'uid': uid,
            'count': 100,
            'feature': 2,
            'trim_user': 1
        }
        get_info = requests.get(url=url, params=load, timeout=(10, 10))
        if get_info.status_code != 200:
            logging.warning('bad status code %s for uid %s' % (get_info.status_code, uid))
            return
        info_json = json.loads(get_info.content)
        info_json['uid'] = uid
        statuses = info_json['statuses']
        # filter and process the statuses
        for status in statuses:
            id = status['idstr']
            if status['geo'] is not None:
                lat = status['geo']['coordinates'][0]
                lng = status['geo']['coordinates'][1]
                pic_urls = status['pic_urls']

                # keep only statuses posted inside Beijing
                if (115.7 < lng < 117.4) and (39.4 < lat < 41.6):
                    # insert the blog row; double the single quotes so the text
                    # survives the string-formatted SQL
                    blog_text = status['text'].replace('\'', '\'\'')
                    created_time = status['created_at']
                    try:
                        cursor.execute(
                            insert_blog_sql.format(uid=uid, id=id, blog_text=blog_text, lat=lat, lng=lng,
                                                   created_time=created_time))
                    except pymysql.err.OperationalError as e_blog:
                        logging.warning(e_blog.args[1])
                    # conn.commit()

                    # process the pictures
                    for pic_url in pic_urls:
                        # fetch the original image binary
                        pic_bin = handle_pic(pic_url['thumbnail_pic'])

                        # wrap the binary in a file object to read the exif data and build the MD5
                        pic_file = BytesIO(pic_bin)
                        tag1 = exifread.process_file(pic_file, details=False, strict=True)
                        tag = {}
                        for key, value in tag1.items():
                            if key not in (
                                    'JPEGThumbnail', 'TIFFThumbnail', 'Filename', 'EXIF MakerNote'):
                                # drop four unneeded exif attributes to trim the payload
                                tag[key] = str(value)
                        tags = json.dumps(tag)  # the exif data as json

                        # build the MD5 (rewind first: exifread moved the file pointer)
                        pic_file.seek(0)
                        MD5 = hashlib.md5(pic_file.read()).hexdigest()

                        # base64-encode the binary image into a string before storing it
                        try:
                            cursor.execute(
                                insert_pic_sql.format(pic_url=pic_url['thumbnail_pic'].replace('thumbnail', 'large'),
                                                      pic_bin=str(base64.b64encode(pic_bin))[2:-1], md5=MD5,
                                                      exif=tags))
                        except pymysql.err.OperationalError as e_pic:
                            logging.warning(e_pic.args[1])
                        try:
                            cursor.execute(insert_relationship_sql.format(id=id, md5=MD5))
                        except pymysql.err.OperationalError as e_relation:
                            logging.warning(e_relation)
                    conn.commit()
                else:
                    logging.info(id + " is Not in Beijing")
            else:
                logging.info(id + ' Geo is null')
    except pymysql.err.OperationalError as e:
        logging.error(e.args[1])


def judge_conn(i):
    global conn
    try:
        # reconnect transparently if the server dropped the connection
        conn.ping(True)
        get_poiid_info(i)
    except pymysql.err.OperationalError:
        logging.error('Reconnect')
        conn = pymysql.connect(host=db_host, user=db_user, passwd=db_pass, db=db, port=db_port, charset='utf8')
        get_poiid_info(i)


def handle_tuple(a_tuple):
    read_uid_set = []
    for i in a_tuple:
        read_uid_set.append(i[0])
    return set(read_uid_set)


if __name__ == '__main__':
    # skip uids whose blogs are already in the database
    sql_find_uid = (
        "SELECT userid FROM blog"
    )
    cursor.execute(sql_find_uid)
    read_uid_tuple = cursor.fetchall()
    read_list = handle_tuple(read_uid_tuple)
    print(len(read_list))

    new_uid = set(uid).difference(read_list)
    print(len(new_uid))

    pool = Pool()
    pool.map(judge_conn, list(new_uid))
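Getting an image back out is the base64 step in reverse (a sketch; the credentials and md5 value are placeholders):

import base64
import pymysql

conn = pymysql.connect(host='127.0.0.1', user='root', passwd='secret', db='weibo')  # placeholder credentials
cursor = conn.cursor()

cursor.execute("SELECT pic_bin FROM pics WHERE md5 = %s",
               ('0123456789abcdef0123456789abcdef',))  # placeholder md5
row = cursor.fetchone()
if row is not None:
    with open('restored.jpg', 'wb') as f:
        f.write(base64.b64decode(row[0]))  # undo the base64 applied on insert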
I sell crawling code for very large Weibo datasets on an ongoing basis, and also offer packaged Weibo data for sale. Message YuboonaZhang@Yahoo.com