當Elasticsearch碰見Kafka

Elasticsearch做爲當前主流的全文檢索引擎,除了強大的全文檢索能力和高擴展性以外,對多種數據源的兼容能力也是其成功的祕訣之一。而Elasticsearch強大的數據源兼容能力,主要來源於其核心組件之一的Logstash, Logstash經過插件的形式實現了對多種數據源的輸入和輸出。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,是一種常見的數據源,也是Logstash支持的衆多輸入輸出源的其中一個。本文將從實踐的角度,研究使用Logstash Kafka Input插件實現將Kafka中數據導入到Elasticsearch的過程。apache

當Elasticsearch碰見Kafka

使用Logstash Kafka插件鏈接Kafka和Elasticsearchbootstrap

1 Logstash Kafka input插件簡介服務器

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。網絡

Logstash Kafka Input插件使用Kafka API從Kafka topic中讀取數據信息,使用時須要注意Kafka的版本及對應的插件版本是否一致。該插件支持經過SSL和Kerveros SASL方式鏈接Kafka。另外該插件提供了group管理,並使用默認的offset管理策略來操做Kafka topic。架構

Logstash默認狀況下會使用一個單獨的group來訂閱Kafka消息,每一個Logstash Kafka Consumer會使用多個線程來增長吞吐量。固然也能夠多個Logstash實例使用同一個group_id,來均衡負載。另外建議把Consumer的個數設置爲Kafka分區的大小,以提供更好的性能。elasticsearch

2 測試環境準備分佈式

2.1 建立Elasticsearch集羣微服務

爲了簡化搭建過程,本文使用了騰訊雲Elasticsearch service。騰訊雲Elasticsearch service不只能夠實現Elasticsearch集羣的快速搭建,還提供了內置Kibana,集羣監控,專用主節點,Ik分詞插件等功能,極大的簡化了Elasticsearch集羣的建立和管理工做。工具

2.2 建立Kafka服務源碼分析

Kafka服務的搭建採用騰訊雲CKafka來完成。與Elasticsearch Service同樣,騰訊雲CKafka能夠實現Kafka服務的快速建立,100%兼容開源Kafka API(0.9版本)。

2.3 服務器

除了準備Elasticsearch和Kafka,另外還須要準備一臺服務器,用於運行Logstash以鏈接Elasticsearch和Kafka。

2.4 注意事項

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

1) 須要將Elasticsearch、Kafka和服務器建立在同一個網絡下,以便實現網絡互通。因爲本文采用的是騰訊雲相關的技術服務,所以只須要將Elasticsearch service,CKafka和CVM建立在同一個私有網路(VPC)下便可。

2) 注意獲取Elasticsearch serivce,CKafka和CVM的內網地址和端口,以便後續服務使用

本次測試中:

當Elasticsearch碰見Kafka

3 使用Logstash鏈接Elasticsearch和Kafka

3.1 Kafka準備

1) 建立名爲kafka_es_test的topic

2) 安裝JDK

3) 安裝Kafka工具包

4) 建立producer和consumer驗證kafka功能

3.2 安裝Logstash

Logstash的安裝和使用能夠參考[一文快速上手Logstash]

3.3 配置Logstash Kafka input插件

建立kafka_test_pipeline.conf文件內容以下:

input{
 kafka{
 bootstrap_servers=>"192.168.13.10:9092"
 topics=>["kafka_es_test"]
 group_id=>"logstash_kafka_test"
 }
}
output{
 elasticsearch{
 hosts=>["192.168.0.8:9200"]
 }
}
其中定義了一個kafka的input和一個elasticsearch的output

對於Kafka input插件上述三個參數爲必填參數,除此以外還有一些對插件行爲進行調整的一些參數如:

auto_commit_interval_ms 用於設置Consumer提交offset給Kafka的時間間隔

consumer_threads 用於設置Consumer的線程數,默認爲1,實際中應設置與Kafka Topic分區數一致

fetch_max_wait_ms 用於指定Consumer等待一個fetch請求達到fetch_min_bytes的最長時間

fetch_min_bytes 用於指定Consumer fetch請求應返回的最小數據量

topics_pattern 用於經過正則訂閱符合某一規則的一組topic

3.4 啓動Logstash

如下操做在Logstash根目錄中進行

1) 驗證配置

./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit
若有錯誤,根據提示修改配置文件。若配置正確會獲得以下結果

Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}
[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}
Configuration OK
[2018-11-11T15:24:01,746][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
2) 啓動Logstash

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic
觀察日誌是否有錯誤提示,並及時處理

3.4 啓動Kafka Producer

如下操做在Kafka工具包根目錄下進行

./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test
寫入測試數據

This is a message
3.5 Kibana驗證結果

登陸Elasticsearch對應Kibana, 在Dev Tools中進行以下操做

1) 查看索引

GET _cat/indices
能夠看到一個名爲logstash-xxx.xx.xx的索引被建立成功

green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb
2) 查看寫入的數據

GET logstash-2018.11.11/_search
能夠看到數據已經被成功寫入

{
 "took": 0,
 "timed_out": false,
 "_shards": {
 "total": 5,
 "successful": 5,
 "skipped": 0,
 "failed": 0
 },
 "hits": {
 "total": 1,
 "max_score": 1,
 "hits": [
 {
 "_index": "logstash-2018.11.11",
 "_type": "logs",
 "_id": "AWcBsEegMu-Dkjm1ap3H",
 "_score": 1,
 "_source": {
 "message": "This is a message",
 "@version": "1",
 "@timestamp": "2018-11-11T07:33:09.079Z"
 }
 }
 ]
 }
}
4 總結

Logstash做爲Elastic Stack中數據採集和處理的核心組件,爲Elasticsearch提供了強大的數據源兼容能力。從測試過程能夠看出,使用Logstash實現kafka和Elaticsearch的鏈接過程至關簡單方便。另外Logstash的數據處理功能,也使得采用該架構的系統對數據映射和處理有自然的優點。

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

然而,使用Logstash實現Kafka和Elasticsearch的鏈接,並非鏈接Kafka和Elasticsearch的惟一方案,另外一種常見的方案是使用Kafka Connect, 能夠參考「當Elasticsearch碰見Kafka--Kafka Connect」  

相關文章
相關標籤/搜索