①. Kafka relies on ZooKeeper for cluster management, so set up a ZooKeeper cluster before building the Kafka cluster:
http://www.javashuo.com/article/p-mntypphp-hv.html
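Kafka is later pointed at the ZooKeeper cluster through a comma-separated host:port string (the zookeeper.connect setting in step ③, which may end with an optional chroot path). As a minimal sketch, the Java below splits such a string into its nodes and chroot, for example to list the nodes you want to verify before starting any broker. The class ZkConnectString and its methods are hypothetical helpers written for this illustration; they are not part of Kafka or ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

public class ZkConnectString {

    /** Returns the host:port entries, ignoring an optional trailing chroot such as "/kafka". */
    public static List<String> hosts(String connect) {
        String s = connect.trim();
        int slash = s.indexOf('/');
        if (slash >= 0) {
            s = s.substring(0, slash); // drop the chroot part
        }
        List<String> result = new ArrayList<>();
        for (String part : s.split(",")) {
            String p = part.trim();
            if (!p.isEmpty()) {
                result.add(p);
            }
        }
        return result;
    }

    /** Returns the chroot path (for example "/kafka"), or an empty string if none is given. */
    public static String chroot(String connect) {
        int slash = connect.indexOf('/');
        return slash >= 0 ? connect.substring(slash).trim() : "";
    }

    public static void main(String[] args) {
        // Same node names as in the zookeeper.connect line of this post.
        String connect = "zk1:2181,zk2:2181,zk3:2181";
        for (String node : hosts(connect)) {
            System.out.println("ZooKeeper node: " + node);
        }
        System.out.println("chroot: '" + chroot(connect) + "'");
    }
}
```

Each node printed here should be up and part of the ensemble before the Kafka brokers are started.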
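②. Download Kafka (the binary release) from the Apache Kafka website.
Make sure you download the right package; otherwise the broker fails at startup with an error saying the main class kafka.Kafka cannot be found.
I used the kafka_2.10 build here. (2.10 is the Scala version in the package name; the startup log in step ④ reports protocol version 0.10.1-IV2.)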
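Kafka binary archives are named kafka_<Scala version>-<Kafka version>.tgz, while source archives are named kafka-<Kafka version>-src.tgz. Starting a broker from an unbuilt source archive is a common cause of the "main class not found" error. The sketch below tells the two apart by file name. The class KafkaArchiveName is hypothetical, and the file names in main are illustrative assumptions rather than the exact files used in this post.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KafkaArchiveName {

    // Binary release, e.g. kafka_2.10-0.10.1.0.tgz (Scala version, then Kafka version).
    private static final Pattern BINARY =
            Pattern.compile("kafka_(\\d+\\.\\d+)-(\\d+(?:\\.\\d+)+)\\.tgz");

    // Source release, e.g. kafka-0.10.1.0-src.tgz.
    private static final Pattern SOURCE =
            Pattern.compile("kafka-(\\d+(?:\\.\\d+)+)-src\\.tgz");

    /** Describes what kind of archive the file name denotes. */
    public static String describe(String fileName) {
        Matcher b = BINARY.matcher(fileName);
        if (b.matches()) {
            return "binary: Scala " + b.group(1) + ", Kafka " + b.group(2);
        }
        Matcher s = SOURCE.matcher(fileName);
        if (s.matches()) {
            return "source: Kafka " + s.group(1) + " (must be built before it can start)";
        }
        return "unrecognized";
    }

    public static void main(String[] args) {
        // Illustrative file names (assumptions).
        System.out.println(describe("kafka_2.10-0.10.1.0.tgz"));
        System.out.println(describe("kafka-0.10.1.0-src.tgz"));
    }
}
```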
③. Configure the config/server.properties file:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# Unique identifier of each broker in the cluster, similar to ZooKeeper's myid.
# Even if the broker's IP address changes, consumers are not affected as long
# as broker.id stays the same.
broker.id=0

# Whether topics may be deleted. If false, deleting a topic with the admin
# tool has no effect.
delete.topic.enable=true

# Whether topics are created automatically. If false, topics must be created
# by command. The default is true; setting it to false and creating topics
# manually before use is recommended.
# When enabled (true), two kinds of requests trigger automatic topic creation:
# 1. a producer writes a message to a topic that does not exist
# 2. a consumer reads from a topic that does not exist
auto.create.topics.enable=true

############################# Socket Server Settings #############################
# Server-side network settings

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT (protocol, host name and port used by the Kafka server):
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# The default is PLAINTEXT on port 9092:
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
# Usually there is no need to change this.
num.network.threads=3

# The number of threads doing disk I/O
# This should be greater than the number of disks.
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# Socket tuning parameter SO_SNDBUF; -1 means use the operating system default.
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# Socket tuning parameter SO_RCVBUF; -1 means use the operating system default.
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
# message.max.bytes must be smaller than socket.request.max.bytes;
# message.max.bytes can be overridden per topic (max.message.bytes) when the topic is created.
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# The default is 1. This value is used only when no partition count is given at
# topic creation; a count given at creation overrides it.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# Each data directory gets num.recovery.threads.per.data.dir threads.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
# Settings for flushing log files to disk

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
# Disk I/O is slow but necessary for durability, so this setting trades
# durability against performance.
# If the value is too large, each fsync takes longer (blocking I/O);
# if it is too small, fsync runs more often, which adds latency to client
# requests overall. If the physical server fails, messages that have not been
# fsynced are lost.
# In short: flush in-memory data to disk every N messages.
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# Controlling the flush by message count alone is not enough. This setting
# controls the fsync time interval: if the message count never reaches its
# threshold but the time since the last flush does, a flush is triggered.
# In short: flush in-memory data to disk every N milliseconds.
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# Settings for log retention

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# Note: there are two policies below, one based on time and one based on log
# size. If both are configured, meeting either one triggers deletion, and
# deletion always removes the oldest log segments.

# The minimum age of a log file to be eligible for deletion
# Logs older than 168 hours may be deleted.
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# Once a partition's log grows beyond log.retention.bytes, the oldest segments are deleted.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# Every 300000 ms the broker checks whether any segments qualify for deletion
# under the retention policies above.
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# ZooKeeper settings

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Addresses of the ZooKeeper cluster that Kafka depends on, separated by commas.
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

④. View the startup log:
[2017-09-16 19:22:12,567] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 3
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    default.replication.factor = 1
    delete.topic.enable = false
    fetch.purgatory.purge.interval.requests = 1000
    group.max.session.timeout.ms = 300000
    group.min.session.timeout.ms = 6000
    host.name =
    inter.broker.protocol.version = 0.10.1-IV2
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listeners = PLAINTEXT://k1:9092
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /usr/local/kafka_2.10/kafka-logs
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.format.version = 0.10.1-IV2
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides =
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 3
    num.recovery.threads.per.data.dir = 1
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 1440
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 3
    offsets.topic.segment.bytes = 104857600
    port = 9092
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    producer.purgatory.purge.interval.requests = 1000
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism.inter.broker.protocol = GSSAPI
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    unclean.leader.election.enable = true
    zookeeper.connect = zk1:2181,zk2:2181,zk3:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2017-09-16 19:22:12,910] INFO starting (kafka.server.KafkaServer)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,183] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2017-09-16 19:22:13,241] INFO Connecting to zookeeper on zk1:2181,zk2:2181,zk3:2181 (kafka.server.KafkaServer)
[2017-09-16 19:22:15,475] INFO Cluster ID = YkyEXTiPR62G5jdo1v6rKQ (kafka.server.KafkaServer)
[2017-09-16 19:22:15,570] INFO Log directory '/usr/local/kafka_2.10/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2017-09-16 19:22:15,708] INFO Loading logs. (kafka.log.LogManager)
[2017-09-16 19:22:15,723] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,676] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,844] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2017-09-16 19:22:21,850] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:22,028] INFO Awaiting socket connections on k3:9092. (kafka.network.Acceptor)
[2017-09-16 19:22:22,032] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer)
[2017-09-16 19:22:22,081] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,092] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,174] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,181] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,186] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-09-16 19:22:22,218] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,220] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2017-09-16 19:22:22,233] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-09-16 19:22:22,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2017-09-16 19:22:22,992] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,087] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-09-16 19:22:23,090] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(192.168.1.137,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2017-09-16 19:22:23,092] WARN No meta.properties file under dir /usr/local/kafka_2.10/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-09-16 19:22:23,498] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
[2017-09-16 19:32:22,220] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

⑤. Create a topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-1 --partitions 3 --replication-factor 3 --config max.message.bytes=64000 --config flush.messages=1

⑥. View the topic information:
The output shows the topic, its partitions, their replicas, and related information.
[root@localhost kafka_2.10]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-1
Topic:test-1 PartitionCount:3 ReplicationFactor:3 Configs:max.message.bytes=64000,flush.messages=1
    Topic: test-1 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
    Topic: test-1 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
    Topic: test-1 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

⑦. Java client code:
Producer:
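// Fragment from inside a method. KafkaCfg, KafkaProducerInterceptor and
// KafkaHandle are the author's own classes and are not shown in this post.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

List<ProducerInterceptor<Integer, String>> interceptors = new ArrayList<ProducerInterceptor<Integer, String>>();
interceptors.add(new KafkaProducerInterceptor());
// Note: this list is never handed to the producer, so the interceptor is not
// registered (the log below shows interceptor.classes = null). Interceptors are
// registered through the "interceptor.classes" producer property.

Properties props = new Properties();
props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
props.put("key.serializer", IntegerSerializer.class);
props.put("value.serializer", StringSerializer.class);
props.put("compression.type", "gzip");

// The producer is never closed in this demo, hence the suppressed warning.
@SuppressWarnings("resource")
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);

String content = "";
for (int i = 0; i < 100; i++) {
    content = "hello:" + (i + 1);
    // The key is the loop index, the value is "hello:<i + 1>".
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test-1", i, content);
    // Asynchronous send; KafkaHandle is passed as the completion callback.
    producer.send(record, new KafkaHandle());
    System.out.println("async message:" + content);
}

Console output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.1/log4j-slf4j-impl-2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/repository/org/slf4j/slf4j-log4j12/1.7.1/slf4j-log4j12-1.7.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.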
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
722 [main] INFO o.a.k.c.p.ProducerConfig - ProducerConfig values:
    interceptor.classes = null
    request.timeout.ms = 30000
    ssl.truststore.password = null
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 32768
    ssl.key.password = null
    ssl.cipher.suites = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.service.name = null
    ssl.provider = null
    max.in.flight.requests.per.connection = 5
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
    client.id =
    max.request.size = 1048576
    acks = 1
    linger.ms = 0
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    metadata.fetch.timeout.ms = 60000
    ssl.endpoint.identification.algorithm = null
    ssl.keystore.location = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    block.on.buffer.full = false
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    metrics.sample.window.ms = 30000
    security.protocol = PLAINTEXT
    metadata.max.age.ms = 300000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    timeout.ms = 30000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    metric.reporters = []
    ssl.truststore.type = JKS
    compression.type = gzip
    retries = 0
    max.block.ms = 60000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
794 [main] INFO o.a.k.c.p.ProducerConfig - ProducerConfig values:
    interceptor.classes = null
    request.timeout.ms = 30000
    ssl.truststore.password = null
    retry.backoff.ms = 100
    buffer.memory = 33554432
    batch.size = 16384
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 32768
    ssl.key.password = null
    ssl.cipher.suites = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.service.name = null
    ssl.provider = null
    max.in.flight.requests.per.connection = 5
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    bootstrap.servers = [192.168.1.135:9092, 192.168.1.136:9092, 192.168.1.137:9092]
    client.id = producer-1
    max.request.size = 1048576
    acks = 1
    linger.ms = 0
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    metadata.fetch.timeout.ms = 60000
    ssl.endpoint.identification.algorithm = null
    ssl.keystore.location = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    block.on.buffer.full = false
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    metrics.sample.window.ms = 30000
    security.protocol = PLAINTEXT
    metadata.max.age.ms = 300000
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    timeout.ms = 30000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    metric.reporters = []
    ssl.truststore.type = JKS
    compression.type = gzip
    retries = 0
    max.block.ms = 60000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS
798 [main] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.0.1
798 [main] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
async message:hello:1
async message:hello:2
async message:hello:3
async message:hello:4
async message:hello:5
async message:hello:6
async message:hello:7
async message:hello:8
async message:hello:9
async message:hello:10
async message:hello:11
async message:hello:12
async message:hello:13
async message:hello:14
async message:hello:15
async message:hello:16
async message:hello:17
async message:hello:18
async message:hello:19
async message:hello:20
async message:hello:21
async message:hello:22
async message:hello:23
async message:hello:24
async message:hello:25
async message:hello:26
async message:hello:27
async message:hello:28
async message:hello:29
async message:hello:30
async message:hello:31
async message:hello:32
async message:hello:33
async message:hello:34
async message:hello:35
async message:hello:36
async message:hello:37
async message:hello:38
async message:hello:39
async message:hello:40
async message:hello:41
async message:hello:42
async message:hello:43
async message:hello:44
async message:hello:45
async message:hello:46
async message:hello:47
async message:hello:48
async message:hello:49
async message:hello:50
async message:hello:51
async message:hello:52
async message:hello:53
async message:hello:54
async message:hello:55
async message:hello:56
async message:hello:57
async message:hello:58
async message:hello:59
async message:hello:60
async message:hello:61
async message:hello:62
async message:hello:63
async message:hello:64
async message:hello:65
async message:hello:66
async message:hello:67
async message:hello:68
async message:hello:69
async message:hello:70
async message:hello:71
async message:hello:72
async message:hello:73
async message:hello:74
async message:hello:75
async message:hello:76
async message:hello:77
async message:hello:78
async message:hello:79
async message:hello:80
async message:hello:81
async message:hello:82
async message:hello:83
async message:hello:84
async message:hello:85
async message:hello:86
async message:hello:87
async message:hello:88
async message:hello:89
async message:hello:90
async message:hello:91
async message:hello:92
async message:hello:93
async message:hello:94
async message:hello:95
async message:hello:96
async message:hello:97
async message:hello:98
async message:hello:99
async message:hello:100

Consumer:
// Fragment from inside a method. KafkaCfg is the author's own class (not shown).
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.gson.Gson;

ExecutorService fixedPool = Executors.newFixedThreadPool(3);
fixedPool.execute(new Runnable() {
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaCfg.BROCKER_LIST);
        props.put("group.id", KafkaCfg.GROUP_ID);
        // The next two are settings of the old ZooKeeper-based consumer;
        // KafkaConsumer does not use them.
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");
        // Possible values: latest, earliest, none
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
        // Several topics can be consumed at once; pass them as a list.
        consumer.subscribe(Arrays.asList(KafkaCfg.TOPIC, KafkaCfg.TOPIC2));
        while (true) {
            // Wait up to 100 ms for new records, then print each one as JSON.
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("record:" + new Gson().toJson(record));
            }
        }
    }
});

Console output:
record:{"topic":"test-1","partition":0,"offset":17,"timestamp":1505629339505,"timestampType":"CREATE_TIME","checksum":3084842117,"serializedKeySize":4,"serializedValueSize":7,"key":1,"value":"hello:2"}
record:{"topic":"test-1","partition":0,"offset":18,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2036504617,"serializedKeySize":4,"serializedValueSize":7,"key":7,"value":"hello:8"}
record:{"topic":"test-1","partition":0,"offset":19,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2096183246,"serializedKeySize":4,"serializedValueSize":7,"key":8,"value":"hello:9"}
record:{"topic":"test-1","partition":0,"offset":20,"timestamp":1505629339524,"timestampType":"CREATE_TIME","checksum":1567468433,"serializedKeySize":4,"serializedValueSize":8,"key":14,"value":"hello:15"}
record:{"topic":"test-1","partition":0,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":2250809392,"serializedKeySize":4,"serializedValueSize":8,"key":15,"value":"hello:16"}
record:{"topic":"test-1","partition":0,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":795944797,"serializedKeySize":4,"serializedValueSize":8,"key":17,"value":"hello:18"}
record:{"topic":"test-1","partition":0,"offset":23,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":1596880373,"serializedKeySize":4,"serializedValueSize":8,"key":21,"value":"hello:22"}
record:{"topic":"test-1","partition":0,"offset":24,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":2549012433,"serializedKeySize":4,"serializedValueSize":8,"key":26,"value":"hello:27"}
record:{"topic":"test-1","partition":0,"offset":25,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3946489373,"serializedKeySize":4,"serializedValueSize":8,"key":30,"value":"hello:31"}
record:{"topic":"test-1","partition":0,"offset":26,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":4171966126,"serializedKeySize":4,"serializedValueSize":8,"key":32,"value":"hello:33"}
record:{"topic":"test-1","partition":0,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":3143199368,"serializedKeySize":4,"serializedValueSize":8,"key":33,"value":"hello:34"}
record:{"topic":"test-1","partition":0,"offset":28,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":889962223,"serializedKeySize":4,"serializedValueSize":8,"key":35,"value":"hello:36"}
record:{"topic":"test-1","partition":0,"offset":29,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":658609139,"serializedKeySize":4,"serializedValueSize":8,"key":38,"value":"hello:39"}
record:{"topic":"test-1","partition":0,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":1769068338,"serializedKeySize":4,"serializedValueSize":8,"key":42,"value":"hello:43"}
record:{"topic":"test-1","partition":0,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":3207409220,"serializedKeySize":4,"serializedValueSize":8,"key":44,"value":"hello:45"}
record:{"topic":"test-1","partition":2,"offset":17,"timestamp":1505629339518,"timestampType":"CREATE_TIME","checksum":3419956930,"serializedKeySize":4,"serializedValueSize":7,"key":2,"value":"hello:3"}
record:{"topic":"test-1","partition":2,"offset":18,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2857189508,"serializedKeySize":4,"serializedValueSize":7,"key":5,"value":"hello:6"}
record:{"topic":"test-1","partition":2,"offset":19,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2866062050,"serializedKeySize":4,"serializedValueSize":7,"key":6,"value":"hello:7"}
record:{"topic":"test-1","partition":2,"offset":20,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":577748521,"serializedKeySize":4,"serializedValueSize":8,"key":12,"value":"hello:13"}
record:{"topic":"test-1","partition":2,"offset":21,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1649992521,"serializedKeySize":4,"serializedValueSize":8,"key":16,"value":"hello:17"}
record:{"topic":"test-1","partition":0,"offset":32,"timestamp":1505629339533,"timestampType":"CREATE_TIME","checksum":2322283505,"serializedKeySize":4,"serializedValueSize":8,"key":47,"value":"hello:48"}
record:{"topic":"test-1","partition":0,"offset":33,"timestamp":1505629339535,"timestampType":"CREATE_TIME","checksum":2329901557,"serializedKeySize":4,"serializedValueSize":8,"key":48,"value":"hello:49"}
record:{"topic":"test-1","partition":2,"offset":22,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":3854334725,"serializedKeySize":4,"serializedValueSize":8,"key":18,"value":"hello:19"}
record:{"topic":"test-1","partition":2,"offset":23,"timestamp":1505629339525,"timestampType":"CREATE_TIME","checksum":1792756199,"serializedKeySize":4,"serializedValueSize":8,"key":19,"value":"hello:20"}
record:{"topic":"test-1","partition":2,"offset":24,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2514692525,"serializedKeySize":4,"serializedValueSize":8,"key":22,"value":"hello:23"}
record:{"topic":"test-1","partition":2,"offset":25,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1562610569,"serializedKeySize":4,"serializedValueSize":8,"key":25,"value":"hello:26"}
record:{"topic":"test-1","partition":2,"offset":26,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3501401355,"serializedKeySize":4,"serializedValueSize":8,"key":28,"value":"hello:29"}
record:{"topic":"test-1","partition":2,"offset":27,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":2946838050,"serializedKeySize":4,"serializedValueSize":8,"key":31,"value":"hello:32"}
record:{"topic":"test-1","partition":2,"offset":28,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":2695171007,"serializedKeySize":4,"serializedValueSize":8,"key":36,"value":"hello:37"}
record:{"topic":"test-1","partition":2,"offset":29,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":3877831509,"serializedKeySize":4,"serializedValueSize":8,"key":40,"value":"hello:41"}
record:{"topic":"test-1","partition":2,"offset":30,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":2747042666,"serializedKeySize":4,"serializedValueSize":8,"key":41,"value":"hello:42"}
record:{"topic":"test-1","partition":2,"offset":31,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":4222789243,"serializedKeySize":4,"serializedValueSize":8,"key":45,"value":"hello:46"}
record:{"topic":"test-1","partition":2,"offset":32,"timestamp":1505629339531,"timestampType":"CREATE_TIME","checksum":830470691,"serializedKeySize":4,"serializedValueSize":8,"key":46,"value":"hello:47"}
record:{"topic":"test-1","partition":1,"offset":19,"timestamp":1505629339461,"timestampType":"CREATE_TIME","checksum":27654439,"serializedKeySize":4,"serializedValueSize":7,"key":0,"value":"hello:1"}
record:{"topic":"test-1","partition":1,"offset":20,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2877195336,"serializedKeySize":4,"serializedValueSize":7,"key":3,"value":"hello:4"}
record:{"topic":"test-1","partition":1,"offset":21,"timestamp":1505629339519,"timestampType":"CREATE_TIME","checksum":2833341777,"serializedKeySize":4,"serializedValueSize":7,"key":4,"value":"hello:5"}
record:{"topic":"test-1","partition":1,"offset":22,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":1116560893,"serializedKeySize":4,"serializedValueSize":8,"key":9,"value":"hello:10"}
record:{"topic":"test-1","partition":1,"offset":23,"timestamp":1505629339520,"timestampType":"CREATE_TIME","checksum":2285896101,"serializedKeySize":4,"serializedValueSize":8,"key":10,"value":"hello:11"}
record:{"topic":"test-1","partition":1,"offset":24,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":672893159,"serializedKeySize":4,"serializedValueSize":8,"key":11,"value":"hello:12"}
record:{"topic":"test-1","partition":1,"offset":25,"timestamp":1505629339522,"timestampType":"CREATE_TIME","checksum":1637741071,"serializedKeySize":4,"serializedValueSize":8,"key":13,"value":"hello:14"}
record:{"topic":"test-1","partition":2,"offset":33,"timestamp":1505629339543,"timestampType":"CREATE_TIME","checksum":3620398696,"serializedKeySize":4,"serializedValueSize":8,"key":51,"value":"hello:52"}
record:{"topic":"test-1","partition":2,"offset":34,"timestamp":1505629339545,"timestampType":"CREATE_TIME","checksum":242342934,"serializedKeySize":4,"serializedValueSize":8,"key":52,"value":"hello:53"}
record:{"topic":"test-1","partition":2,"offset":35,"timestamp":1505629339547,"timestampType":"CREATE_TIME","checksum":2840039757,"serializedKeySize":4,"serializedValueSize":8,"key":53,"value":"hello:54"}
record:{"topic":"test-1","partition":1,"offset":26,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":464649674,"serializedKeySize":4,"serializedValueSize":8,"key":20,"value":"hello:21"}
record:{"topic":"test-1","partition":1,"offset":27,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":3591464331,"serializedKeySize":4,"serializedValueSize":8,"key":23,"value":"hello:24"}
record:{"topic":"test-1","partition":1,"offset":28,"timestamp":1505629339526,"timestampType":"CREATE_TIME","checksum":2254864424,"serializedKeySize":4,"serializedValueSize":8,"key":24,"value":"hello:25"}
record:{"topic":"test-1","partition":1,"offset":29,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":3670479813,"serializedKeySize":4,"serializedValueSize":8,"key":27,"value":"hello:28"}
record:{"topic":"test-1","partition":1,"offset":30,"timestamp":1505629339527,"timestampType":"CREATE_TIME","checksum":1843557739,"serializedKeySize":4,"serializedValueSize":8,"key":29,"value":"hello:30"}
record:{"topic":"test-1","partition":1,"offset":31,"timestamp":1505629339528,"timestampType":"CREATE_TIME","checksum":1905538768,"serializedKeySize":4,"serializedValueSize":8,"key":34,"value":"hello:35"}
record:{"topic":"test-1","partition":1,"offset":32,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3985428395,"serializedKeySize":4,"serializedValueSize":8,"key":37,"value":"hello:38"}
record:{"topic":"test-1","partition":1,"offset":33,"timestamp":1505629339529,"timestampType":"CREATE_TIME","checksum":3427427349,"serializedKeySize":4,"serializedValueSize":8,"key":39,"value":"hello:40"}
record:{"topic":"test-1","partition":1,"offset":34,"timestamp":1505629339530,"timestampType":"CREATE_TIME","checksum":713267988,"serializedKeySize":4,"serializedValueSize":8,"key":43,"value":"hello:44"}
record:{"topic":"test-1","partition":1,"offset":35,"timestamp":1505629339536,"timestampType":"CREATE_TIME","checksum":813675607,"serializedKeySize":4,"serializedValueSize":8,"key":49,"value":"hello:50"}
record:{"topic":"test-1","partition":1,"offset":36,"timestamp":1505629339541,"timestampType":"CREATE_TIME","checksum":2006019882,"serializedKeySize":4,"serializedValueSize":8,"key":50,"value":"hello:51"}
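
In the consumer output, records from the three partitions arrive interleaved and the keys are not in sending order, but within each partition the offsets always increase. Kafka orders records only within a partition, not across a whole topic. The sketch below checks both properties on a handful of (partition, offset, key) triples copied from the output. The class PartitionOrderCheck and its methods are hypothetical helpers written for this illustration; they are not part of the Kafka client.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionOrderCheck {

    /** Each row is {partition, offset, key}. True if offsets strictly increase within every partition. */
    public static boolean orderedWithinPartitions(int[][] records) {
        Map<Integer, Integer> lastOffset = new HashMap<>();
        for (int[] r : records) {
            Integer prev = lastOffset.get(r[0]);
            if (prev != null && r[1] <= prev) {
                return false; // an offset went backwards inside one partition
            }
            lastOffset.put(r[0], r[1]);
        }
        return true;
    }

    /** True if the keys (the producer's send order in this post) arrive in increasing order overall. */
    public static boolean orderedByKey(int[][] records) {
        for (int i = 1; i < records.length; i++) {
            if (records[i][2] <= records[i - 1][2]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A subset of the records printed above, in the order they were consumed.
        int[][] sample = {
            {0, 17, 1}, {0, 18, 7},
            {2, 17, 2}, {2, 18, 5},
            {1, 19, 0}, {1, 20, 3},
        };
        System.out.println("ordered within partitions: " + orderedWithinPartitions(sample)); // true
        System.out.println("ordered across the topic:  " + orderedByKey(sample));            // false
    }
}
```

If the application needs a strict global order, a single-partition topic is one option, at the cost of the parallelism that multiple partitions provide.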