Elasticsearch整合scrapy在AI量化引擎中的應用

採集到的數據的存儲,咱們使用elasticscarch,下文簡稱es。es是基於lucene的全文索引服務,lucene是一個全文索引的開發包,而es在此基礎上擴展了不少搜索引擎必備的功能,並且提供的restful的API,es愈來愈像一個nosql數據。與之類似的產品是solr。solr的schema對於中文應用的配置不太方便。html

es的python api文檔地址以下:python

https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.htmlgit

es的基本操做,與數據庫的概念能夠對應上,index對應的是數據庫,doc_type對應數據表, id是unique_key,doc是一個dict格式的數據記錄。github

from elasticsearch import Elasticsearch
from datetime import datetime
class ESMgr(object):
    def __init__(self,index_name,doc_type):
        self.es = Elasticsearch(hosts='your ip')
        self.index_name = index_name
        self.doc_type = doc_type

    def add_doc(self,doc):
        id = doc.get('id')
        doc.pop('id')
        self.es.index(index=self.index_name, doc_type=self.doc_type,id=id, body=doc)

doc是一個dict格式,es同mongodb這樣的nosql相似,能夠不須要預先定義schema(es裏叫field mapping),會按照文檔字段的格式猜想字段的類型,但須要注意,一旦這個字段生成,它的mapping就是不可修改的,要修改只能刪除重建索引,因此在建索引之初這個mapping就要考慮清楚。sql

相同的id會自動把整個doc覆蓋掉,在保存網頁數據的時候,咱們能夠直接使用url,固然若是考慮存儲空間,能夠存儲url的hashcode。mongodb

查詢直接get便可,數據庫

#按id字段查詢
def get(self,id):
    try:
        source = self.es.get(index=self.index_name, doc_type=self.doc_type, id=id)['_source']
    except:
        return None
    return source

可使用es庫直接對內容增量排重,若是es庫裏已存在,也就是已經完成採集的url,就再也不request請求。api

class IngoreRequestMiddleware(object):
    def __init__(self):
        self.es = ESMgr(index_name='index_article',doc_type='article')

    def process_request(self, request, spider):
        # 查不到會返回None,不爲None,則已存在,無需再request
        # es只有article的url,庫裏存在就再也不採集了
        if self.es.is_doc_exist(request.url):
            logging.info('exist:%s' % request.url)
            raise IgnoreRequest("IgnoreRequest : %s" % request.url)
        else:
            return None

這裏提供一個「忽略」請求的中間件,須要在scrapy settings.py裏進行掛載restful

DOWNLOADER_MIDDLEWARES = {
'eagle.middlewares.IngoreRequestMiddleware': 533,
...

這裏值得注意一下,downloader minddlewares和spider middlewares所處的位置。app

  • 引擎打開一個網站(open a domain),找處處理該網站的Spider並向該spider請求第一個要爬取的URL(s)。

  • 引擎從Spider中獲取到第一個要爬取的URL並在調度器(Scheduler)以Request調度。

  • 引擎向調度器請求下一個要爬取的URL。

  • 調度器返回下一個要爬取的URL給引擎,引擎將URL經過下載中間件(請求(request)方向)轉發給下載器(Downloader)。

  • 一旦頁面下載完畢,下載器生成一個該頁面的Response,並將其經過下載中間件(返回(response)方向)發送給引擎。

  • 引擎從下載器中接收到Response並經過Spider中間件(輸入方向)發送給Spider處理。

  • Spider處理Response並返回爬取到的Item及(跟進的)新的Request給引擎。

  • 引擎將(Spider返回的)爬取到的Item給Item Pipeline,將(Spider返回的)Request給調度器。

  • (從第二步)重複直到調度器中沒有更多地request,引擎關閉該網站。

process_request(request, spider),若是每一個middleware都返回None,則這個請求會正常被處理,除非返回一個IgnoreRequest,這時這個請求就會被過濾。

使用elasticsearch-head組件能夠像數據庫管理軟件同樣查看索引狀態以及瀏覽文檔,這個插件如何安裝你們能夠自行google/百度。另外,ELK套件裏的kibana裏有一個Dev Tools,這個比較有用,能夠直接在裏邊使用dsl語法訪問數據。

es若是當成普通nosql來使用,能夠不手動定義mapping,它內置第一次本身選擇字段的mapping,但後續就沒法修改了,這也是底層lucene的限制。默認自符串,會被當成「keyword」類型,就是沒有進行分詞和索引,就是普通的一個串,像數據庫那般CURD沒有任何問題,但用到es的搜索功能就檢索不到了。

es畢竟主要是服務於全文索引,不然咱們直接用mongodb就行了,查詢語法更簡單,因此es還重在這個search的服務。

以下代碼對es的查詢做了封裝,同時按分頁查詢,並對關鍵詞作了高亮(highlight)顯示。

from elasticsearch import Elasticsearch
from datetime import datetime

class ESMgr(object):
    def __init__(self,index_name='index_article',doc_type='article'):
        self.es = Elasticsearch(hosts='47.94.133.21')
        self.index_name = index_name
        self.doc_type = doc_type

    def search(self,keywords,page = 1):
        response = self.es.search(
            index=self.index_name,
            body={
                "query": {
                    "multi_match": {
                        "query": keywords,
                        "fields": ["title", "content"]
                    }
                },
                "from": (page - 1) * 10,
                "size": 10,
                "highlight": {
                    "pre_tags": ['<span class="keyWord">'],
                    "post_tags": ['</span>'],
                    "fields": {
                        "title": {},
                        "content": {},
                    }
                }
            }
        )

        total_nums = response["hits"]["total"]

        hit_list = []
        for hit in response["hits"]["hits"]:
            hit_dict = {}

            if "title" in hit["highlight"]:
                #這裏是一個list,join後變成string
                hit_dict["title"] = "".join(hit["highlight"]["title"])
            else:
                hit_dict["title"] = hit["_source"]["title"]
            if "content" in hit["highlight"]:
                hit_dict["content"] = "".join(hit["highlight"]["content"])[:500]
            else:
                hit_dict["content"] = hit["_source"]["content"][:500]

            hit_dict["datetime"] = hit["_source"]["datetime"]
            hit_dict["url"] = hit["_source"]["url"]
            hit_dict["score"] = hit["_score"]

            hit_list.append(hit_dict)

        return hit_list

github上有人提供了elasticsearch-py的高級封裝庫:elasticsearch-dsl-py

https://github.com/elastic/elasticsearch-dsl-py

使用起來會直觀一點,固然有時候,對於底層api的理解也會帶來一些麻煩。

最後要解決的一個問題是mapping,對於中文而言,咱們對title,content是須要分詞的。

使用kibana的Dev Tools,能夠對一個新索引設定一次mapping,設定以後沒法修改新增doc若是有新的field,es會自動按第一次寫入的數據添加對應的mapping。以下是對title和content字段配置ik分詞的mapping命令。

PUT index_article_ik

{

  "mappings":{

    "article":{

      "properties": {

            "content": {

                "type": "text",

                "analyzer": "ik_max_word",

                "search_analyzer": "ik_max_word"

            },

            "title": {

                "type": "text",

                "analyzer": "ik_max_word",

                "search_analyzer": "ik_max_word"

            }

        }

    }

}

}

關於做者:魏佳斌,互聯網產品/技術總監,北京大學光華管理學院(MBA),特許金融分析師(CFA),資深產品經理/碼農。偏心python,深度關注互聯網趨勢,人工智能,AI金融量化。致力於使用最前沿的認知技術去理解這個複雜的世界。

掃描下方二維碼,關注:AI量化實驗室(ailabx),瞭解AI量化最前沿技術、資訊。

相關文章
相關標籤/搜索