Python 操做 ElasticSearch

  官方文檔:https://elasticsearch-py.readthedocs.io/en/master/

  一、介紹

    python提供了操做ElasticSearch 接口,所以要用python來操做ElasticSearch,首先要安裝python的ElasticSearch包,用命令 pip install elasticsearch安裝或下載安裝:https://pypi.python.org/pypi/elasticsearch/5.4.0html

  二、建立索引

    假如建立索引名稱爲ott,類型爲ott_type的索引,該索引中有五個字段:python

    title:存儲中文標題,app

    date:存儲日期格式(2017-09-08),elasticsearch

    keyword:存儲中文關鍵字,spa

    source:存儲中文來源,3d

    link:存儲連接,code

    建立映射:orm

    

    

 

  三、索引數據

    

    批量索引htm

    利用bulk批量索引數據對象

    

  四、查詢索引

    

 

  五、刪除數據

    

   六、完整代碼

#coding:utf8
import os
import time
from os import walk
import CSVOP
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticObj:
    def __init__(self, index_name,index_type,ip ="127.0.0.1"):
        '''

        :param index_name: 索引名稱
        :param index_type: 索引類型
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 無用戶名密碼狀態
        #self.es = Elasticsearch([ip])
        #用戶名密碼狀態
        self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)

    def create_index(self,index_name="ott",index_type="ott_type"):
        '''
        建立索引,建立索引名稱爲ott,類型爲ott_type的索引
        :param ex: Elasticsearch對象
        :return:
        '''
        #建立映射
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "title": {
                            "type": "text",
                            "index": True,
                            "analyzer": "ik_max_word",
                            "search_analyzer": "ik_max_word"
                        },
                        "date": {
                            "type": "text",
                            "index": True
                        },
                        "keyword": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "source": {
                            "type": "string",
                            "index": "not_analyzed"
                        },
                        "link": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }

            }
        }
        if self.es.indices.exists(index=self.index_name) is not True:
            res = self.es.indices.create(index=self.index_name, body=_index_mappings)
            print res


    def IndexData(self):
        es = Elasticsearch()
        csvdir = 'D:/work/ElasticSearch/exportExcels'
        filenamelist = []
        for (dirpath, dirnames, filenames) in walk(csvdir):
            filenamelist.extend(filenames)
            break
        total = 0
        for file in filenamelist:
            csvfile = csvdir + '/' + file
            self.Index_Data_FromCSV(csvfile,es)
            total += 1
            print total
            time.sleep(10)

    def Index_Data_FromCSV(self,csvfile):
        '''
        從CSV文件中讀取數據,並存儲到es中
        :param csvfile: csv文件,包括完整路徑
        :return:
        '''
        list = CSVOP.ReadCSV(csvfile)
        index = 0
        doc = {}
        for item in list:
            if index > 1:#第一行是標題
                doc['title'] = item[0]
                doc['link'] = item[1]
                doc['date'] = item[2]
                doc['source'] = item[3]
                doc['keyword'] = item[4]
                res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
                print(res['created'])
            index += 1
            print index

    def Index_Data(self):
        '''
        數據存儲到es
        :return:
        '''
        list = [
            {   "date": "2017-09-13",
                "source": "慧聰網",
                "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                "keyword": "電視",
                "title": "付費 電視 行業面臨的轉型和挑戰"
             },
            {   "date": "2017-09-13",
                "source": "中國文明網",
                "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                "keyword": "電視",
                "title": "電視 專題片《巡視利劍》廣獲好評:鐵腕反腐凝聚黨心民心"
             }
              ]
        for item in list:
            res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
            print(res['created'])

    def bulk_Index_Data(self):
        '''
        用bulk將批量數據存儲到es
        :return:
        '''
        list = [
            {"date": "2017-09-13",
             "source": "慧聰網",
             "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
             "keyword": "電視",
             "title": "付費 電視 行業面臨的轉型和挑戰"
             },
            {"date": "2017-09-13",
             "source": "中國文明網",
             "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
             "keyword": "電視",
             "title": "電視 專題片《巡視利劍》廣獲好評:鐵腕反腐凝聚黨心民心"
             },
            {"date": "2017-09-13",
             "source": "人民電視",
             "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
             "keyword": "電視",
             "title": "中國第21批赴剛果(金)維和部隊啓程--人民 電視 --人民網"
             },
            {"date": "2017-09-13",
             "source": "站長之家",
             "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
             "keyword": "電視",
             "title": "電視 盒子 哪一個牌子好? 吐血奉獻三大選購祕笈"
             }
        ]
        ACTIONS = []
        i = 1
        for line in list:
            action = {
                "_index": self.index_name,
                "_type": self.index_type,
                "_id": i, #_id 也能夠默認生成,不賦值
                "_source": {
                    "date": line['date'],
                    "source": line['source'].decode('utf8'),
                    "link": line['link'],
                    "keyword": line['keyword'].decode('utf8'),
                    "title": line['title'].decode('utf8')}
            }
            i += 1
            ACTIONS.append(action)
            # 批量處理
        success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
        print('Performed %d actions' % success)

    def Delete_Index_Data(self,id):
        '''
        刪除索引中的一條
        :param id:
        :return:
        '''
        res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
        print res

    def Get_Data_Id(self,id):

        res = self.es.get(index=self.index_name, doc_type=self.index_type,id=id)
        print(res['_source'])

        print '------------------------------------------------------------------'
        #
        # # 輸出查詢到的結果
        for hit in res['hits']['hits']:
            # print hit['_source']
            print hit['_source']['date'],hit['_source']['source'],hit['_source']['link'],hit['_source']['keyword'],hit['_source']['title']

    def Get_Data_By_Body(self):
        # doc = {'query': {'match_all': {}}}
        doc = {
            "query": {
                "match": {
                    "keyword": "電視"
                }
            }
        }
        _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)

        for hit in _searched['hits']['hits']:
            # print hit['_source']
            print hit['_source']['date'], hit['_source']['source'], hit['_source']['link'], hit['_source']['keyword'], \
            hit['_source']['title']




obj =ElasticObj("ott","ott_type",ip ="47.93.117.127")
# obj = ElasticObj("ott1", "ott_type1")

# obj.create_index()
obj.Index_Data()
# obj.bulk_Index_Data()
# obj.IndexData()
# obj.Delete_Index_Data(1)
# csvfile = 'D:/work/ElasticSearch/exportExcels/2017-08-31_info.csv'
# obj.Index_Data_FromCSV(csvfile)
# obj.GetData(es)
相關文章
相關標籤/搜索