前面 FLink 的文章中咱們已經介紹了說 Flink 已經有不少自帶的 Connector。html
一、《從0到1學習Flink》—— Data Source 介紹 java
二、《從0到1學習Flink》—— Data Sink 介紹node
其中包括了 Source 和 Sink 的,後面我也講了下如何自定義本身的 Source 和 Sink。git
那麼今天要作的事情是啥呢?就是介紹一下 Flink 自帶的 ElasticSearch Connector,咱們今天就用他來作 Sink,將 Kafka 中的數據通過 Flink 處理後而後存儲到 ElasticSearch。es6
安裝 ElasticSearch,這裏就忽略,本身找我之前的文章,建議安裝 ElasticSearch 6.0 版本以上的,畢竟要跟上時代的節奏。github
下面就講解一下生產環境中如何使用 Elasticsearch Sink 以及一些注意點,及其內部實現機制。apache
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
上面這依賴版本號請本身根據使用的版本對應改變下。服務器
下面全部的代碼都沒有把 import 引入到這裏來,若是須要查看更詳細的代碼,請查看個人 GitHub 倉庫地址:併發
https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-es6elasticsearch
這個 module 含有本文的全部代碼實現,固然越寫到後面本身可能會作一些抽象,因此若是有代碼改變很正常,請直接查看所有項目代碼。
這個工具類是本身封裝的,getEsAddresses 方法將傳入的配置文件 es 地址解析出來,能夠是域名方式,也能夠是 ip + port 形式。
addSink 方法是利用了 Flink 自帶的 ElasticsearchSink 來封裝了一層,傳入了一些必要的調優參數和 es 配置參數,下面文章還會再講些其餘的配置。
ElasticSearchSinkUtil.java
public class ElasticSearchSinkUtil { /** * es sink * * @param hosts es hosts * @param bulkFlushMaxActions bulk flush size * @param parallelism 並行數 * @param data 數據 * @param func * @param <T> */ public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism, SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) { ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func); esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions); data.addSink(esSinkBuilder.build()).setParallelism(parallelism); } /** * 解析配置文件的 es hosts * * @param hosts * @return * @throws MalformedURLException */ public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException { String[] hostList = hosts.split(","); List<HttpHost> addresses = new ArrayList<>(); for (String host : hostList) { if (host.startsWith("http")) { URL url = new URL(host); addresses.add(new HttpHost(url.getHost(), url.getPort())); } else { String[] parts = host.split(":", 2); if (parts.length > 1) { addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1]))); } else { throw new MalformedURLException("invalid elasticsearch hosts format"); } } } return addresses; } }
Main.java
public class Main { public static void main(String[] args) throws Exception { //獲取全部參數 final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); //準備好環境 StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); //從kafka讀取數據 DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env); //從配置文件中讀取 es 的地址 List<HttpHost> esAddresses = ElasticSearchSinkUtil.getEsAddresses(parameterTool.get(ELASTICSEARCH_HOSTS)); //從配置文件中讀取 bulk flush size,表明一次批處理的數量,這個但是性能調優參數,特別提醒 int bulkSize = parameterTool.getInt(ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS, 40); //從配置文件中讀取並行 sink 數,這個也是性能調優參數,特別提醒,這樣纔可以更快的消費,防止 kafka 數據堆積 int sinkParallelism = parameterTool.getInt(STREAM_SINK_PARALLELISM, 5); //本身再自帶的 es sink 上一層封裝了下 ElasticSearchSinkUtil.addSink(esAddresses, bulkSize, sinkParallelism, data, (Metrics metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) -> { requestIndexer.add(Requests.indexRequest() .index(ZHISHENG + "_" + metric.getName()) //es 索引名 .type(ZHISHENG) //es type .source(GsonUtil.toJSONBytes(metric), XContentType.JSON)); }); env.execute("flink learning connectors es6"); } }
配置都支持集羣模式填寫,注意用 , 分隔!
kafka.brokers=localhost:9092 kafka.group.id=zhisheng-metrics-group-test kafka.zookeeper.connect=localhost:2181 metrics.topic=zhisheng-metrics stream.parallelism=5 stream.checkpoint.interval=1000 stream.checkpoint.enable=false elasticsearch.hosts=localhost:9200 elasticsearch.bulk.flush.max.actions=40 stream.sink.parallelism=5
執行 Main 類的 main 方法,咱們的程序是隻打印 flink 的日誌,沒有打印存入的日誌(由於咱們這裏沒有打日誌):
因此看起來不知道咱們的 sink 是否有用,數據是否從 kafka 讀取出來後存入到 es 了。
你能夠查看下本地起的 es 終端或者服務器的 es 日誌就能夠看到效果了。
es 日誌以下:
上圖是我本地 Mac 電腦終端的 es 日誌,能夠看到咱們的索引了。
若是還不放心,你也能夠在你的電腦裝個 kibana,而後更加的直觀查看下 es 的索引狀況(或者直接敲 es 的命令)
咱們用 kibana 查看存入 es 的索引以下:
程序執行了一會,存入 es 的數據量就很大了。
上面代碼已經能夠實現你的大部分場景了,可是若是你的業務場景須要保證數據的完整性(不能出現丟數據的狀況),那麼就須要添加一些重試策略,由於在咱們的生產環境中,頗有可能會由於某些組件不穩定性致使各類問題,因此這裏咱們就要在數據存入失敗的時候作重試操做,這裏 flink 自帶的 es sink 就支持了,經常使用的失敗重試配置有:
一、bulk.flush.backoff.enable 用來表示是否開啓重試機制 二、bulk.flush.backoff.type 重試策略,有兩種:EXPONENTIAL 指數型(表示屢次重試之間的時間間隔按照指數方式進行增加)、CONSTANT 常數型(表示屢次重試之間的時間間隔爲固定常數) 三、bulk.flush.backoff.delay 進行重試的時間間隔 四、bulk.flush.backoff.retries 失敗重試的次數 五、bulk.flush.max.actions: 批量寫入時的最大寫入條數 六、bulk.flush.max.size.mb: 批量寫入時的最大數據量 七、bulk.flush.interval.ms: 批量寫入的時間間隔,配置後則會按照該時間間隔嚴格執行,無視上面的兩個批量寫入配置
看下啦,就是以下這些配置了,若是你須要的話,能夠在這個地方配置擴充了。
寫入 ES 的時候會有這些狀況會致使寫入 ES 失敗:
一、ES 集羣隊列滿了,報以下錯誤
12:08:07.326 [I/O dispatcher 13] ERROR o.a.f.s.c.e.ElasticsearchSinkBase - Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of org.elasticsearch.transport.TransportService$7@566c9379 on EsThreadPoolExecutor[name = node-1/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@f00b373[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 6277]]]]
是這樣的,我電腦安裝的 es 隊列容量默認應該是 200,我沒有修改過。我這裏若是配置的 bulk flush size * 併發 sink 數量 這個值若是大於這個 queue capacity ,那麼就很容易致使出現這種由於 es 隊列滿了而寫入失敗。
固然這裏你也能夠經過調大點 es 的隊列。參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
二、ES 集羣某個節點掛了
這個就不用說了,確定寫入失敗的。跟過源碼能夠發現 RestClient 類裏的 performRequestAsync 方法一開始會隨機的從集羣中的某個節點進行寫入數據,若是這臺機器掉線,會進行重試在其餘的機器上寫入,那麼當時寫入的這臺機器的請求就須要進行失敗重試,不然就會把數據丟失!
三、ES 集羣某個節點的磁盤滿了
這裏說的磁盤滿了,並非磁盤真的就沒有一點剩餘空間的,是 es 會在寫入的時候檢查磁盤的使用狀況,在 85% 的時候會打印日誌警告。
這裏我看了下源碼以下圖:
若是你想繼續讓 es 寫入的話就須要去從新配一下 es 讓它繼續寫入,或者你也能夠清空些沒必要要的數據騰出磁盤空間來。
DataStream<String> input = ...; input.addSink(new ElasticsearchSink<>( config, transportAddresses, new ElasticsearchSinkFunction<String>() {...}, new ActionRequestFailureHandler() { @Override void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throw Throwable { if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) { // full queue; re-add document for indexing indexer.add(action); } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) { // malformed document; simply drop request without failing sink } else { // for all other failures, fail the sink // here the failure is simply rethrown, but users can also choose to throw custom exceptions throw failure; } } }));
若是僅僅只是想作失敗重試,也能夠直接使用官方提供的默認的 RetryRejectedExecutionFailureHandler ,該處理器會對 EsRejectedExecutionException 致使到失敗寫入作重試處理。若是你沒有設置失敗處理器(failure handler),那麼就會使用默認的 NoOpFailureHandler 來簡單處理全部的異常。
本文寫了 Flink connector es,將 Kafka 中的數據讀取並存儲到 ElasticSearch 中,文中講了如何封裝自帶的 sink,而後一些擴展配置以及 FailureHandler 狀況下要怎麼處理。(這個問題但是線上很容易遇到的)
原創地址爲:http://www.54tianzhisheng.cn/2018/12/30/Flink-ElasticSearch-Sink/