Reference: the official quickstart at http://kafka.apache.org/quickstart
Official download page: http://kafka.apache.org/downloads
As of July 8, 2019, the latest release is 2.3.0. In the artifact name, 2.12 is the Scala version it was compiled against and 2.3.0 is the Kafka version:
Scala 2.12 - kafka_2.12-2.3.0.tgz (asc, sha512)
Extract the archive:
> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0
Start ZooKeeper first. Kafka ships with a bundled ZooKeeper instance, but you can also use your own.
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
Create a topic with a replication factor of 1 and 1 partition:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
List the topics:
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
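Topics can also be created from code. A minimal sketch with the Java AdminClient, assuming the broker address and topic name from the commands above (the class name is mine):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "test", 1 partition, replication factor 1 -- same as the CLI call above
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // equivalent of --list, to verify the topic exists
            System.out.println(admin.listTopics().names().get());
        }
    }
}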
Alternatively, you can skip creating the topic and let the broker create it automatically on first publish.
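Auto-creation is governed by the broker setting auto.create.topics.enable in config/server.properties, which defaults to true:

    auto.create.topics.enable=true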
Test with the command line client; each line you type is sent as a separate message.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
The command line consumer can receive the messages:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
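The same messages can be sent from Java with the standard producer client. A minimal sketch, assuming the topic and broker address from the commands above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // each send() corresponds to one line typed into the console producer
            producer.send(new ProducerRecord<>("test", "This is a message"));
            producer.send(new ProducerRecord<>("test", "This is another message"));
        } // close() flushes any buffered records
    }
}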
A single broker is not very interesting, so let's set up three brokers. First, copy the configuration file for each new broker:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Then edit them:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

broker.id is the unique name of each node in the cluster. Since we are running everything on the same machine, we also have to override listeners and log.dirs to avoid port and data-directory conflicts. Then start the two new brokers:
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
Create a topic with one partition and a replication factor of 3:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Use describe to see what is going on:
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
A few concepts:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
For the topic we created earlier:
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
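The same leader/replicas/ISR details are available programmatically. A minimal sketch with the Java AdminClient, assuming the topic and broker address from above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singleton("my-replicated-topic"))
                                         .all().get().get("my-replicated-topic");
            // each TopicPartitionInfo carries the leader, the replica list, and the in-sync replicas
            desc.partitions().forEach(p ->
                System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                        p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}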
Produce and consume:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's test fault tolerance:
> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
Describe the topic again: the leader has changed, because broker 1 was killed.
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
The messages are still there:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
So far all the data has come from the console. What about other sources and other systems? Use Kafka Connect.
Create some data:
> echo -e "foo\nbar" > test.txt
Start Connect in standalone mode, passing the worker configuration and the two connector configurations (shown below):
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
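For reference, the two connector files shipped with the distribution look roughly like this (contents paraphrased from the stock quickstart configs): a FileStreamSource that reads test.txt into the topic connect-test, and a FileStreamSink that writes that topic out to test.sink.txt.

config/connect-file-source.properties:
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test

config/connect-file-sink.properties:
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test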
Verify:
> more test.sink.txt
foo
bar
On the consumer side, the data lands in the connect-test topic:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
You can keep appending to the file and the data flows through the pipeline:
> echo Another line >> test.txt
Next: Kafka Streams and the WordCountDemo. Reference: http://kafka.apache.org/22/documentation/streams/quickstart
Code snippet:
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
Create the input topic and the output topic. The input topic:
> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
The output topic, with log compaction enabled:
> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
Start the WordCount demo application:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Start a producer to write some data:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
Start a consumer to read the word counts:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all	1
streams	1
lead	1
to	1
kafka	1
hello	1
kafka	2
streams	2
kafka	1
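The same output can be read from Java with the standard consumer client. A minimal sketch, assuming the topic above (the group id is arbitrary); note the LongDeserializer for values, matching the console command:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadWordCounts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "wordcount-reader");  // arbitrary group id for this sketch
        props.put("auto.offset.reset", "earliest"); // same effect as --from-beginning
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", LongDeserializer.class.getName());
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("streams-wordcount-output"));
            while (true) {
                // each record is a (word, running count) pair from the changelog stream
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, Long> r : records)
                    System.out.printf("%s\t%d%n", r.key(), r.value());
            }
        }
    }
}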