Logstash 參考指南(Kafka輸入插件)

Kafka輸入插件

其餘版本,請參閱版本化的插件文檔html

獲取幫助

有關插件的問題,請在討論論壇中打開一個主題,對於bug或特性請求,在Github中打開一個issue,關於Elastic支持的插件列表,請考慮Elastic支持矩陣git

描述

這個輸入將讀取來自Kafka主題的事件。github

這個插件使用Kafka客戶端1.1.0,有關broker兼容性,請參閱官方Kafka兼容性參考資料,若是連接的兼容wiki不是最新的,請聯繫Kafka支持/社區確認兼容性。web

若是你須要這個插件中尚未提供的特性(包括客戶端版本升級),請提交一個關於你須要什麼細節的問題。正則表達式

這個輸入支持鏈接到Kafka:apache

  • SSL(要求插件版本3.0.0或以上)
  • Kerberos SASL(須要插件版本5.1.0或以上)

默認狀況下,安全性是禁用的,可是能夠根據須要打開。bootstrap

Logstash Kafka消費者處理組管理並使用Kafka主題的默認偏移管理策略。安全

Logstash實例默認狀況下以一個邏輯組的形式來訂閱Kafka主題,每一個Logstash的Kafka消費者能夠運行多個線程來增長讀吞吐量。或者,你可使用相同的group_id運行多個Logstash實例,以便在物理機器上分散負載,主題中的消息將分發給具備相同group_id的全部Logstash實例。服務器

理想狀況下,爲了達到完美的平衡,你應該擁有與分區數量同樣多的線程,線程多於分區意味着有些線程將處於空閒狀態。session

有關更多信息,請參閱http://kafka.apache.org/documentation.html#theconsumer

Kafka消費者配置:http://kafka.apache.org/document.html#consumerconfigs

元數據字段

如下來自Kafka broker的元數據被添加到[@metadata]字段下:

  • [@metadata][kafka][topic]:消息被消費的原始kafka主題。
  • [@metadata][kafka][consumer_group]:消費者組。
  • [@metadata][kafka][partition]:此消息的分區信息。
  • [@metadata][kafka][offset]:此消息的原始記錄偏移量。
  • [@metadata][kafka][key]:記錄key,若是有的話。
  • [@metadata][kafka][timestamp]:Kafka broker接收此消息時的時間戳。

請注意,在輸出時@metadata字段不是任何事件的一部分,若是你須要將這些信息插入到原始事件中,你必須使用mutate過濾器來手動將所需的字段複製到你的event中。

Kafka輸入配置選項

這個插件支持如下配置選項以及後面描述的通用選項。

設置 輸入類型 要求
auto_commit_interval_ms string No
auto_offset_reset string No
bootstrap_servers string No
check_crcs string No
client_id string No
connections_max_idle_ms string No
consumer_threads number No
decorate_events boolean No
enable_auto_commit string No
exclude_internal_topics string No
fetch_max_bytes string No
fetch_max_wait_ms string No
fetch_min_bytes string No
group_id string No
heartbeat_interval_ms string No
jaas_path 有效的文件系統路徑 No
kerberos_config 有效的文件系統路徑 No
key_deserializer_class string No
max_partition_fetch_bytes string No
max_poll_interval_ms string No
max_poll_records string No
metadata_max_age_ms string No
partition_assignment_strategy string No
poll_timeout_ms number No
receive_buffer_bytes string No
reconnect_backoff_ms string No
request_timeout_ms string No
retry_backoff_ms string No
sasl_kerberos_service_name string No
sasl_mechanism string No
security_protocol string,["PLAINTEXT","SSL","SASL_PLAINTEXT","SASL_SSL"]之一 No
send_buffer_bytes string No
session_timeout_ms string No
ssl_key_password password No
ssl_keystore_location 有效的文件系統路徑 No
ssl_keystore_password password No
ssl_keystore_type string No
ssl_truststore_location 有效的文件系統路徑 No
ssl_truststore_password password No
ssl_truststore_type string No
topics array No
topics_pattern string No
value_deserializer_class string No

還能夠查看全部輸入插件支持的通用選項列表。

auto_commit_interval_ms

  • 值類型爲string
  • 默認值爲「5000」
  • 消費者偏移量提交給Kafka的頻率(毫秒)

auto_offset_reset

  • 值類型爲string
  • 此設置沒有默認值
  • 若是Kafka中沒有初始偏移量,或者偏移量超出範圍,該怎麼辦:

    • earliest:自動重置偏移量到最先的偏移量
    • latest:自動重置偏移量到最新的偏移量
    • none:若是沒有爲消費者組找到先前的偏移量,則向消費者拋出異常
    • 其餘:向消費者拋出異常

bootstrap_servers

  • 值類型爲string
  • 默認值爲"localhost:9092"
  • 用於創建到集羣的初始鏈接的Kafka實例的url列表,這個列表應該是host1:port1,host2:port2的形式,這些url僅用於初始鏈接,以發現完整的集羣成員(可能會動態更改),所以這個列表不須要包含完整的服務器集(不過,若是一個服務器宕機,你可能須要多個服務器)。

check_crcs

  • 值類型爲string
  • 此設置沒有默認值
  • 自動檢查被消費的記錄的CRC32,這確保了在線路或磁盤上的消息沒有發生損壞,這個檢查增長了一些開銷,所以在尋求極端性能的狀況下可能會禁用它。

client_id

  • 值類型爲string
  • 默認值爲"logstash"
  • 請求時要傳遞給服務器的id字符串,這樣作的目的是經過容許包含邏輯應用程序名稱來跟蹤ip/端口之外的請求源。

connections_max_idle_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 在這個配置指定的毫秒數以後關閉空閒鏈接。

consumer_threads

  • 值類型爲number
  • 默認值爲1
  • 理想狀況下,爲了達到完美的平衡,你應該擁有與分區數量同樣多的線程,線程多於分區意味着有些線程將處於空閒狀態。

decorate_events

  • 值類型爲boolean
  • 默認值爲false
  • 可向事件添加Kafka元數據,好比主題、消息大小的選項,這將向logstash事件中添加一個名爲kafka的字段,其中包含如下屬性:topic:此消息關聯的主題、consumer_group:這個事件中用來讀取的消費者組、partition:此消息關聯的分區、offset:此消息關聯的分區的偏移量、key:包含消息key的ByteBuffer。

enable_auto_commit

  • 值類型爲string
  • 默認值爲"true"
  • 若是是true,消費者按期向Kafka提交已經返回的消息的偏移量,當進程失敗時,將使用這個提交的偏移量做爲消費開始的位置。

exclude_internal_topics

  • 值類型爲string
  • 此設置沒有默認值
  • 內部主題(如偏移量)的記錄是否應該公開給消費者,若是設置爲true,從內部主題接收記錄的惟一方法就是訂閱它。

fetch_max_bytes

  • 值類型爲string
  • 此設置沒有默認值
  • 提取請求時服務器應該返回的最大數據量,這不是絕對最大值,若是提取的第一個非空分區中的第一個消息大於此值,消息仍然會返回,以確保消費者可以進行下去。

fetch_max_wait_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 若是沒有足夠的數據當即知足fetch_min_bytes,服務器在響應提取請求以前將阻塞的最大時間,這應該小於或等於poll_timeout_ms中使用的超時。

fetch_min_bytes

  • 值類型爲string
  • 此設置沒有默認值
  • 提取請求時服務器應該返回的最小數據量,若是可用數據不足,請求將在響應請求前等待大量的數據積累。

group_id

  • 值類型爲string
  • 默認值爲"logstash"
  • 此消費者所屬的組的標識符,消費者組是由多個處理器組成的單個邏輯訂閱服務器,主題中的消息將分發給具備相同group_id的全部Logstash實例。

heartbeat_interval_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 從心跳到消費者協調器的預期時間,心跳被用來確保消費者會話保持活躍,並在新消費者加入或離開組時促進從新平衡,該值必須設置爲低於session.timeout.ms,但一般應該設置不高於該值的1/3,它能夠調整得更低,以控制正常從新平衡的預期時間。

jaas_path

  • 值類型爲path
  • 此設置沒有默認值
  • Java身份驗證和受權服務(JAAS)API爲Kafka提供用戶身份驗證和受權服務,這個設置提供了JAAS文件的路徑,Kafka客服端的樣例JAAS文件:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=true
      renewTicket=true
      serviceName="kafka";
      };

    請注意,在配置文件中指定jaas_pathkerberos_config將會添加到全局JVM系統屬性中,這意味着若是你有多個Kafka輸入,它們都共享相同的jaas_pathkerberos_config。若是不但願這樣作,則必須在不一樣的JVM實例上運行Logstash的獨立實例。

kerberos_config

key_deserializer_class

  • 值類型爲string
  • 默認值爲"org.apache.kafka.common.serialization.StringDeserializer"
  • 用於反序列化記錄key的Java類。

max_partition_fetch_bytes

  • 值類型爲string
  • 此設置沒有默認值
  • 服務器將返回每一個分區的最大數據量,請求使用的最大總內存爲#partitions * max.partition.fetch.bytes,這個大小必須至少是服務器容許的最大消息大小的最大值,不然可能生產者發送的消息比消費者可以提取的大,若是發生這種狀況,消費者可能會陷入在某個分區上提取大量消息的困境。

max_poll_interval_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 當使用消費者組管理時,poll()調用之間的最大延遲,這爲消費者在提取更多記錄以前能夠空閒的時間設置了上限,若是在超時過時以前沒有調用poll(),就認爲消費者失敗了而且組將從新平衡,以便將分區從新分配給另外一個成員,配置request_timeout_ms的值必須老是大於max_poll_interval_ms

max_poll_records

  • 值類型爲string
  • 此設置沒有默認值
  • 在對poll()的單個調用中返回的記錄的最大數量。

metadata_max_age_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 以毫秒爲單位的時間週期後,即便咱們沒有看到任何分區領導更改,也會強制刷新元數據,以主動發現任何新的broker或分區。

partition_assignment_strategy

  • 值類型爲string
  • 此設置沒有默認值
  • 客戶端將使用分區分配策略的類名在消費者實例之間分配分區全部權。

poll_timeout_ms

  • 值類型爲number
  • 默認值爲100
  • kafka消費者將等待從主題接收新消息的時間。

receive_buffer_bytes

  • 值類型爲string
  • 此設置沒有默認值
  • 讀取數據時使用的TCP接收緩衝區(SO_RCVBUF)的大小

reconnect_backoff_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 在嘗試從新鏈接到給定主機以前等待的時間量,這避免了在一個緊密循環中重複鏈接主機,此回退適用於消費者向broker發送的全部請求。

request_timeout_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 配置控制客戶端等待請求響應的最長時間,若是超時以前沒有收到響應,若有必要客戶端將從新發送請求,或者在重試耗盡時失敗請求。

retry_backoff_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 在試圖重試失敗的提取請求到給定主題分區以前等待的時間量,這避免了在一個緊湊的循環中重複的提取和失敗。

sasl_kerberos_service_name

  • 值類型爲string
  • 此設置沒有默認值
  • Kafka broker運行的Kerberos主體名稱,這能夠在Kafka的JAAS配置或Kafka的配置中定義。

sasl_mechanism

  • 值類型爲string
  • 默認值爲"GSSAPI"
  • 用於客戶端鏈接的SASL機制,這多是安全提供者可用的任何機制,GSSAPI是默認機制。

security_protocol

  • 值能夠是:PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL
  • 默認值爲"PLAINTEXT"
  • 要使用的安全協議,能夠是PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL

send_buffer_bytes

  • 值類型爲string
  • 此設置沒有默認值
  • 發送數據時要使用的TCP發送緩衝區(SO_SNDBUF)的大小

session_timeout_ms

  • 值類型爲string
  • 此設置沒有默認值
  • 超時以後,若是poll_timeout_ms沒有被調用,消費者將被標記爲死亡,併爲group_id標識的組觸發從新平衡操做。

ssl_key_password

  • 值類型爲password
  • 此設置沒有默認值
  • 密鑰存儲文件中私有密鑰的密碼。

ssl_keystore_location

  • 值類型爲path
  • 此設置沒有默認值
  • 若是須要客戶端身份驗證,則此設置存儲密鑰存儲路徑。

ssl_keystore_password

  • 值類型爲password
  • 此設置沒有默認值
  • 若是須要客戶端身份驗證,則此設置存儲密鑰庫密碼

ssl_keystore_type

  • 值類型爲string
  • 此設置沒有默認值
  • 密鑰存儲庫類型。

ssl_truststore_location

  • 值類型爲path
  • 此設置沒有默認值
  • JKS信任存儲庫路徑用於驗證Kafka broker的證書。

ssl_truststore_password

  • 值類型爲password
  • 此設置沒有默認值
  • 信任存儲庫的密碼。

ssl_truststore_type

  • 值類型爲string
  • 此設置沒有默認值
  • 信任存儲庫類型。

topics

  • 值類型爲array
  • 默認值爲["logstash"]
  • 要訂閱的主題列表,默認爲["logstash"]

topics_pattern

  • 值類型爲string
  • 此設置沒有默認值
  • 訂閱的主題正則表達式模式,使用此配置時,主題配置將被忽略。

value_deserializer_class

  • 值類型爲string
  • 默認值爲"org.apache.kafka.common.serialization.StringDeserializer"
  • 用於反序列化記錄值的Java類。

通用選項

全部輸入插件都支持如下配置選項:

設置 輸入類型 要求
add_field hash No
codec codec No
enable_metric boolean No
id string No
tags array No
type string No

細節

add_field

  • 值類型爲hash
  • 默認值爲{}
  • 向事件添加字段。

codec

  • 值類型爲codec
  • 默認值爲"plain"
  • 用於輸入數據的編解碼器,在輸入數據以前,輸入編解碼器是一種方便的解碼方法,不須要在你的Logstash管道中使用單獨的過濾器。

enable_metric

  • 值類型是boolean
  • 默認值是true
  • 禁用或啓用這個特定插件實例的指標日誌,默認狀況下,咱們記錄全部咱們能夠記錄的指標,可是你能夠禁用特定插件的指標集合。

id

  • 值類型爲string
  • 這個設置沒有默認值
  • 向插件配置添加惟一的ID,若是沒有指定ID,則Logstash將生成一個,強烈建議在配置中設置此ID,當你有兩個或多個相同類型的插件時,這一點特別有用。例如,若是你有兩個log4j輸入,在本例中添加一個命名ID將有助於在使用監視API時監視Logstash。

    input {
      kafka {
        id => "my_plugin_id"
      }
    }

tags

  • 值類型爲array
  • 這個設置沒有默認值
  • 向事件添加任意數量的標記,這有助於之後的處理。

type

  • 值類型爲string
  • 這個設置沒有默認值
  • 向該輸入處理的全部事件添加type字段,類型主要用於過濾器激活,該type做爲事件自己的一部分存儲,所以你也可使用該類型在Kibana中搜索它。若是你試圖在已經擁有一個type的事件上設置一個type(例如,當你將事件從發送者發送到索引器時),那麼新的輸入將不會覆蓋現有的type,發送方的type集在其生命週期中始終與該事件保持一致,甚至在發送到另外一個Logstash服務器時也是如此。
相關文章
相關標籤/搜索