其餘版本,請參閱版本化的插件文檔。html
有關插件的問題,請在討論論壇中打開一個主題,對於bug或特性請求,在Github中打開一個issue,關於Elastic支持的插件列表,請考慮Elastic支持矩陣。git
這個輸入將讀取來自Kafka主題的事件。github
這個插件使用Kafka客戶端1.1.0,有關broker兼容性,請參閱官方Kafka兼容性參考資料,若是連接的兼容wiki不是最新的,請聯繫Kafka支持/社區確認兼容性。web
若是你須要這個插件中尚未提供的特性(包括客戶端版本升級),請提交一個關於你須要什麼細節的問題。正則表達式
這個輸入支持鏈接到Kafka:apache
默認狀況下,安全性是禁用的,可是能夠根據須要打開。bootstrap
Logstash Kafka消費者處理組管理並使用Kafka主題的默認偏移管理策略。安全
Logstash實例默認狀況下以一個邏輯組的形式來訂閱Kafka主題,每一個Logstash的Kafka消費者能夠運行多個線程來增長讀吞吐量。或者,你可使用相同的group_id
運行多個Logstash實例,以便在物理機器上分散負載,主題中的消息將分發給具備相同group_id
的全部Logstash實例。服務器
理想狀況下,爲了達到完美的平衡,你應該擁有與分區數量同樣多的線程,線程多於分區意味着有些線程將處於空閒狀態。session
有關更多信息,請參閱http://kafka.apache.org/documentation.html#theconsumer。
Kafka消費者配置:http://kafka.apache.org/document.html#consumerconfigs。
如下來自Kafka broker的元數據被添加到[@metadata]
字段下:
[@metadata][kafka][topic]
:消息被消費的原始kafka主題。[@metadata][kafka][consumer_group]
:消費者組。[@metadata][kafka][partition]
:此消息的分區信息。[@metadata][kafka][offset]
:此消息的原始記錄偏移量。[@metadata][kafka][key]
:記錄key,若是有的話。[@metadata][kafka][timestamp]
:Kafka broker接收此消息時的時間戳。請注意,在輸出時@metadata
字段不是任何事件的一部分,若是你須要將這些信息插入到原始事件中,你必須使用mutate
過濾器來手動將所需的字段複製到你的event
中。
這個插件支持如下配置選項以及後面描述的通用選項。
設置 | 輸入類型 | 要求 |
---|---|---|
auto_commit_interval_ms |
string | No |
auto_offset_reset |
string | No |
bootstrap_servers |
string | No |
check_crcs |
string | No |
client_id |
string | No |
connections_max_idle_ms |
string | No |
consumer_threads |
number | No |
decorate_events |
boolean | No |
enable_auto_commit |
string | No |
exclude_internal_topics |
string | No |
fetch_max_bytes |
string | No |
fetch_max_wait_ms |
string | No |
fetch_min_bytes |
string | No |
group_id |
string | No |
heartbeat_interval_ms |
string | No |
jaas_path |
有效的文件系統路徑 | No |
kerberos_config |
有效的文件系統路徑 | No |
key_deserializer_class |
string | No |
max_partition_fetch_bytes |
string | No |
max_poll_interval_ms |
string | No |
max_poll_records |
string | No |
metadata_max_age_ms |
string | No |
partition_assignment_strategy |
string | No |
poll_timeout_ms |
number | No |
receive_buffer_bytes |
string | No |
reconnect_backoff_ms |
string | No |
request_timeout_ms |
string | No |
retry_backoff_ms |
string | No |
sasl_kerberos_service_name |
string | No |
sasl_mechanism |
string | No |
security_protocol |
string,["PLAINTEXT" ,"SSL" ,"SASL_PLAINTEXT" ,"SASL_SSL" ]之一 |
No |
send_buffer_bytes |
string | No |
session_timeout_ms |
string | No |
ssl_key_password |
password | No |
ssl_keystore_location |
有效的文件系統路徑 | No |
ssl_keystore_password |
password | No |
ssl_keystore_type |
string | No |
ssl_truststore_location |
有效的文件系統路徑 | No |
ssl_truststore_password |
password | No |
ssl_truststore_type |
string | No |
topics |
array | No |
topics_pattern |
string | No |
value_deserializer_class |
string | No |
還能夠查看全部輸入插件支持的通用選項列表。
auto_commit_interval_ms
「5000」
auto_offset_reset
若是Kafka中沒有初始偏移量,或者偏移量超出範圍,該怎麼辦:
earliest
:自動重置偏移量到最先的偏移量latest
:自動重置偏移量到最新的偏移量none
:若是沒有爲消費者組找到先前的偏移量,則向消費者拋出異常bootstrap_servers
"localhost:9092"
host1:port1,host2:port2
的形式,這些url僅用於初始鏈接,以發現完整的集羣成員(可能會動態更改),所以這個列表不須要包含完整的服務器集(不過,若是一個服務器宕機,你可能須要多個服務器)。check_crcs
client_id
"logstash"
connections_max_idle_ms
consumer_threads
1
decorate_events
false
kafka
的字段,其中包含如下屬性:topic
:此消息關聯的主題、consumer_group
:這個事件中用來讀取的消費者組、partition
:此消息關聯的分區、offset
:此消息關聯的分區的偏移量、key
:包含消息key的ByteBuffer。enable_auto_commit
"true"
true
,消費者按期向Kafka提交已經返回的消息的偏移量,當進程失敗時,將使用這個提交的偏移量做爲消費開始的位置。exclude_internal_topics
true
,從內部主題接收記錄的惟一方法就是訂閱它。fetch_max_bytes
fetch_max_wait_ms
fetch_min_bytes
,服務器在響應提取請求以前將阻塞的最大時間,這應該小於或等於poll_timeout_ms
中使用的超時。fetch_min_bytes
group_id
"logstash"
group_id
的全部Logstash實例。heartbeat_interval_ms
session.timeout.ms
,但一般應該設置不高於該值的1/3,它能夠調整得更低,以控制正常從新平衡的預期時間。jaas_path
Java身份驗證和受權服務(JAAS)API爲Kafka提供用戶身份驗證和受權服務,這個設置提供了JAAS文件的路徑,Kafka客服端的樣例JAAS文件:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka"; };
請注意,在配置文件中指定jaas_path
和kerberos_config
將會添加到全局JVM系統屬性中,這意味着若是你有多個Kafka輸入,它們都共享相同的jaas_path
和kerberos_config
。若是不但願這樣作,則必須在不一樣的JVM實例上運行Logstash的獨立實例。
kerberos_config
krb5.conf
樣式,詳見https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html。key_deserializer_class
"org.apache.kafka.common.serialization.StringDeserializer"
max_partition_fetch_bytes
#partitions * max.partition.fetch.bytes
,這個大小必須至少是服務器容許的最大消息大小的最大值,不然可能生產者發送的消息比消費者可以提取的大,若是發生這種狀況,消費者可能會陷入在某個分區上提取大量消息的困境。max_poll_interval_ms
poll()
調用之間的最大延遲,這爲消費者在提取更多記錄以前能夠空閒的時間設置了上限,若是在超時過時以前沒有調用poll()
,就認爲消費者失敗了而且組將從新平衡,以便將分區從新分配給另外一個成員,配置request_timeout_ms
的值必須老是大於max_poll_interval_ms
。max_poll_records
poll()
的單個調用中返回的記錄的最大數量。metadata_max_age_ms
partition_assignment_strategy
poll_timeout_ms
100
receive_buffer_bytes
reconnect_backoff_ms
request_timeout_ms
retry_backoff_ms
sasl_kerberos_service_name
sasl_mechanism
"GSSAPI"
security_protocol
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
"PLAINTEXT"
PLAINTEXT
、SSL
、SASL_PLAINTEXT
、SASL_SSL
。send_buffer_bytes
session_timeout_ms
poll_timeout_ms
沒有被調用,消費者將被標記爲死亡,併爲group_id
標識的組觸發從新平衡操做。ssl_key_password
ssl_keystore_location
ssl_keystore_password
ssl_keystore_type
ssl_truststore_location
ssl_truststore_password
ssl_truststore_type
topics
["logstash"]
["logstash"]
。topics_pattern
value_deserializer_class
"org.apache.kafka.common.serialization.StringDeserializer"
全部輸入插件都支持如下配置選項:
設置 | 輸入類型 | 要求 |
---|---|---|
add_field |
hash | No |
codec |
codec | No |
enable_metric |
boolean | No |
id |
string | No |
tags |
array | No |
type |
string | No |
add_field
{}
codec
"plain"
enable_metric
true
id
向插件配置添加惟一的ID
,若是沒有指定ID,則Logstash將生成一個,強烈建議在配置中設置此ID,當你有兩個或多個相同類型的插件時,這一點特別有用。例如,若是你有兩個log4j輸入,在本例中添加一個命名ID將有助於在使用監視API時監視Logstash。
input { kafka { id => "my_plugin_id" } }
tags
type
type
字段,類型主要用於過濾器激活,該type
做爲事件自己的一部分存儲,所以你也可使用該類型在Kibana中搜索它。若是你試圖在已經擁有一個type
的事件上設置一個type
(例如,當你將事件從發送者發送到索引器時),那麼新的輸入將不會覆蓋現有的type
,發送方的type
集在其生命週期中始終與該事件保持一致,甚至在發送到另外一個Logstash服務器時也是如此。