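This documentation is for Cloudera Distribution of Apache Kafka 1.3.x. Documentation for other versions is available at Cloudera Documentation.
Use the Kafka source to stream data from Kafka topics into Hadoop. The Kafka source can be combined with any Flume sink, which makes it easy to write data from Kafka to HDFS, HBase, and Solr.
The following Flume configuration example uses a Kafka source to send data to an HDFS sink: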

    # Agent components
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # Kafka source reading the "weblogs" topic
    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
    tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
    tier1.sources.source1.topic = weblogs
    tier1.sources.source1.groupId = flume
    tier1.sources.source1.channels = channel1
    # The timestamp interceptor adds the header used by the %y-%m-%d escapes in hdfs.path
    tier1.sources.source1.interceptors = i1
    tier1.sources.source1.interceptors.i1.type = timestamp
    tier1.sources.source1.kafka.consumer.timeout.ms = 100

    # In-memory channel
    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000

    # HDFS sink: roll files every 5 seconds; size- and count-based rolling are disabled (0)
    tier1.sinks.sink1.type = hdfs
    tier1.sinks.sink1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
    tier1.sinks.sink1.hdfs.rollInterval = 5
    tier1.sinks.sink1.hdfs.rollSize = 0
    tier1.sinks.sink1.hdfs.rollCount = 0
    tier1.sinks.sink1.hdfs.fileType = DataStream
    tier1.sinks.sink1.channel = channel1

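For higher throughput, you can configure multiple Kafka sources to read from a single topic. If all sources are configured with the same groupId and the topic has multiple partitions, each source reads from a different set of partitions, which improves the ingest rate.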
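As a minimal sketch of the multi-source setup described above, the fragment below declares two Kafka sources that share one consumer group. The name source2 is hypothetical and chosen only for illustration; the hosts, topic, and channel are reused from the earlier example, and the channel and sink definitions are omitted.

```properties
# Two Kafka sources in the same consumer group reading one multi-partition topic.
# "source2" is a hypothetical name; channel and sink settings are omitted here.
tier1.sources = source1 source2
tier1.channels = channel1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk01.example.com:2181
tier1.sources.source1.topic = weblogs
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1

# Same topic and same groupId, so Kafka splits the partitions between the two sources
tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source2.zookeeperConnect = zk01.example.com:2181
tier1.sources.source2.topic = weblogs
tier1.sources.source2.groupId = flume
tier1.sources.source2.channels = channel1
```

Because each partition is consumed by only one member of a consumer group, adding more sources than the topic has partitions should not be expected to increase throughput further.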
Property Name | Default Value | Description |
---|---|---|
**type** | | Must be set to org.apache.flume.source.kafka.KafkaSource. |
**zookeeperConnect** | | The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181). |
**topic** | | The Kafka topic from which the source reads messages. Flume supports only one topic per source. |
groupId | flume | The unique identifier of the Kafka consumer group. Set the same groupId in all sources to indicate that they belong to the same consumer group. |
batchSize | 1000 | The maximum number of messages written to the channel in a single batch. |
batchDurationMillis | 1000 | The maximum time (in milliseconds) before a batch is written to the channel. |
Other properties supported by the Kafka consumer | | Used to configure the Kafka consumer used by the Kafka source. You can use any consumer properties supported by Kafka. Prepend the consumer property name with the prefix kafka. (for example, kafka.fetch.min.bytes). See the Kafka documentation for the full list of Kafka consumer properties. |
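To illustrate the kafka. prefix rule from the last table row, here is a small hedged fragment that passes two consumer properties through a Kafka source. The value 1024 is an arbitrary placeholder, not a recommended setting; the consumer timeout line repeats the one from the example configuration.

```properties
# Pass Kafka consumer properties through the Kafka source by prefixing them with "kafka."
# 1024 is an arbitrary illustrative value, not a tuning recommendation.
tier1.sources.source1.kafka.fetch.min.bytes = 1024
tier1.sources.source1.kafka.consumer.timeout.ms = 100
```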
Tuning
Use the Kafka sink to send data to Kafka from a Flume source. You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.
The following Flume configuration example uses a Kafka sink with an exec source:

    # Agent components
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # Exec source: emits one event per line of vmstat output
    tier1.sources.source1.type = exec
    tier1.sources.source1.command = /usr/bin/vmstat 1
    tier1.sources.source1.channels = channel1

    # In-memory channel
    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000

    # Kafka sink publishing to the topic named "sink1"
    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.topic = sink1
    tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.batchSize = 20

The following table describes parameters the Kafka sink supports; required properties are listed in bold.
Property Name | Default Value | Description |
---|---|---|
**type** | | Must be set to org.apache.flume.sink.kafka.KafkaSink. |
**brokerList** | | The brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability. |
topic | default-flume-topic | The Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to the designated topic, overriding the configured topic. |
batchSize | 100 | The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput and increase latency. |
requiredAcks | 1 | The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only), and -1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to -1. |
Other properties supported by the Kafka producer | | Used to configure the Kafka producer used by the Kafka sink. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties. |
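The sink parameters in the table can be combined as in the following sketch, which favors durability over latency. It reuses the sink from the example configuration; the codec value snappy is an assumption for illustration, so check the Kafka producer documentation for the codecs your Kafka version supports.

```properties
# Kafka sink tuned for durability (a sketch, not a recommended production profile)
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = sink1
tier1.sinks.sink1.brokerList = kafka01.example.com:9092,kafka02.example.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20
# Wait for all replicas to acknowledge, to avoid data loss if the leader fails
tier1.sinks.sink1.requiredAcks = -1
# Producer property passed through with the "kafka." prefix; "snappy" is an assumed value
tier1.sinks.sink1.kafka.compression.codec = snappy
```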
The Kafka sink uses the topic and key properties from the FlumeEvent headers to determine where to send events in Kafka. If the header contains the topic property, that event is sent to the designated topic, overriding the configured topic. If the header contains the key property, that key is used to partition events within the topic. Events with the same key are sent to the same partition. If the key parameter is not specified, events are distributed randomly to partitions. Use these properties to control the topics and partitions to which events are sent through the Flume source or interceptor.
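One way to set these headers is with Flume's built-in interceptors. The sketch below is an illustration under assumptions: the interceptor names i1 and i2 and the topic name vmstat-metrics are hypothetical. A static interceptor writes a fixed topic header, and a host interceptor writes the agent's address into a header named key.

```properties
# Route events by setting the "topic" and "key" headers with interceptors.
# "i1", "i2", and "vmstat-metrics" are hypothetical names used for illustration.
tier1.sources.source1.interceptors = i1 i2

# Static interceptor: adds the header topic=vmstat-metrics to events that lack a topic header
tier1.sources.source1.interceptors.i1.type = static
tier1.sources.source1.interceptors.i1.key = topic
tier1.sources.source1.interceptors.i1.value = vmstat-metrics

# Host interceptor: stores the agent's IP address (the default) in the header named "key"
tier1.sources.source1.interceptors.i2.type = host
tier1.sources.source1.interceptors.i2.hostHeader = key
```

With this setup, all events from one agent carry the same key and therefore land in the same partition, which keeps each host's events together at the cost of a less even spread across partitions.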
The following Flume configuration example uses a Kafka channel with an exec source and an HDFS sink:

    # Agent components
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # Exec source: emits one event per line of vmstat output
    tier1.sources.source1.type = exec
    tier1.sources.source1.command = /usr/bin/vmstat 1
    tier1.sources.source1.channels = channel1

    # Kafka channel backed by the topic named "channel2"
    tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000
    tier1.channels.channel1.brokerList = kafka02.example.com:9092,kafka03.example.com:9092
    tier1.channels.channel1.topic = channel2
    tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
    tier1.channels.channel1.parseAsFlumeEvent = true

    # HDFS sink: roll files every 5 seconds; size- and count-based rolling are disabled (0)
    tier1.sinks.sink1.type = hdfs
    tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
    tier1.sinks.sink1.hdfs.rollInterval = 5
    tier1.sinks.sink1.hdfs.rollSize = 0
    tier1.sinks.sink1.hdfs.rollCount = 0
    tier1.sinks.sink1.hdfs.fileType = DataStream
    tier1.sinks.sink1.channel = channel1

The following table describes the parameters the Kafka channel supports; required properties are listed in bold.
Property Name | Default Value | Description |
---|---|---|
**type** | | Must be set to org.apache.flume.channel.kafka.KafkaChannel. |
**brokerList** | | The brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability. |
**zookeeperConnect** | | The URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181). |
topic | flume-channel | The Kafka topic the channel will use. |
groupId | flume | The unique identifier of the Kafka consumer group the channel uses to register with Kafka. |
parseAsFlumeEvent | true | Set to true if a Flume source is writing to the channel and expects Avro datums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using. |
readSmallestOffset | false | If true, reads all data in the topic. If false, reads only data written after the channel has started. Only used when parseAsFlumeEvent is false. |
kafka.consumer.timeout.ms | 100 | The polling interval (in milliseconds) when writing to the sink. |
Other properties supported by the Kafka producer | | Used to configure the Kafka producer. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec). See the Kafka documentation for the full list of Kafka producer properties. |
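The parseAsFlumeEvent and readSmallestOffset rows suggest a second way to use the channel: reading a topic that non-Flume producers write to. The following is a sketch under assumptions, not a configuration taken from this document: it assumes an agent with a Kafka channel and an HDFS sink but no source, and reuses the weblogs topic name and example hosts from the configurations above.

```properties
# Kafka channel reading a topic written by non-Flume producers (a sketch; no Flume source)
tier1.channels = channel1
tier1.sinks = sink1

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = kafka02.example.com:9092,kafka03.example.com:9092
tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181
tier1.channels.channel1.topic = weblogs
# Messages are plain payloads, not Avro-encoded FlumeEvents
tier1.channels.channel1.parseAsFlumeEvent = false
# Read all data already in the topic, not only data written after the channel starts
tier1.channels.channel1.readSmallestOffset = true

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1
```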