Getting Started with Kafka: Installation and Basic Usage

 

1. Install Zookeeper

Reference: Downloading, installing, and starting Zookeeper

Reference: Building a Zookeeper cluster (single-machine pseudo-distributed cluster)

 

2. Download Kafka

Go to http://kafka.apache.org/downloads

The version used here is kafka_2.11-1.0.1.tgz

 

3. Kafka directory layout

Extract the archive under /usr/local: tar -zxvf kafka_2.11-1.0.1.tgz

/bin      executable scripts for operating Kafka

/config   directory containing the configuration files

/libs     dependency libraries

/logs     log directory. Kafka splits its server-side logs into: server, request, state, log-cleaner, controller

Create the message-log directory: cd /usr/local/kafka_2.11-1.0.1 && mkdir kafkaLogs

 

4. Configuration

1) Configure Zookeeper (see the references in step 1)

2) Configure Kafka

Edit config/server.properties:

# broker id; every machine in the cluster must have a unique id, the other two brokers here use 1 and 2
broker.id=0
# the interface Kafka binds to; use this machine's private (intranet) IP, otherwise binding the port fails
# the other two machines use 192.168.1.5 and 192.168.1.9
host.name=192.168.1.3
# the externally visible ip:port registered in zookeeper; 118.212.149.51 is the public IP of 192.168.1.3
# without this, a Kafka broker deployed on a remote server cannot be reached from outside
advertised.listeners=PLAINTEXT://118.212.149.51:9092
# ip:port list of the zk cluster (see the zk cluster reference above)
zookeeper.connect=192.168.1.3:2181,192.168.1.5:2181,192.168.1.9:2181
# log directory created above
log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs
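
Note: host.name is a legacy setting; on this and newer broker versions the same intent is normally expressed with the listeners property instead. The line below is only an illustrative alternative to the configuration above, not part of the original setup:

listeners=PLAINTEXT://192.168.1.3:9092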

  

 

5. Start Kafka

1) Start Kafka

./kafka-server-start.sh  ../config/server.properties

If Kafka fails to start because there is not enough memory: the default heap is 1G, so on machines with little memory edit

bin/kafka-server-start.sh and change

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

to

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

This starts a single Kafka broker. For a Kafka cluster, start one broker per machine in the same way; a minimal sketch for running a second broker on the same machine follows.
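
To experiment with a multi-broker cluster on a single machine, a common approach (a minimal sketch, assuming the paths used above; the second port 9093 and the kafkaLogs-1 directory are made up for illustration) is to copy the config file, give the copy its own broker.id, listener port, and log directory, and start a second broker from it:

cp config/server.properties config/server-1.properties
# in config/server-1.properties change, for example:
#   broker.id=1
#   listeners=PLAINTEXT://192.168.1.3:9093
#   log.dirs=/usr/local/kafka_2.11-1.0.1/kafkaLogs-1
./bin/kafka-server-start.sh config/server-1.properties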

 

2) Check the broker id

Log in to Zookeeper:

./zkCli.sh -server 127.0.0.1:2181

Run ls /brokers/ids; the 0 in the returned list is the broker's id.

 

View the broker details:

get /brokers/ids/0
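
On this Kafka version the value stored under /brokers/ids/0 is a small JSON document describing the broker's endpoints. The snippet below is illustrative only (the addresses are the ones used earlier in this post and the timestamp is omitted):

{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://118.212.149.51:9092"],"jmx_port":-1,"host":"118.212.149.51","timestamp":"...","port":9092,"version":4}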

 

 

 

 

6. Create a topic

1) Create the topic test1

./kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic test1

 

List all topics:

./kafka-topics.sh --list --zookeeper localhost:2181

 

7. View topic details

./kafka-topics.sh  --describe --zookeeper localhost:2181 --topic test1

The first line is a summary of the topic: the topic name (Topic), the number of partitions (PartitionCount), the replication factor (ReplicationFactor), and any topic-level configuration (Config).

Each following line describes one partition of test1: the topic name (Topic), the partition number (Partition), the broker that is the leader for this partition (Leader), the brokers holding replicas (Replicas), and

the Isr list (in-sync replicas). Think of the ISR as the pool of stand-ins for the leader: not every broker qualifies; it must hold a replica of the partition, and that replica must be sufficiently caught up with the leader.
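
For a single-broker topic with one partition and one replica, the describe output has roughly this shape (illustrative):

Topic:test1	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test1	Partition: 0	Leader: 0	Replicas: 0	Isr: 0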

 

8. Produce messages

./kafka-console-producer.sh  --broker-list  PLAINTEXT://47.xx.47.120:9092 --topic testTopic1

 

testTopic1 is the topic name. Once the > prompt appears, each line you type is sent as one message.

 

9. Consume messages

./kafka-console-consumer.sh --bootstrap-server PLAINTEXT://47.xx.xx.120:9092 --topic testTopic1 --from-beginning

 

 

 

10. Java in practice

1) Add the dependencies

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.11</artifactId>
			<version>1.0.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>1.0.1</version>
		</dependency>

  

2) Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

public class Producer {

    static Logger log = Logger.getLogger(Producer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaProducer<String,String> producer = null;

    /*
    Initialize the producer
     */
    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /*
    Initialize the producer configuration
     */
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        // the record (message) to send
        ProducerRecord<String , String> record = null;
        for (int i = 0; i < 5; i++) {
            record = new ProducerRecord<String, String>(TOPIC, "product value"+(int)(10*(Math.random())));
            // send asynchronously; the callback runs when the broker acknowledges the record
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e){
                        log.info("send error" + e.getMessage());
                    }else {
                        System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }


}

Start the producer; the console output is:

SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:22.494  INFO   --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [47.xx.xx.120:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

13:52:22.494 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [47.xx.xx.120:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-08-01 13:52:22.609  INFO   --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
13:52:22.609 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
2019-08-01 13:52:22.609  INFO   --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
13:52:22.609 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:23.174  INFO   --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:52:23.174 [main] INFO  o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
offset:15,partition:0
offset:16,partition:0
offset:17,partition:0
offset:18,partition:0
offset:19,partition:0

Process finished with exit code 0
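
The example above sends unkeyed messages asynchronously and reports the result in a callback. If you also want to see which partition a given key lands on, a synchronous keyed send is the shortest way to check. The snippet below is a minimal sketch using the same broker address and topic; the key "user-42" is made up for illustration:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.xx.xx.120:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // records with the same non-null key are always hashed to the same partition
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("test", "user-42", "keyed value");
            // get() blocks until the broker acknowledges the write
            RecordMetadata meta = producer.send(record).get();
            System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());
        } finally {
            producer.close();
        }
    }
}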

  

  

3) Consumer

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;


public class Consumer {


    static Logger log = Logger.getLogger(Consumer.class);

    private static final String TOPIC = "test";
    private static final String BROKER_LIST = "47.xx.xx.120:9092";
    private static KafkaConsumer<String,String> consumer = null;

    static {
        Properties configs = initConfig();
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC));
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put("bootstrap.servers",BROKER_LIST);
        properties.put("group.id","0");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }


    public static void main(String[] args) {
        while (true) {
            // wait up to 10 ms for new records; an empty batch is returned when nothing arrives
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records) {
                log.info(record);
            }
        }
    }


}

After running it, the console output is:

 

SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
2019-08-01 13:52:43.837  INFO   --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [47.xx.xx.120:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = 0
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

13:52:43.837 [main] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [47.xx.xx.120:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = 0
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-08-01 13:52:43.972  INFO   --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
13:52:43.972 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 1.0.1
2019-08-01 13:52:43.972  INFO   --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
13:52:43.972 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : c0518aa65f25317e
2019-08-01 13:52:44.350  INFO   --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=0] Discovered group coordinator 47.xx.xx.120:9092 (id: 2147483647 rack: null)
13:52:44.350 [main] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=0] Discovered group coordinator 47.xx.xx.120:9092 (id: 2147483647 rack: null)
2019-08-01 13:52:44.356  INFO   --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=0] Revoking previously assigned partitions []
13:52:44.356 [main] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=0] Revoking previously assigned partitions []
2019-08-01 13:52:44.356  INFO   --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=0] (Re-)joining group
13:52:44.356 [main] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=0] (Re-)joining group
2019-08-01 13:52:44.729  INFO   --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=0] Successfully joined group with generation 3
13:52:44.729 [main] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=0] Successfully joined group with generation 3
2019-08-01 13:52:44.730  INFO   --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=0] Setting newly assigned partitions [test-0]
13:52:44.730 [main] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=0] Setting newly assigned partitions [test-0]
2019-08-01 13:52:45.033  INFO   --- [           main] c.e.filesmanager.utils.kafka.Producer    : ConsumerRecord(topic = test, partition = 0, offset = 15, CreateTime = 1564638743167, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value6)
13:52:45.033 [main] INFO  c.e.f.utils.kafka.Producer - ConsumerRecord(topic = test, partition = 0, offset = 15, CreateTime = 1564638743167, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value6)
2019-08-01 13:52:45.033  INFO   --- [           main] c.e.filesmanager.utils.kafka.Producer    : ConsumerRecord(topic = test, partition = 0, offset = 16, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value7)
13:52:45.033 [main] INFO  c.e.f.utils.kafka.Producer - ConsumerRecord(topic = test, partition = 0, offset = 16, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value7)
2019-08-01 13:52:45.033  INFO   --- [           main] c.e.filesmanager.utils.kafka.Producer    : ConsumerRecord(topic = test, partition = 0, offset = 17, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
13:52:45.033 [main] INFO  c.e.f.utils.kafka.Producer - ConsumerRecord(topic = test, partition = 0, offset = 17, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033  INFO   --- [           main] c.e.filesmanager.utils.kafka.Producer    : ConsumerRecord(topic = test, partition = 0, offset = 18, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
13:52:45.033 [main] INFO  c.e.f.utils.kafka.Producer - ConsumerRecord(topic = test, partition = 0, offset = 18, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value4)
2019-08-01 13:52:45.033  INFO   --- [           main] c.e.filesmanager.utils.kafka.Producer    : ConsumerRecord(topic = test, partition = 0, offset = 19, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value3)
13:52:45.033 [main] INFO  c.e.f.utils.kafka.Producer - ConsumerRecord(topic = test, partition = 0, offset = 19, CreateTime = 1564638743174, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = product value3)
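
The consumer above relies on automatic offset commits (enable.auto.commit=true), so offsets are committed on a timer regardless of whether your processing has finished. A common variation is to commit manually after each batch has been handled; the sketch below uses the same broker address and topic, with a made-up group id manual-commit-demo:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.xx.xx.120:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // commit offsets only after the whole batch has been processed
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } finally {
            consumer.close();
        }
    }
}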
  

  

 

 

 

Reference: Kafka getting-started tutorial and JAVA client usage
