從Elasticsearch(如下簡稱ES)拉產品須要的數據並生成.csv文件, 因爲數據量可能會很大, 採用循環的方式執行. 我以前一直用Java來實現這個功能, 可是Java開發成本大, 部署比較麻煩, 趁着有空來研究下Python的實現. 下面把我這個Python初學者的實現過程分享給諸位, 經歷了不少坑, 文末會貼出完整的代碼.數組
Python鏈接ES須要使用官方數據包elasticsearch, 若是沒有可使用pip安裝. 這裏有個坑: 建議指定一個高版本安裝, 低版本在鏈接ES時出現了問題. 指定版本安裝方式以下:bash
pip install elasticsearch==6.3.1
app
#-*- coding=utf-8'*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import csv
import os
import sys
es = Elasticsearch(["http://**.**.**.**:9210"])
# 查看鏈接是否成功
print(es.info())
複製代碼
**換成公司的ES服務IP地址, 這裏只鏈接了測試環境的IP, 咱們公司的測試環境不須要用戶身份信息. 若是還想作的健全一點, 能夠作個環境切換, 條件由入參決定. 若鏈接成功, print會打印出ES註冊和版本相關信息, 這裏便不展現了less
本文采用原生的ES搜索條件, 未進行任何包裝, 因此查詢條件看上去會比較繞. 這裏會有個坑: ES分頁查詢支持的偏移量最多隻有10000, 若是超過一萬會報錯:elasticsearch
TransportError(500, u'search_phase_execution_exception', u'Result window is too large, from + size must be less than or equal to: [10000] but was [50000]oop
解決辦法是: 可對某查詢條件A進行排序, 而後每次查詢都帶上上次查詢的最後值A.X, 用A條件>A.X便可, 這裏只簡單查個'companyId > 200'而且按照'companyId升序'的分頁公司列表, 不做其餘條件展開.測試
def buildQueryBody(pageNo, pageSize):
# 因爲ES限制, 當偏移量>10000時須要特殊處理, 此處略
offset = (pageNo -1) * pageSize
body = {
"query":{
"bool":{
"must":[
{
"range":{
"companyId":{
"gt":"200"
}
}
}
],
"must_not":[
],
"should":[
]
}
},
"from":offset,
"size":pageSize,
"sort":[
{"companyId":{ "order": "asc" }}
],
"aggs":{
}
}
return body
複製代碼
鏈接上ES, 建立好查詢條件後, 便可執行查詢:優化
# 建立查詢條件
body = buildQueryBody(initPageNo, initPageSize)
# 執行查詢
res = es.search(index=indexName, body=body)
複製代碼
這裏的indexName爲索引名, body爲查詢條件. 查詢結果進行判斷:ui
# seccessful == 1則成功
ifSuccess = res['_shards']['successful']
複製代碼
查詢結果列表和總數:編碼
resultList = res['hits']['hits']
total = res['hits']['total']
複製代碼
定義一個文件輸出地址, 這裏採用的方案是: 若是文件存在, 則追加; 若是文件不存在, 則新建.
# 文件是否存在
ifExist = os.path.exists(file)
if (ifExist):
# 存在文件, 在後面追加
print ("file already exist:", file)
csvFile = open(file,'a+')
else:
# 新建文件
csvFile = open(file,'wb')
複製代碼
將數據寫入到csv文件用的是這行代碼, 這裏要注意的是: 不要將每一個元素拼接成一個字符串"e1,e2,e3,e4", 不然會出現每一個字符佔據一個單元格的狀況, 應該是(e1,e2,e3,e4)這樣的:
writer.writerow((e1, e2, e3, e4))
若是出現編碼異常的話, 可在全局定義下編碼格式'utf-8':
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
複製代碼
對文件的處理須要注意的是: 1對異常的捕獲處理, 2對文件對象資源的關閉. 我這裏須要打印出四列數據.
try:
...
writer=csv.writer(csvFile)
writer.writerow((companyId, name, address, telephone))
except IOError as e:
print ("IOError happen in:", e)
except Exception as e:
print ("Error happen in:", e)
finally:
csvFile.close()
複製代碼
爲何會作循環處理, 是由於考慮到數據量可能會很大, 進行分批處理. 循環處理的邏輯是: 不斷以'下一頁'的形式進行查詢, 當某次查詢的結果爲空或小於分頁值時(在成功的前提下), 認爲查詢結束. 聽起來很容易就想到了do...while循環, 惋惜Python沒有這種, 因而就用'while True'來實現, 當達到'結束'的條件時, break跳出循環. 對於單次查詢失敗的狀況, 我未作重試機制, 理論上應該是要加的.
注意: 不管是Java仍是Python執行循環, 都要避免邏輯上的死循環的出現, 這裏也作了一個兜底(循環1000次).
initPageNo = 1
initPageSize = 100
# 定義一個兜底循環控制, 防止出現死循環. loopMaxCount值視狀況決定
loopCount = 0
loopMaxCount = 1000
while True:
# 建立查詢條件
body = buildQueryBody(initPageNo, initPageSize)
# 獲取查詢結果
res = es.search(index=indexName, body=body)
# seccessful = 1則成功
ifSuccess = res['_shards']['successful']
if (ifSuccess == 1):
# 這次查詢的數量, 用於判斷是否繼續查詢
resultSize = len(res['hits']['hits'])
...
if (resultSize < initPageSize):
print ("查詢結束")
break
else:
print ("下一頁, 繼續")
initPageNo += 1
else:
print ("執行查詢失敗!", res)
# 防止每次查詢都失敗形成的死循環
loopCount += 1
if (loopCount >= loopMaxCount):
break
複製代碼
以上是我實現的全過程, 下面是完整版代碼, 部分關鍵信息已作處理, 基本上能夠直接改改就用了. 若是補充上文中提到的一些可優化點, 應該是一個很是健全的用Python從Elasticsearch獲取數據並輸出.csv方案了.
#-*- coding=utf-8'*-
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import csv
import os
import sys
# 解決寫入文件時中文編碼問題
reload(sys)
sys.setdefaultencoding('utf-8')
# 此處替換IP地址
es = Elasticsearch(["http://**.**.**.**:9210"])
# 查看鏈接是否成功
print(es.info())
# 索引名
indexName = "company_info"
# 輸出文件路徑+文件名, 路徑選擇當前項目路徑
fileName = "es_out.csv"
filePath = os.getcwd()
print("current file path:",filePath)
file = filePath + "/" + fileName
def printCsv(file, dataList):
# 防止出現空數組
if (len(dataList) == 0):
return
csvFile = ""
writer = ""
# 文件是否存在
ifExist = os.path.exists(file)
if (ifExist):
# 文件存在, 則在後面追加
print ("file already exist:", file)
csvFile = open(file,'a+')
else:
# 文件不存在, 則新建文件
csvFile = open(file,'wb')
try:
writer=csv.writer(csvFile)
if (ifExist == False):
# 若是是新建的表, 此處定義表的列名信息
writer.writerow(('公司Id','公司名','地址','手機'))
# 此處循環寫入表數據
for data in dataList:
companyId = data['_source']['companyId'] if (data['_source'].has_key('companyId')) else 0
name = data['_source']['name'] if (data['_source'].has_key('name')) else '--'
address = data['_source']['address'] if (data['_source'].has_key('address')) else '--'
telephone = data['_source']['telephone'] if (data['_source'].has_key('telephone')) else '--'
# dataStr = "%d,%s,%s,%s" % (companyId, name, address, telephone)
# print (dataStr)
writer.writerow((companyId, name, address, telephone))
except IOError as e:
print ("IOError happen in:", e)
except Exception as e:
print ("Error happen in:", e)
finally:
csvFile.close()
def buildQueryBody(pageNo, pageSize):
# 因爲ES限制, 當偏移量>10000時須要特殊處理, 此處略
offset = (pageNo -1) * pageSize
body = {
"query":{
"bool":{
"must":[
{
"range":{
"companyId":{
"gt":"200"
}
}
}
],
"must_not":[
],
"should":[
]
}
},
"from":offset,
"size":pageSize,
"sort":[
{"companyId":{ "order": "asc" }}
],
"aggs":{
}
}
return body
def main():
print ("start...")
initPageNo = 1
initPageSize = 50
# 定義一個兜底循環控制, 防止出現死循環. loopMaxCount值視狀況決定
loopCount = 0
loopMaxCount = 1000
while True:
# 建立查詢條件
body = buildQueryBody(initPageNo, initPageSize)
# 獲取查詢結果
res = es.search(index=indexName, body=body)
# seccessful == 1則成功
ifSuccess = res['_shards']['successful']
if (ifSuccess == 1):
print ("success query %d times!" % initPageNo)
resultList = res['hits']['hits']
total = res['hits']['total']
# 查詢的數量, 用於判斷是否繼續查詢
resultSize = len(resultList)
print ("total: %d, get result: %d" % (total, resultSize))
# 打印輸出結果
printCsv(file, resultList)
if (resultSize < initPageSize):
print ("查詢結束")
break
else:
print ("下一頁, 繼續")
initPageNo += 1
else:
print ("執行查詢失敗!", res)
loopCount += 1
if (loopCount >= loopMaxCount):
break
print ("end...")
if __name__ == '__main__':
main()
複製代碼
以爲實用的話點個讚唄~.~