1. Install ZooKeeper
See: Downloading, installing, and starting ZooKeeper
ZooKeeper cluster setup: single-machine pseudo-distributed cluster
2. Download Kafka
Go to http://kafka.apache.org/downloads
The version used here is kafka_2.11-1.0.1.tgz
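3. Kafka directories
Extract the archive under /usr/local: tar -zxvf kafka_2.11-1.0.1.tgz -C /usr/local
/bin: executable scripts for operating Kafka
/config: configuration files
/libs: dependency libraries
/logs: log directory. Kafka splits its server-side logs into server, request, state, log-cleaner, and controller
Create the data log directory: cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs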
4. Configuration
(1) Configure ZooKeeper
(2) Configure Kafka
Open config/server.properties:
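# Broker id; it must be unique for every machine in the cluster. The other two brokers use 1 and 2.
broker.id=0
# Interface that Kafka binds to. Use this machine's internal (LAN) IP address, otherwise binding the port fails.
# The other two brokers use 192.168.1.5 and 192.168.1.9.
# (host.name is a deprecated setting; newer configurations express this with listeners.)
host.name=192.168.1.3
# IP and port registered in ZooKeeper and exposed to clients. 118.212.149.51 is the public IP of 192.168.1.3.
# Without this setting, a Kafka broker deployed on a public server cannot be reached from a local machine.
advertised.listeners=PLAINTEXT://118.212.149.51:9092
# IPs and ports of the ZooKeeper cluster.
zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181
# Log directory; the one created above.
log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs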
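Since only broker.id, host.name, and advertised.listeners differ between the three machines, the per-broker files can be generated from one template. The following is a minimal sketch, not part of the original setup: the class and method names are hypothetical, and the public IPs of brokers 1 and 2 are placeholders because the article only gives the public IP of the first machine.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BrokerConfigSketch {

    // Builds the settings that go into server.properties for one broker.
    // Only brokerId, internalIp and externalIp vary between machines.
    static Map<String, String> brokerConfig(int brokerId, String internalIp, String externalIp) {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("broker.id", Integer.toString(brokerId));
        cfg.put("host.name", internalIp);
        cfg.put("advertised.listeners", "PLAINTEXT://" + externalIp + ":9092");
        cfg.put("zookeeper.connect", "192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181");
        cfg.put("log.dirs", "/usr/local/kafka_2.11-1.0.1/kafkaLogs");
        return cfg;
    }

    // Renders the settings as key=value lines, in insertion order.
    static String render(Map<String, String> cfg) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : cfg.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] internalIps = {"192.168.1.3", "192.168.1.5", "192.168.1.9"};
        // Placeholder values for brokers 1 and 2: replace with the real public IPs.
        String[] externalIps = {"118.212.149.51", "PUBLIC_IP_OF_BROKER_1", "PUBLIC_IP_OF_BROKER_2"};
        for (int id = 0; id < internalIps.length; id++) {
            System.out.println("# server.properties for broker " + id);
            System.out.println(render(brokerConfig(id, internalIps[id], externalIps[id])));
        }
    }
}
```

Printing the three variants side by side makes it easy to spot a duplicated broker.id before the brokers are started.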
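5. Start Kafka
(1) Start Kafka
From the bin directory, run:
./kafka-server-start.sh ../config/server.properties
If startup fails because there is not enough memory: the default heap size is 1 GB. On a machine with little memory, edit kafka-server-start.sh under bin and change
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
to
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
This starts a single Kafka broker. When deploying a Kafka cluster, start a broker on each machine in the same way.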
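Once a broker is running, a quick way to confirm that it is reachable from a client machine is to open a TCP connection to the advertised address. The sketch below is only an illustration under assumptions: the class name is hypothetical, the defaults (127.0.0.1 and 9092) should be replaced with the advertised host and port, and a successful connect only shows that the port is open, not that the listener is configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions; pass the advertised host and port as arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

If the check fails from outside the server but succeeds on the server itself, the firewall or the advertised.listeners value is the first thing to look at.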
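(2) Check the broker id
Log in to ZooKeeper:
./zkCli.sh -server 127.0.0.1:2181
List the registered broker ids with ls /brokers/ids. The 0 in the result is the broker id.
View the broker information:
get /brokers/ids/0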
6. Create a topic
(1) Create the topic test1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
List all topics:
./kafka-topics.sh --list --zookeeper localhost:2181
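7. View topic details
./kafka-topics.sh --describe --zookeeper localhost:2181
The first line of the output is a summary of the topic: topic name (Topic), number of partitions (PartitionCount), replication factor (ReplicationFactor), and configuration overrides (Configs).
The following lines list the partitions of test1, one per line: topic name (Topic), partition number (Partition), the broker that is the leader of this partition (Leader), the brokers that hold replicas (Replicas),
and the ISR list (in-sync replicas). The ISR can be thought of as the substitutes' bench: not every broker can take over as leader. The broker must hold a replica of the partition, and that replica must also be sufficiently caught up with the leader.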
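The relationship between Replicas and Isr can be sketched in a few lines: any broker that appears in Replicas but not in Isr holds a copy that has fallen behind and is not eligible to take over. The example below is a hypothetical illustration; the broker ids 0,1,2 and 0,2 are made-up values for a three-replica partition, not output from the single-broker setup above.

```java
import java.util.ArrayList;
import java.util.List;

final class IsrSketch {

    // Parses a comma-separated id list such as the Replicas or Isr column.
    static List<Integer> parseIds(String csv) {
        List<Integer> ids = new ArrayList<>();
        for (String part : csv.split(",")) {
            ids.add(Integer.parseInt(part.trim()));
        }
        return ids;
    }

    // Returns the replicas that are currently not in the ISR.
    static List<Integer> outOfSync(List<Integer> replicas, List<Integer> isr) {
        List<Integer> lagging = new ArrayList<>(replicas);
        lagging.removeAll(isr);
        return lagging;
    }

    public static void main(String[] args) {
        // Illustrative values: three replicas, of which broker 1 has fallen behind.
        List<Integer> replicas = parseIds("0,1,2");
        List<Integer> isr = parseIds("0,2");
        System.out.println("out-of-sync replicas: " + outOfSync(replicas, isr));
    }
}
```

With the single-broker topic created above, Replicas and Isr both contain only broker 0, so the result would be empty.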
8. Produce messages
./kafka-console-producer.sh --broker-list PLAINTEXT://47.xx.xx.120:9092 --topic testTopic1
testTopic1 is the topic name.
9. Consume messages
./kafka-console-consumer.sh --bootstrap-server PLAINTEXT://47.xx.xx.120:9092 --topic testTopic1 --from-beginning
10. Java in practice
(1) Add the dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>
(2) Producer
import java.util.Properties;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaProducer<String, String> producer = null;

    /* Initialize the producer */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // Message record
        ProducerRecord<String, String> record = null;
        for (int i = 0; i < 5; i++) {
            // Value is "product value" followed by a random digit 0-9; no key is set
            record = new ProducerRecord<String, String>(TOPIC, "product value" + (int) (10 * (Math.random())));
            // Send asynchronously; the callback runs once the broker has answered
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        log.error("send error: " + e.getMessage(), e);
                    } else {
                        System.out.println(String.format("offset:%s,partition:%s", recordMetadata.offset(), recordMetadata.partition()));
                    }
                }
            });
        }
        // close() waits for pending sends to complete before shutting down
        producer.close();
    }
}
Start the producer. The console prints the following (every log event was written twice, once in each configured log format; one copy of each is shown):
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:22.494 INFO --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [47.xx.xx.120:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-08-01 13:52:22.609 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2019-08-01 13:52:22.609 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:23.174 INFO --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
offset:15,partition:0
offset:16,partition:0
offset:17,partition:0
offset:18,partition:0
offset:19,partition:0
Process finished with exit code 0
(3) Consumer
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class Consumer {

    static Logger log = Logger.getLogger(Consumer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaConsumer<String, String> consumer = null;

    /* Initialize the consumer and subscribe to the topic */
    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC));
    }

    /* Initialize the configuration */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BROKER_LIST);
        properties.put("group.id", "0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Commit offsets automatically in the background
        properties.setProperty("enable.auto.commit", "true");
        // Start from the earliest offset when the group has no committed offset
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        // Poll forever and log every record received
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info(record);
            }
        }
    }
}
After running it, the console prints the following (every log event was written twice, once in each configured log format; one copy of each is shown). This output was captured with the logger created from Producer.class, which is why the records appear under the Producer logger name:
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:43.837 INFO --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [47.xx.xx.120:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 0
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2019-08-01 13:52:43.972 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2019-08-01 13:52:43.972 INFO --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:44.350 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] Discovered group coordinator 47.xx.xx.120:9092 (id: 2147483647 rack: null)
2019-08-01 13:52:44.356 INFO --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=0] Revoking previously assigned partitions []
2019-08-01 13:52:44.356 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] (Re-)joining group
2019-08-01 13:52:44.729 INFO --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=0] Successfully joined group with generation 3
2019-08-01 13:52:44.730 INFO --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=0] Setting newly assigned partitions [test-0]
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 15, CreateTime = 1564638743167, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value6)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 16, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value7)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 17, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 18, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033 INFO --- [ main] c.e.filesmanager.utils.kafka.Producer : ConsumerRecord(topic = test, partition = 0, offset = 19, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value3)
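The fields in each ConsumerRecord line can be checked by hand. The sketch below uses only the JDK and is not Kafka code: it reproduces the serialized value size as the UTF-8 byte length of the value (the encoding StringSerializer uses by default), reports -1 for a null key as the log does, and converts CreateTime from epoch milliseconds to a wall-clock time. The class name is hypothetical, and the UTC+8 offset is an assumption about the machine that produced the logs.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

final class RecordFieldsSketch {

    // Byte length of a string encoded as UTF-8, or -1 for null (as shown for the null key).
    static int serializedSize(String s) {
        return s == null ? -1 : s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Converts a CreateTime value (milliseconds since the epoch) to a date-time at the given offset.
    static OffsetDateTime createTime(long epochMillis, ZoneOffset offset) {
        return Instant.ofEpochMilli(epochMillis).atOffset(offset);
    }

    public static void main(String[] args) {
        // Values taken from the record at offset 15 in the output above.
        System.out.println("serialized value size = " + serializedSize("product value6"));
        System.out.println("serialized key size = " + serializedSize(null));
        // Assumption: the logs were written on a machine running at UTC+8.
        System.out.println("CreateTime = " + createTime(1564638743167L, ZoneOffset.ofHours(8)));
    }
}
```

Under that assumption the converted CreateTime falls at 13:52:23.167, just before the producer's "Closing the Kafka producer" log line at 13:52:23.174, which is consistent with the records having been sent during that producer run.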