elasticsearch簡單集成到scrapy中
使用elasticsearch的python接口處理數據
https://github.com/elastic/elasticsearch-dsl-py
elasticsearch-dsl-py官方使用文檔:
http://elasticsearch-dsl.readthedocs.io/en/latest/
建立一個DocType類,類似於item類
# Example: defining the Elasticsearch mapping for articles scraped
# from the jobbole site.
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
from elasticsearch_dsl.connections import connections

# Register the default connection; hosts takes a list, so several
# servers may be supplied at once.
connections.create_connection(hosts=["localhost"])


class ArticleType(DocType):
    """Document type for jobbole articles (plays the role an Item does in Scrapy)."""

    # Chinese-analyzed full-text fields use the ik_max_word analyzer.
    title = Text(analyzer="ik_max_word")
    create_date = Date()
    url = Keyword()
    url_object_id = Keyword()
    front_image_url = Keyword()
    front_image_path = Keyword()
    praise_nums = Integer()
    comment_nums = Integer()
    fav_nums = Integer()
    tags = Text(analyzer="ik_max_word")
    content = Text(analyzer="ik_max_word")

    class Meta:
        index = "jobbole"
        doc_type = "article"


if __name__ == "__main__":
    # init() generates the index mapping directly from the class definition.
    ArticleType.init()
建立一個items類,接收數據
class JobBoleArticleItem(scrapy.Item):
    """Scrapy item for one jobbole article, with MySQL and Elasticsearch persistence helpers."""

    title = scrapy.Field()
    create_date = scrapy.Field(
        input_processor=MapCompose(date_convert),
    )
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    front_image_url = scrapy.Field(
        output_processor=MapCompose(return_value)
    )
    front_image_path = scrapy.Field()
    praise_nums = scrapy.Field(
        input_processor=MapCompose(get_nums)
    )
    comment_nums = scrapy.Field(
        input_processor=MapCompose(get_nums)
    )
    fav_nums = scrapy.Field(
        input_processor=MapCompose(remove_comment_tags) if False else MapCompose(get_nums)
    )
    tags = scrapy.Field(
        input_processor=MapCompose(remove_comment_tags),
        output_processor=Join(",")
    )
    content = scrapy.Field()

    def get_insert_sql(self):
        """Return an ``(sql, params)`` pair for upserting this article into MySQL.

        Bug fix: the ``ON DUPLICATE KEY UPDATE`` clause previously wrote
        ``VALUES(fav_nums)`` into the *content* column
        (``content=VALUES(fav_nums)``); the intended upsert refreshes the
        favourite count itself.
        """
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
        """
        params = (self["title"], self["url"], self["create_date"],
                  self["fav_nums"])
        return insert_sql, params

    def save_to_es(self):
        """Copy this item's fields into an ArticleType document and save it to ES."""
        article = ArticleType()
        article.title = self['title']
        article.create_date = self["create_date"]
        article.content = remove_tags(self["content"])
        article.front_image_url = self["front_image_url"]
        # front_image_path is only present when the image was downloaded.
        if "front_image_path" in self:
            article.front_image_path = self["front_image_path"]
        article.praise_nums = self["praise_nums"]
        article.fav_nums = self["fav_nums"]
        article.comment_nums = self["comment_nums"]
        article.url = self["url"]
        article.tags = self["tags"]
        # Use the url hash as the document _id so re-crawls overwrite
        # instead of duplicating.
        article.meta.id = self["url_object_id"]
        article.save()
        return
建立一個pipeline類,處理elasticsearch數據寫入
from models.es_types import ArticleType


class ElasticsearchPipeline(object):
    """Item pipeline that persists each scraped item into Elasticsearch."""

    def process_item(self, item, spider):
        # The item carries its own persistence logic (save_to_es),
        # so the pipeline simply delegates and passes the item on.
        item.save_to_es()
        return item
配置settings
# Enable the Elasticsearch pipeline; the integer is its priority order
# (lower values run earlier).
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.ElasticsearchPipeline': 1
}