Python從Elasticsearch獲取數據並輸出.csv文件(附完整代碼)

需求背景

從Elasticsearch(如下簡稱ES)拉產品須要的數據並生成.csv文件, 因爲數據量可能會很大, 採用循環的方式執行. 我以前一直用Java來實現這個功能, 可是Java開發成本大, 部署比較麻煩, 趁着有空來研究下Python的實現. 下面把我這個Python初學者的實現過程分享給諸位, 經歷了不少坑, 文末會貼出完整的代碼.數組

代碼實現

鏈接ES平臺

Python鏈接ES須要使用官方數據包elasticsearch, 若是沒有可使用pip安裝. 這裏有個坑: 建議指定一個高版本安裝, 低版本在鏈接ES時出現了問題. 指定版本安裝方式以下:bash

pip install elasticsearch==6.3.1app

#-*- coding=utf-8'*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import csv
import os
import sys

es = Elasticsearch(["http://**.**.**.**:9210"])
# 查看鏈接是否成功
print(es.info())
複製代碼

**換成公司的ES服務IP地址, 這裏只鏈接了測試環境的IP, 咱們公司的測試環境不須要用戶身份信息. 若是還想作的健全一點, 能夠作個環境切換, 條件由入參決定. 若鏈接成功, print會打印出ES註冊和版本相關信息, 這裏便不展現了less

查詢條件

本文采用原生的ES搜索條件, 未進行任何包裝, 因此查詢條件看上去會比較繞. 這裏會有個坑: ES分頁查詢支持的偏移量最多隻有10000, 若是超過一萬會報錯:elasticsearch

TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [50000]oop

解決辦法是: 可對某查詢條件A進行排序, 而後每次查詢都帶上上次查詢的最後值A.X, 用A條件>A.X便可, 這裏只簡單查個'companyId > 200'而且按照'companyId升序'的分頁公司列表, 不做其餘條件展開.測試

def buildQueryBody(pageNo, pageSize):
    # 因爲ES限制, 當偏移量>10000時須要特殊處理, 此處略
    offset = (pageNo -1) * pageSize
    body = {
        "query":{
            "bool":{
                "must":[
                    {
                        "range":{
                            "companyId":{
                                "gt":"200"
                            }
                        }
                    }
                ],
                "must_not":[

                ],
                "should":[

                ]
            }
        },
        "from":offset,
        "size":pageSize,
        "sort":[
            {"companyId":{ "order": "asc" }}
        ],
        "aggs":{

        }
    }
    return body
複製代碼

查詢結果提取

鏈接上ES, 建立好查詢條件後, 便可執行查詢:優化

# 建立查詢條件
body = buildQueryBody(initPageNo, initPageSize)
# 執行查詢
res = es.search(index=indexName, body=body)
複製代碼

這裏的indexName爲索引名, body爲查詢條件. 查詢結果進行判斷:ui

# seccessful == 1則成功
ifSuccess = res['_shards']['successful']
複製代碼

查詢結果列表和總數:編碼

resultList = res['hits']['hits']
total = res['hits']['total']
複製代碼

輸出文件

定義一個文件輸出地址, 這裏採用的方案是: 若是文件存在, 則追加; 若是文件不存在, 則新建.

# 文件是否存在
ifExist = os.path.exists(file)

if (ifExist):
    # 存在文件, 在後面追加
    print ("file already exist:", file)
    csvFile = open(file,'a+')
else:
    # 新建文件
    csvFile = open(file,'wb')
複製代碼

將數據寫入到csv文件用的是這行代碼, 這裏要注意的是: 不要將每一個元素拼接成一個字符串"e1,e2,e3,e4", 不然會出現每一個字符佔據一個單元格的狀況, 應該是(e1,e2,e3,e4)這樣的:

writer.writerow((e1, e2, e3, e4))

若是出現編碼異常的話, 可在全局定義下編碼格式'utf-8':

import sys

reload(sys)
sys.setdefaultencoding('utf-8')
複製代碼

對文件的處理須要注意的是: 1對異常的捕獲處理, 2對文件對象資源的關閉. 我這裏須要打印出四列數據.

try:
    ...
    writer=csv.writer(csvFile)
    writer.writerow((companyId, name, address, telephone))
except IOError as e:
    print ("IOError happen in:", e)
except Exception as e:
    print ("Error happen in:", e)
finally:
    csvFile.close()
複製代碼

循環處理

爲何會作循環處理, 是由於考慮到數據量可能會很大, 進行分批處理. 循環處理的邏輯是: 不斷以'下一頁'的形式進行查詢, 當某次查詢的結果爲空或小於分頁值時(在成功的前提下), 認爲查詢結束. 聽起來很容易就想到了do...while循環, 惋惜Python沒有這種, 因而就用'while True'來實現, 當達到'結束'的條件時, break跳出循環. 對於單次查詢失敗的狀況, 我未作重試機制, 理論上應該是要加的.

注意: 不管是Java仍是Python執行循環, 都要避免邏輯上的死循環的出現, 這裏也作了一個兜底(循環1000次).

initPageNo = 1
initPageSize = 100

# 定義一個兜底循環控制, 防止出現死循環. loopMaxCount值視狀況決定
loopCount = 0
loopMaxCount = 1000
while True:
    # 建立查詢條件
    body = buildQueryBody(initPageNo, initPageSize)
    # 獲取查詢結果
    res = es.search(index=indexName, body=body)
    # seccessful = 1則成功
    ifSuccess = res['_shards']['successful']
    if (ifSuccess == 1):
        # 這次查詢的數量, 用於判斷是否繼續查詢
        resultSize = len(res['hits']['hits'])
        ...
        if (resultSize < initPageSize):
            print ("查詢結束")
            break
        else:
            print ("下一頁, 繼續")
            initPageNo += 1
    else:
        print ("執行查詢失敗!", res)
        
    # 防止每次查詢都失敗形成的死循環 
    loopCount += 1
    if (loopCount >= loopMaxCount):
        break

複製代碼

以上是我實現的全過程, 下面是完整版代碼, 部分關鍵信息已作處理, 基本上能夠直接改改就用了. 若是補充上文中提到的一些可優化點, 應該是一個很是健全的用Python從Elasticsearch獲取數據並輸出.csv方案了.

完整代碼

#-*- coding=utf-8'*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import csv
import os
import sys

# 解決寫入文件時中文編碼問題
reload(sys)
sys.setdefaultencoding('utf-8')

# 此處替換IP地址
es = Elasticsearch(["http://**.**.**.**:9210"])
# 查看鏈接是否成功
print(es.info())

# 索引名
indexName = "company_info"

# 輸出文件路徑+文件名, 路徑選擇當前項目路徑
fileName = "es_out.csv"
filePath = os.getcwd()
print("current file path:",filePath)
file = filePath + "/" + fileName

def printCsv(file, dataList):
    # 防止出現空數組
    if (len(dataList) == 0):
        return
    
    csvFile = ""
    writer = ""

    # 文件是否存在
    ifExist = os.path.exists(file)

    if (ifExist):
        # 文件存在, 則在後面追加
        print ("file already exist:", file)
        csvFile = open(file,'a+')
    else:
        # 文件不存在, 則新建文件
        csvFile = open(file,'wb')

    try:
        writer=csv.writer(csvFile)
        if (ifExist == False):
            # 若是是新建的表, 此處定義表的列名信息
            writer.writerow(('公司Id','公司名','地址','手機'))

        # 此處循環寫入表數據
        for data in dataList:
            companyId = data['_source']['companyId'] if (data['_source'].has_key('companyId')) else 0
            name = data['_source']['name'] if (data['_source'].has_key('name')) else '--'
            address = data['_source']['address'] if (data['_source'].has_key('address')) else '--'
            telephone = data['_source']['telephone'] if (data['_source'].has_key('telephone')) else '--'

            # dataStr = "%d,%s,%s,%s" % (companyId, name, address, telephone)
            # print (dataStr)
            writer.writerow((companyId, name, address, telephone))

    except IOError as e:
        print ("IOError happen in:", e)
    except Exception as e:
        print ("Error happen in:", e)
    finally:
        csvFile.close()


def buildQueryBody(pageNo, pageSize):
    # 因爲ES限制, 當偏移量>10000時須要特殊處理, 此處略
    offset = (pageNo -1) * pageSize
    body = {
        "query":{
            "bool":{
                "must":[
                    {
                        "range":{
                            "companyId":{
                                "gt":"200"
                            }
                        }
                    }
                ],
                "must_not":[

                ],
                "should":[

                ]
            }
        },
        "from":offset,
        "size":pageSize,
        "sort":[
            {"companyId":{ "order": "asc" }}
        ],
        "aggs":{

        }
    }
    return body


def main():
    print ("start...")

    initPageNo = 1
    initPageSize = 50

    # 定義一個兜底循環控制, 防止出現死循環. loopMaxCount值視狀況決定
    loopCount = 0
    loopMaxCount = 1000
    while True:
        # 建立查詢條件
        body = buildQueryBody(initPageNo, initPageSize)
        # 獲取查詢結果
        res = es.search(index=indexName, body=body)
        # seccessful == 1則成功
        ifSuccess = res['_shards']['successful']
        if (ifSuccess == 1):
            print ("success query %d times!" % initPageNo)
            resultList = res['hits']['hits']
            total = res['hits']['total']
            # 查詢的數量, 用於判斷是否繼續查詢
            resultSize = len(resultList)
            print ("total: %d, get result: %d" % (total, resultSize))

            # 打印輸出結果
            printCsv(file, resultList)
            
            if (resultSize < initPageSize):
                print ("查詢結束")
                break
            else:
                print ("下一頁, 繼續")
                initPageNo += 1
        else:
            print ("執行查詢失敗!", res)

        loopCount += 1
        if (loopCount >= loopMaxCount):
            break

    print ("end...")
    
if __name__ == '__main__':
    main()
複製代碼

以爲實用的話點個讚唄~.~

相關文章
相關標籤/搜索