Elasticsearch 預處理沒有奇技淫巧,請先用好這一招!

一、上問題
1.1 線上實戰問題 1——字符串切分
es能夠根據_id字符串切分,再聚合統計嗎 好比:數據一、_id=C12345 數據二、_id=C12456 數據三、_id=C31268java

經過es聚合統計 C1開頭的數量有2個 C3開頭的數據有1個node

這個API怎麼寫,有大佬指導下嗎?python

1.2 線上實戰問題 2——json 轉 object
插入的時候,能不能對原數據進行必定的轉化,再進行indexinggit

{
"headers":{
"userInfo":[
"{ \"password\": \"test\",\n \"username\": \"zy\"}"
]
}
}
這裏面的已是字符串了,能在數據插入階段把這個 json 轉成 object 麼?github

1.3 線上實戰問題 3——更新數組元素
我想對一個list每一個值後面都加一個字符:sql

好比 {"tag":["a","b","c"]} 這樣一個文檔 我想變成 {"tag":["a2","b2","c2"]} 這樣的,json

各位有沒有試過用 foreach 和 script 結合使用?數組

二、問題拆解分析
「問題 1」:分析環節須要聚合統計,固然用painless script 也能實現,但數據量大,勢必有性能問題。微信

能夠把數據處理前置,把前_id兩個字符提取出來,做爲一個字段處理。less

「問題 2」:寫入的時候指望作字符類型的轉換,把複雜的字符串轉換爲格式化後的 Object 對象數據。

「問題 3」:數組類型數據所有規則化更新,固然 painless script 腳本也能夠實現。

可是,在寫入環節處理,就能極大減輕後面分析環節的負擔。

以上三個問題,寫入前用 java 或者 python 寫程序處理,而後再寫入 Elasticsearch 也是一種方案。

但,若是要死磕一把,有沒有更好的方案呢?可否在寫入前進行數據的預處理呢?

三、什麼是數據預處理
通常狀況下,咱們程序寫入數據或者從第三方數據源(Mysql、Oracle、HBase、Spark等)導入數據,都是原始數據張什麼樣,直接批量同步 ES,寫入ES索引化的數據就是什麼樣。以下圖所示:

Elasticsearch 預處理沒有奇技淫巧,請先用好這一招!

如前所述的三個實戰問題,實際業務數據可能不見得是咱們真正分析環節所須要的。

須要對這些數據進行合理的預處理後,才便於後面環節的分析和數據挖掘。

數據預處理的步驟大體拆解以下:

數據清洗。
主要是爲了去除 重複數據,去噪音(即干擾數據)以及填充缺省值。

數據集成。
將多個數據源的數據放在一個統一的數據存儲中。

數據轉換。
將數據轉化成適合數據挖掘或分析的形式。

在 Elasticsearch 中,有沒有預處理的實現呢?

四、Elasticsearch 數據預處理
Elasticsearch的ETL利器——Ingest節點,已經將節點角色劃分、Ingest 節點做用,Ingest 實踐、Ingest 和 logstash 預處理優缺點對比都作了解讀。有相關盲點的同窗,能夠移步過去過一遍知識點。

Ingest 節點的本質——在實際文檔創建索引以前,使用 Ingest 節點對文檔進行預處理。Ingest 節點攔截批量索引和單個索引請求,應用轉換,而後將文檔傳遞迴單個索引或批量索引API 寫入數據。

下面這張圖,比較形象的說明的 Elasticsearch 數據預處理的流程。

Elasticsearch 預處理沒有奇技淫巧,請先用好這一招!

實際業務場景中,預處理步驟以下:

步驟1:定義 Pipeline,經過 Pipeline 實現數據預處理。
根據實際要處理的複雜數據的特色,有針對性的設置1個或者多個 pipeline (管道),上圖的粉紅和黃色部分。

步驟2:寫入數據關聯Pipeline。
寫入數據、更新數據或者 reindex 索引環節,指定要處理索引的 pipeline , 實際就是寫入索引與上面的 pipeline0 和 pipelineZ 關聯起來。

步驟3:寫入數據。
劃重點:Ingest 實如今實際文檔編制索引(索引化)以前對文檔進行預處理。

五、實踐一把
5.1 線上問題 1 實現
PUT _ingest/pipeline/split_id
{
"processors": [
{
"script": {
"lang": "painless",
"source": "ctx.myid_prefix = ctx.myid.substring(0,2)"
}
}
]
}

藉助 script 處理器中的 substring 提取子串,構造新的前綴串字段,用於分析環節的聚合操做。

5.2 線上問題 2 實現
PUT _ingest/pipeline/json_builder
{
"processors": [
{
"json": {
"field": "headers.userInfo",
"target_field": "headers.userInfo.target"
}
}
]
}
藉助 json 處理器作字段類型轉換,字符串轉成了 json。

5.3 線上問題3 實現
PUT _ingest/pipeline/add_builder
{
"processors": [
{
"script": {
"lang": "painless",
"source": """
for (int i=0; i < ctx.tag.length;i++) {
ctx.tag[i]=ctx.tag[i]+"2";
}
"""
}
}
]
}
藉助 script 處理器,循環遍歷數組,實現了每一個數組字段內容的再填充。

篇幅緣由,更詳細解讀參見:

https://github.com/mingyitianxia/deep_elasticsearch/blob/master/es_dsl_study/1.ingest_dsl.md

六、不預處理 VS 預處理後寫入方案對比
「方案 1」:數據原樣導入Elasticsearch,分析階段再作 painless 腳本處理。簡單粗暴。

導入一時爽,處理費大勁!

如前所述,script 處理能力有限,且因爲 script 徒增性能問題煩惱。

不推薦使用。

「方案 2」:提早借助 Ingest 節點實現數據預處理,作好必要的數據的清洗(ETL) 操做,哪怕增大空間存儲(如新增字段),也要以空間換時間,爲後續分析環節掃清障礙。

看似寫入變得複雜,實則必須。「以空間爲分析贏取了時間」。

推薦使用。

七、常見問題
7.1 Ingest 節點是必須設置的嗎?
默認狀況下,全部節點都默認啓用 Ingest,所以任何節點均可以完成數據的預處理任務。

可是,當集羣數據量級夠大,集羣規模夠大後,建議拆分節點角色,和獨立主節點、獨立協調節點同樣,設置獨立專用的 Ingest 節點。

7.2 pipeline 何時指定呢?
建立索引、建立模板、更新索引、reindex 以及 update_by_query 環節 均可以指定 pipeline。

7.2.1 建立索引環節指定 pipeline
PUT ms-test
{
"settings": {
"index.default_pipeline": "init_pipeline"
}
}
7.2.2 建立模板環節指定 pipeline
PUT _template/template_1
{
"index_patterns": ["te", "bar"],
"settings": {
"number_of_shards": 1,
"index.default_pipeline":"add_builder"
}
}
7.2.3 更新索引環節指定pipeline(原索引未指定)
PUT /my_index/_settings
{
"index" : {
"default_pipeline" : "my_pipeline"
}
}
7.2.4 reindex 環節添加 pipeline
POST _reindex
{
"source": {
"index": "source"
},
"dest": {
"index": "dest",
"pipeline": "some_ingest_pipeline"
}
}
7.2.5 update 環節指定pipeline
POST twitter/_update_by_query?pipeline=set-foo
八、小結
開篇三個問題都是在死磕 Elasticsearch QQ羣、微信羣中討論的線上業務問題。藉助 Elasticsearch Ingest 節點的預處理環節,都能很好的解決。

Ingest Pipelines 是 Elasticsearch 數據預處理的核心功能,一旦將其應用於生產實戰環境,你會發現很「「香」」,而且你會離不開它。

參考:

https://dev.classmethod.jp/server-side/elasticsearch/elasticsearch-ingest-node/

《數據分析實戰 45 講》

推薦更多:

Elasticsearch的ETL利器——Ingest節點

Elasticsearch 預處理沒有奇技淫巧,請先用好這一招!

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息