# -*- coding:utf-8 -*-
# author: street
import re
import datetime
import time

import redis
import requests
from bs4 import BeautifulSoup
from lxml import etree
from pymongo import MongoClient
from pymysql import connect
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities
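
# Two components:
#   ArticleFilter - Redis-backed keyword matching and sensitive-word filtering
#   huxiu_spider  - crawls huxiu.com article lists and bodies, uploads images to OSS,
#                   and stores the results in MongoDB and MySQL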


class ArticleFilter(object):
    def __init__(self, title, content):
        # decode_responses=True so Redis returns str instead of bytes
        self.redis_client = redis.StrictRedis(host='127.0.0.1', port=6379, db=9, decode_responses=True)
        self.first_keywords = str(self.redis_client.get('first_keywords')).split(',')
        self.second_keywords = str(self.redis_client.get('second_keywords')).split(',')
        self.title = title
        self.content = content
        self.group_id_list = list()

    # Frequency of first-level keywords in the content
    def article_content_filter(self):
        first_keyword_dict = dict()
        second_keyword_dict = dict()
        # search within the content
        if isinstance(self.content, list):
            text = ''.join([item.get('text') for item in self.content if item.get('text')])
            # find the first-level keyword that appears most often in the content
            for first_keyword in self.first_keywords:
                num = text.count(first_keyword)
                if num > 0:
                    first_keyword_dict[first_keyword] = num
            first_res = self.select_high(first_keyword_dict)
            if len(first_res) == 1:
                keyword, num = first_res[0][0], first_res[0][1]
                keyword = {'first_keywords': keyword}
            else:
                # no single top first-level keyword (none, or a tie): fall back to second-level attribute words
                for second_keyword in self.second_keywords:
                    num = text.count(second_keyword)
                    if num > 0:
                        second_keyword_dict[second_keyword] = num
                second_res = self.select_high(second_keyword_dict)
                if len(second_res) == 1:
                    keyword, num = second_res[0][0], second_res[0][1]
                    keyword = {'second_keywords': keyword}
                elif len(second_res) > 1:
                    # several second-level words tie for the top frequency:
                    # publish the article under each matching category
                    keyword = [x[0] for x in second_res]
                    keyword = {'second_keywords': keyword}
                else:
                    # no second-level match, but several first-level keywords tie for the top frequency:
                    # publish the article under each of those categories
                    if len(first_res) > 1:
                        keyword = [x[0] for x in first_res]
                        keyword = {'first_keywords': keyword}
                    else:
                        return False
            return keyword
        return False

    # Search within the title
    def article_title_filter(self):
        first_keyword_dict = dict()
        for first_keyword in self.first_keywords:
            num = self.title.count(first_keyword)
            if num > 0:
                first_keyword_dict[first_keyword] = num
        first_res = self.select_high(first_keyword_dict)
        if len(first_res) == 1:
            keyword, num = first_res[0][0], first_res[0][1]
            first_keywords = {'first_keywords': keyword}
            return first_keywords
        return False

    # Keyword lookup -- main entry point; returns the category IDs matching the article's keywords
    def article_filter(self):
        # 1. search the title
        title_keyword = self.article_title_filter()
        if title_keyword:
            first_keywords = title_keyword.get('first_keywords')
            group_id = self.get_keyword_group_id(first_keywords)
            self.group_id_list.append(group_id)
        else:
            # 2. search the content
            content_keyword = self.article_content_filter()
            if content_keyword:
                first_keywords = content_keyword.get('first_keywords')
                if isinstance(first_keywords, str):
                    group_id = self.get_keyword_group_id(first_keywords)
                    self.group_id_list.append(group_id)
                elif isinstance(first_keywords, list):
                    for first_keyword in first_keywords:
                        group_id = self.get_keyword_group_id(first_keyword)
                        self.group_id_list.append(group_id)
                else:
                    second_keywords = content_keyword.get('second_keywords')
                    if isinstance(second_keywords, str):
                        group_id = self.get_keyword_group_id(second_keywords)
                        self.group_id_list.append(group_id)
                    elif isinstance(second_keywords, list):
                        for second_keyword in second_keywords:
                            group_id = self.get_keyword_group_id(second_keyword)
                            self.group_id_list.append(group_id)
                    else:
                        self.group_id_list = None
            else:
                self.group_id_list = None
        return self.group_id_list
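
    # Illustrative results (hypothetical category IDs): one keyword match -> ['3'],
    # several tied matches -> ['3', '7'], no keyword found at all -> None.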

    # Pick the keyword(s) with the highest frequency
    @staticmethod
    def select_high(keyword_dict):
        ls = sorted(list(keyword_dict.items()), key=lambda a: a[1], reverse=True)
        index = 0
        for i, x in enumerate(ls):
            if x[1] == ls[0][1]:
                index = i + 1
            else:
                break
        print(ls[:index])
        return ls[:index]
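    # e.g. select_high({'AI': 3, '5G': 3, 'chip': 1}) -> [('AI', 3), ('5G', 3)]
    # (hypothetical counts; every keyword tied for the top frequency is kept,
    #  and an empty dict yields []).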

    # Look up the article category ID for a keyword in Redis
    def get_keyword_group_id(self, keyword):
        article_group_id = self.redis_client.hget('group_id_of_keyword', keyword)
        return article_group_id

    # Sensitive-word filter for the article
    def sensitive_words_filter(self):
        try:
            sensitive_words = self.redis_client.get('sensitive_words')
            if sensitive_words:
                sensitive_words = sensitive_words.split(',')
                text = ''.join([item.get('text') for item in self.content if item.get('text')])
                # return True as soon as any sensitive word appears in the title or body
                for sensitive_word in sensitive_words:
                    resp_title = self.title.find(sensitive_word)
                    resp_content = text.find(sensitive_word)
                    if resp_title != -1 or resp_content != -1:
                        return True
                return False
            else:
                return False
        except Exception as e:
            return False
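
# Minimal usage sketch for ArticleFilter (assumes the Redis keys 'first_keywords',
# 'second_keywords', 'sensitive_words' and the hash 'group_id_of_keyword' are populated):
#
#   flt = ArticleFilter('some title', [{'text': 'article body ...'}])
#   if not flt.sensitive_words_filter():
#       group_ids = flt.article_filter()   # list of category IDs, or None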


class huxiu_spider(object):
    def __init__(self):
        self.base_url = 'https://www.huxiu.com/'
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0'}

    def send_request(self, url):
        response = requests.get(url, headers=self.headers)
        text = response.text
        return text

    # Article list page
    def first_analysis(self, text):
        selector = etree.HTML(text)
        results = selector.xpath('//div[@class="mod-info-flow"]/div')
        # results = selector.xpath('//div[@class="container"]/div[2]/div[2]/div')
        # //*[@id="index"]/div[1]/div[2]/div[9]/div[1]/a/div/@style
        # //div[@class="container"]/div[2]/div[2]/div --new
        # //*[@id="index"]/div[1]/div[2]/div --old
        new_list = []
        i = 1
        for res in results:
            res_dict = {}
            web_name = '虎嗅網'
            res_dict['web_name'] = web_name
            # article title
            title = res.xpath('div/h2/a/text()')[0]
            print('Crawling article %s, title: %s' % (i, title))
            num = self.get_title(title, web_name)
            print('Checking whether the article already exists =====')
            if num == 0:
                print('Article is not in the database yet')
                url = res.xpath('div/h2/a[starts-with(@href, "/article")]/@href')[0]
                article_link = 'https://www.huxiu.com' + url
                article_content, article_time = self.second_analysis(article_link)
                if article_content != 1:
                    print('Starting sensitive-word filtering')
                    # local sensitive-keyword filtering
                    article_filter_obj = ArticleFilter(title, article_content)
                    resp = article_filter_obj.sensitive_words_filter()
                    if resp:
                        print('Article contains sensitive words')
                    else:
                        # article content
                        res_dict['content'] = article_content
                        # publish time
                        res_dict['date'] = article_time
                        # article link
                        res_dict['article_link'] = article_link
                        # article title
                        res_dict['title'] = title
                        # article summary
                        summary = res.xpath('div/div[2]/text()')[0]
                        res_dict['summary'] = summary
                        # article author
                        name = res.xpath('div/div/a/span/text()')[0]
                        res_dict["name"] = name
                        # author link
                        # res_dict["author_link"] = 'https://www.huxiu.com' + res.xpath('div/div/a/@href')[0]
                        # thumbnail shown in the article list
                        if res.xpath('div/a/img/@data-original'):
                            min_pic = res.xpath('div/a/img/@data-original')[0]
                            oss_url = self.upload_oss(min_pic)
                            # oss_url = oss_url.replace('http', 'https')
                            res_dict["min_pic"] = oss_url
                        elif res.xpath('a/div/img/@data-original'):
                            min_pic = res.xpath('a/div/img/@data-original')[0]
                            oss_url = self.upload_oss(min_pic)
                            # oss_url = oss_url.replace('http', 'https')
                            res_dict["min_pic"] = oss_url
                        elif res.xpath('div/a/div/@style'):
                            # the entry is a video-style card: cut the image URL out of the inline style
                            mystr = res.xpath('div/a/div/@style')[0]
                            print(111, mystr)
                            start_index = mystr.find('(', 0, len(mystr))
                            end_index = mystr.find('?', 0, len(mystr))
                            min_pic = mystr[start_index + 2:end_index]
                            print(123, min_pic)
                            oss_url = self.upload_oss(min_pic)
                            print(321, oss_url)
                            # oss_url = oss_url.replace('http', 'https')
                            res_dict["min_pic"] = oss_url
                        else:
                            oss_url = ''
                            res_dict["min_pic"] = oss_url
                        self.upload_mongo(res_dict)
                        self.upload_mysql(title, name, article_time, oss_url, summary, web_name, article_link)
                        print('Fetched and saved article %s' % i)
                        i += 1
                        new_list.append(res_dict)
                else:
                    i += 1
                    continue
            else:
                i += 1
                continue
        print('Fetched %s articles in total' % (i - 1))

    # Article body
    def second_analysis(self, url):
        try:
            # customise the PhantomJS request headers and pass them to the driver
            cap = DesiredCapabilities.PHANTOMJS.copy()
            for key, value in self.headers.items():
                cap['phantomjs.page.customHeaders.{}'.format(key)] = value
            # browser = webdriver.PhantomJS(
            #     '/Users/fushande/Shande_Zhijian/phantomjs-2.1.1-macosx/phantomjs-2.1.1-macosx/bin/phantomjs')
            browser = webdriver.PhantomJS('/usr/local/lib/phantomjs-2.1.1-linux-x86_64/bin/phantomjs',
                                          desired_capabilities=cap)
            # browser = webdriver.PhantomJS('/www/zhijiantest/phantomjs-1.9.7-linux-x86_64/bin/phantomjs')
            browser.get(url)
            time.sleep(3)
            html = browser.page_source
            # extract the publication time
            selector = etree.HTML(html)
            # //div[@class="column-link-box"]/span[1]/text() ---new
            # //*[@class="article-author"]/div/span[1]/text() ---old
            if selector.xpath('//div[@class="column-link-box"]/span[1]/text()'):
                article_time = selector.xpath('//div[@class="column-link-box"]/span[1]/text()')[0]
                print(article_time)
            elif selector.xpath('//div[@class="article-author"]/span[1]/a/text()'):
                article_time = selector.xpath('//div[@class="article-author"]/span[1]/a/text()')[0]
            else:
                article_time = ''
            # header image inside the article
            if selector.xpath('//div[@class="article-img-box"]/img/@src'):
                article_min_pic = selector.xpath('//div[@class="article-img-box"]/img/@src')[0]
            else:
                article_min_pic = ""
            # extract the article body
            content = selector.xpath('//div[@id="article_cstaontent304286"]')[0]
            result = etree.tostring(content, method='html')
            print('Got the article content')
            # build a BeautifulSoup object
            soup = BeautifulSoup(result, 'html.parser', from_encoding='utf-8')
            new_list = []
            # walk the content tag by tag
            ls = soup.find_all(["p", "img"])
            for table in ls:
                res = {}
                data = table.get_text()
                if data:
                    # strip whitespace and special characters
                    new_data = "".join(data.split())
                    new_data = new_data.replace(u'\ufeff', '')
                    if new_data != "":
                        res["text"] = new_data
                        new_list.append(res)
                link = table.get('src')
                if link:
                    oss_url = self.upload_oss(link)
                    res["img"] = oss_url
                    new_list.append(res)
            if article_min_pic != '':
                article_min_pic = self.upload_oss(article_min_pic)
                # article_min_pic = article_min_pic.replace('http', 'https')
                new_list.insert(0, {'img': article_min_pic})
            browser.quit()
            return new_list, article_time
        except Exception as e:
            print('Article no longer exists', e)
            return 1, 1

    # Upload an image to OSS
    def upload_oss(self, url):
        kw = {
            'fileurl': url,
            'filepath': 'gander_goose/dev/test2'
        }
        result = requests.post(url='url', data=kw)
        result = result.json()
        oss_url = result.get('oss_file_url')
        oss_url = oss_url.replace('url', 'url1')
        oss_url = oss_url.replace('http', 'https')
        return oss_url
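    # 'url' / 'url1' above are placeholders for the real upload endpoint and CDN
    # domain; the upload service is assumed to respond with JSON of the form
    # {"oss_file_url": "http://.../gander_goose/dev/test2/<file>.jpg"}.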

    # Save the data to MongoDB
    def upload_mongo(self, article_dict):
        try:
            client = MongoClient('127.0.0.1', 27017)
            my_db = client.wechat
            my_db.articles.insert_one(article_dict)
            print('Saved to MongoDB')
        except Exception as e:
            print('Failed to save to MongoDB:', e)

    # Insert into MySQL
    def upload_mysql(self, title, name, date, oss_url, summary, web_name, link):
        try:
            # create the Connection
            conn = connect(host='localhost', port=3306, database='wechat',
                           user='root', password='mysql', charset='utf8')
            # get a Cursor object
            cs1 = conn.cursor()
            # insert the article row; parameterised queries keep quotes in titles from breaking the SQL
            now = datetime.datetime.now()
            imgurl = "https://cdn.max-digital.cn/gander_goose/dev/test2/15368082362561.jpg"
            sql1 = ("insert into zj_article_info "
                    "(title,author,wechat_art_date,min_pic,summary,web_name,is_show,is_big,link,round_head_img,create_time) "
                    "values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            cs1.execute(sql1, (title, name, date, oss_url, summary, web_name, 0, 0, link, imgurl, now))
            # ID of the article that was just inserted
            new_article_id = int(conn.insert_id())
            # bump the custom sort value of articles in the "24 hours" category
            # sql2 = 'update zj_article_group set sort_num = sort_num + 1 where group_id=1'
            # cs1.execute(sql2)
            # publish the article to the "24 hours" category
            sql3 = 'insert into zj_article_group (article_id,group_id,sort_num,create_time) values (%s, %s, %s, %s)'
            cs1.execute(sql3, (new_article_id, 1, 1, now))
            # mark the article as online
            sql4 = "update zj_article_info set is_show = 1, zj_art_date=%s where id=%s"
            cs1.execute(sql4, (now, new_article_id))
            conn.commit()
            cs1.close()
            conn.close()
            print('Saved to MySQL')
        except Exception as e:
            print('Failed to save to MySQL:', e)

    def get_title(self, title, query):
        # check MySQL for an existing article with the same title from the same site
        conn = connect(host='127.0.0.1', port=3306,
                       database='test',
                       user='root', password='mysql', charset='utf8')
        # get a Cursor object
        cs1 = conn.cursor()
        res = 'select * from zj_article_info where title = %s and web_name = %s'
        num = cs1.execute(res, (title, query))
        cs1.close()
        conn.close()
        return num

    def run(self):
        text = self.send_request(self.base_url)
        self.first_analysis(text)


if __name__ == '__main__':
    huxiu = huxiu_spider()
    while True:
        start_time = time.time()
        print('Start time:', time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
        huxiu.run()
        time.sleep(3600)