Kafka Integration with Spring Boot (or Spring Cloud)



1 Introduction to Apache Kafka

Kafka was originally developed by LinkedIn. It is a distributed, partitioned, multi-replica, multi-subscriber distributed log system coordinated by ZooKeeper (it can also be used as an MQ system). It is commonly used for web/Nginx logs, access logs, message services, and so on. LinkedIn contributed it to the Apache Foundation in 2010, and it became a top-level open source project.


2 Installing Apache Kafka

2.1 - Verify that Java is installed

You probably already have Java installed on your machine, so first verify it with the following command.

$ java -version

If Java is installed successfully, you will see the installed Java version.

Installing the JDK on Linux

JDK 1.8 or later is required here.

Step 1: Run java -version to check whether a JDK is already installed. If the version is correct, there is no need to install it again.

Step 2: Download the Linux package to install

The JDK 1.8 package can be downloaded from the Oracle website at:

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Before downloading, you need to register an Oracle account.

Step 3: Upload the package to Linux and extract it

Upload the archive to the Linux server, create an installation directory for the JDK, and extract the archive into it.

mkdir -p /usr/local/java

tar -zxvf /usr/local/jdk-8u121-linux-x64.tar.gz -C /usr/local/java

For convenience later on and for easier JDK upgrades, you can create a unified symlink /usr/jdk for the JDK with the following command:

ln -s /usr/local/java/jdk1.8.0_121/ /usr/jdk

Step 4: Check and, if necessary, upgrade the Linux glibc core package

Running JDK 1.7 or later on Linux depends on the glibc core package, whose version must be 2.4 or above. You can check the installed glibc version with the following command:

rpm -qi glibc

If the version is lower than 2.4, install it with the following command:

yum install glibc.i686

Step 5: Configure the JDK environment variables and load them

Edit the Linux system configuration file:

vi /etc/profile

At the end of the file, append the JDK environment variables and the global classpath configuration:

export JAVA_HOME=/usr/local/java/jdk1.8.0_121

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$PATH

After adding them, reload the modified configuration file by running:

source /etc/profile

Step 6: Verify that the JDK is installed

Run java -version; if you see output like the following, the JDK has been installed successfully:

[root@localhost local]# java -version

java version "1.8.0_121"

Java(TM) SE Runtime Environment (build 1.8.0_121-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)

2.2 - Verify that ZooKeeper is installed

  • Apache Kafka depends on ZooKeeper, so before installing Kafka, check whether ZooKeeper is already installed.

  • The command to verify the ZooKeeper installation is:

/work/zookeeper/zookeeper-1/bin/zkServer.sh  status

The output looks like this:

[root@localhost work]# /work/zookeeper/zookeeper-1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-1/bin/../conf/zoo.cfg
Mode: follower
[root@localhost work]# /work/zookeeper/zookeeper-2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-2/bin/../conf/zoo.cfg
Mode: leader

Download Kafka


The download address is:

http://kafka.apache.org/downloads (a copy is also available on the 瘋狂創客圈 network drive)

It is recommended to download a version earlier than 1.1, such as kafka_2.11-1.0.2, which has fewer installation issues. Then upload the Kafka package to the virtual machine.


3 Single-Node Installation

Step 3.1 - Extract the tar file

Now that you have downloaded Kafka to your machine, extract the tar file with the following commands:

$ cd /work/
$ tar -zxvf kafka_2.11-1.0.2.tgz
$ cd kafka_2.11-1.0.2
[root@localhost kafka_2.11-1.0.2]# ll
total 52
drwxr-xr-x 3 root root  4096 Apr  7  2020 bin
drwxr-xr-x 2 root root  4096 Apr  7  2020 config
drwxr-xr-x 2 root root  4096 Nov 23 22:23 libs
-rw-r--r-- 1 root root 32216 Apr  7  2020 LICENSE
-rw-r--r-- 1 root root   337 Apr  7  2020 NOTICE
drwxr-xr-x 2 root root  4096 Apr  7  2020 site-docs

Step 3.2 - Create the log directory and environment variable

[root@localhost ~]#  cd /work/kafka_2.11-1.0.2/

[root@localhost kafka_2.11-1.0.2]#  mkdir -p logs/kafka1-logs

Create the environment variable by editing vi /etc/profile:

export KAFKA_HOME=/work/kafka_2.11-1.0.2

Modify the configuration file:

In Kafka's config directory there is a server.properties file; the main settings to change are:

# The broker's globally unique id; it must not be duplicated
broker.id=1
# Listeners
listeners=PLAINTEXT://192.168.233.128:9092

advertised.listeners=PLAINTEXT://192.168.233.128:9092

# Log directory
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
# ZooKeeper connection (if ZooKeeper is not on this machine, change it to the ip or hostname)
zookeeper.connect=localhost:2181

vi /work/kafka_2.11-1.0.2/config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.233.128:9092


# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Start and test Kafka

$ nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties  2>&1 &

If the printed log contains no errors, you will see output like the following:

[root@localhost ~]#  $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2020-11-25 21:59:42,557] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 1
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name =
        inter.broker.listener.name = null
        inter.broker.protocol.version = 1.0-IV0
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = PLAINTEXT://192.168.233.128:9092
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /work/kafka_2.11-1.0.2/logs/kafka1-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 1.0-IV0
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = null
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-11-25 21:59:42,694] INFO starting (kafka.server.KafkaServer)
[2020-11-25 21:59:42,699] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-11-25 21:59:42,878] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-25 21:59:42,886] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.version=1.8.0_11 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.home=/work/java/jdk1.8.0_11/jre (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.class.path=.:/work/java/jdk1.8.0_11/lib/dt.jar:/work/java/jdk1.8.0_11/lib/tools.jar:/work/kafka_2.11-1.0.2/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/argparse4j-0.7.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/commons-lang3-3.5.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-api-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-file-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-json-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-runtime-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-transforms-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/guava-20.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-api-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-locator-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-utils-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-core-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-databind-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.20.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.21.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.annotation-api-1.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-1.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.servlet-api-3.1.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.ws.rs-api-2.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-client-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-common-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-guava-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-media-jaxb-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-server-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-http-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-io-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-security-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-server-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-util-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jopt-simple-5.0.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-test-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-clients-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-log4j-appender-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-examples-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-tools-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/log4j-1.2.17.jar:/work/kafka_2.11-1.0.2/bin/../libs/lz4-java-1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/maven-artifact-3.5.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/metrics-core-2.2.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/osgi-resource-locator-1.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/p
lexus-utils-3.0.24.jar:/work/kafka_2.11-1.0.2/bin/../libs/reflections-0.9.11.jar:/work/kafka_2.11-1.0.2/bin/../libs/rocksdbjni-5.7.3.jar:/work/kafka_2.11-1.0.2/bin/../libs/scala-library-2.11.12.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-api-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-log4j12-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/snappy-java-1.1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/validation-api-1.1.0.Final.jar:/work/kafka_2.11-1.0.2/bin/../libs/zkclient-0.10.jar:/work/kafka_2.11-1.0.2/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.dir=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,888] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@481a996b (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,991] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:42,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,012] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,086] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1006049103f0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,094] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:44,369] INFO Cluster ID = 4MOhHbbzS42FdvekFfLwTQ (kafka.server.KafkaServer)
[2020-11-25 21:59:44,381] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:44,412] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,429] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,442] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,541] INFO Loading logs. (kafka.log.LogManager)
[2020-11-25 21:59:44,547] INFO Logs loading complete in 6 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,086] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,095] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,394] INFO Awaiting socket connections on 192.168.233.128:9092. (kafka.network.Acceptor)
[2020-11-25 21:59:45,399] INFO [SocketServer brokerId=1] Started 1 acceptor threads (kafka.network.SocketServer)
[2020-11-25 21:59:45,422] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,423] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,427] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,438] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-11-25 21:59:45,646] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,648] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,651] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,658] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,698] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,705] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-25 21:59:45,718] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2020-11-25 21:59:45,741] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,771] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,774] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-11-25 21:59:45,807] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,811] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,812] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(192.168.233.128,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2020-11-25 21:59:45,813] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:45,893] INFO [SocketServer brokerId=1] Started processors for 1 acceptors (kafka.network.SocketServer)
[2020-11-25 21:59:45,894] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,894] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,895] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Test Kafka

However, the log alone does not guarantee that Kafka started successfully. Run jps to check the processes; if the Kafka process is listed, the startup succeeded.

[hadoop@Master ~]$ jps
9173 Kafka
9462 Jps
8589 QuorumPeerMain
[hadoop@Master ~]$ jps -m
9472 Jps -m
9173 Kafka /opt/kafka/config/server.properties
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg

Create a topic

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test

Parameter description:
--zookeeper: the ZooKeeper connection URL used by Kafka; the value must match the zookeeper.connect setting in server.properties.

Here it is 192.168.233.128:2181.

--replication-factor: the number of replicas
--partitions: the number of partitions
--topic: the topic name

[root@localhost ~]# $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

List all topics

[hadoop@Master ~]$  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181

The result is as follows:

[root@localhost ~]#  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181
test

Start a test producer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test

Note that the port in this command is the Kafka port.

After running the command, the console waits for message input. Simply type the message value; each line (separated by a newline) is one message, as shown below.

>Hello Kafka!
>你好 kafka!

Normally, each press of Enter triggers a send. Afterwards you can press Ctrl + C to exit the producer console, and then use the kafka-console-consumer.sh script to verify what was produced.

Start a test consumer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning

Note:

  • The port in this command is the ZooKeeper port.

  • If the --from-beginning flag is present, consumption starts from the beginning, so both old and new data are consumed; without it, only newly produced data is consumed.

Results

Producer side:

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test
>aaa bbbb
>ccc fff
>Hello Kafka!
>你好 kafka!
>

Consumer side:

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
aaa bbbb
ccc fff
Hello Kafka!
你好 kafka!
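As a side note (not part of the original walkthrough), the same verification can be done programmatically with the new consumer API in kafka-clients, where auto.offset.reset=earliest plays a role similar to the console consumer's --from-beginning flag. A minimal sketch, assuming the broker address and topic used above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerSketch {
        public static void main(String[] args) {
                Properties props = new Properties();
                // Kafka broker address (not ZooKeeper) -- adjust to your environment
                props.put("bootstrap.servers", "192.168.233.128:9092");
                props.put("group.id", "test-group");
                // Read from the earliest available offset, similar to --from-beginning
                props.put("auto.offset.reset", "earliest");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                consumer.subscribe(Collections.singletonList("test"));
                try {
                        while (true) {
                                ConsumerRecords<String, String> records = consumer.poll(1000);
                                for (ConsumerRecord<String, String> record : records) {
                                        System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
                                }
                        }
                } finally {
                        consumer.close();
                }
        }
}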

4 Cluster-Mode Installation

  • Make three copies of config/server.properties, named server1.properties, server2.properties, and server3.properties.
  • Edit server1.properties:
    - broker.id=1
    - listeners=PLAINTEXT://:9092
    - advertised.listeners=PLAINTEXT://192.168.233.128:9092 (where 192.168.233.128 is this machine's ip)
    - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
    - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Similarly, edit server2.properties:
    - broker.id=2
    - listeners=PLAINTEXT://:9093
    - advertised.listeners=PLAINTEXT://192.168.233.128:9093 (where 192.168.233.128 is this machine's ip)
    - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka2-logs
    - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Similarly, edit server3.properties:
    - broker.id=3
    - listeners=PLAINTEXT://:9094
    - advertised.listeners=PLAINTEXT://192.168.233.128:9094 (where 192.168.233.128 is this machine's ip)
    - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka3-logs
    - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Then run the following commands:
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server3.properties > /work/kafka_2.11-1.0.2/logs/kafka3-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server2.properties > /work/kafka_2.11-1.0.2/logs/kafka2-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server1.properties > /work/kafka_2.11-1.0.2/logs/kafka1-logs/startup.log 2>&1 &
  • Check startup.log, or server.log in the same directory, to make sure there are no errors.
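Besides checking startup.log, the cluster can also be verified programmatically. Below is a minimal sketch (an addition for illustration, not part of the original steps) that uses the AdminClient from kafka-clients to create a topic replicated across the three brokers and print its partition assignment; the topic name cluster-check is an arbitrary choice.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

public class ClusterCheck {
        public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                // The three brokers configured above -- adjust to your environment
                props.put("bootstrap.servers", "192.168.233.128:9092,192.168.233.128:9093,192.168.233.128:9094");

                try (AdminClient admin = AdminClient.create(props)) {
                        // Create a topic with 3 partitions and replication factor 3
                        // (fails if the topic already exists; this is illustrative only)
                        NewTopic topic = new NewTopic("cluster-check", 3, (short) 3);
                        admin.createTopics(Collections.singletonList(topic)).all().get();

                        // Describe it: each partition should have a leader and 3 replicas
                        TopicDescription description =
                                        admin.describeTopics(Collections.singletonList("cluster-check"))
                                             .all().get().get("cluster-check");
                        System.out.println(description);
                }
        }
}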

5 Types of Messaging Systems

A messaging system is responsible for moving data from one application to another; the applications only need to care about the data, not how it is delivered between them. Distributed messaging is based on reliable message queues, with messages passed asynchronously between client applications and the messaging system. There are two main messaging patterns: the point-to-point model and the publish-subscribe model.

5.1 Point-to-Point Messaging Model

In a point-to-point messaging system, messages are persisted in a queue. One or more consumers consume the data in the queue, but a given message can be consumed only once: once a consumer has consumed a piece of data, it is removed from the queue. Even with multiple consumers consuming at the same time, this model guarantees the order in which data is processed. The architecture looks like this:

[Figure: point-to-point messaging model]

A producer sends a message to a queue, and only one consumer receives it.

5.2 Publish-Subscribe Messaging Model (Kafka)

In a publish-subscribe messaging system, messages are persisted in a topic. Unlike the point-to-point model, consumers can subscribe to one or more topics and consume all the data in those topics; the same piece of data can be consumed by multiple consumers, and data is not deleted immediately after being consumed. In this model, producers are called publishers and consumers are called subscribers. It looks like this:

[Figure: publish-subscribe messaging model]

A message that a publisher sends to a topic is received only by the subscribers that subscribed to that topic.

As shown above, publish-subscribe is a push-based delivery model that can serve many different kinds of subscribers. After the producer puts a message into the message queue, the queue pushes it to the consumers that subscribed to that type of message (similar to a WeChat official account).

Most messaging systems use the publish-subscribe model; Kafka is a publish-subscribe system.

6 Kafka Terminology

6.1 Overview

Before diving into Kafka, let's introduce its terminology. The figure below shows the relevant Kafka terms and how they relate to each other:

[Figure: Kafka terms and their relationships]

In the figure, one topic is configured with 3 partitions. Each broker in the cluster stores one or more partitions.

Partition1 has two offsets, 0 and 1. Partition2 has 4 offsets and Partition3 has 1. The id of a replica happens to match the id of the machine it resides on.

If a topic's replication factor is 3, Kafka creates 3 identical replicas of each partition in the cluster. Multiple producers and consumers can produce and consume data at the same time.

6.2 Broker

A Kafka cluster contains one or more servers; each server node is called a broker.

Brokers store topic data. If a topic has N partitions and the cluster has N brokers, each broker stores one partition of that topic.

If a topic has N partitions and the cluster has N+M brokers, N of them each store one partition of the topic and the remaining M brokers store no partition data for that topic.

If a topic has N partitions and the cluster has fewer than N brokers, then a broker stores one or more partitions of the topic. In production, try to avoid this situation, because it easily leads to an unbalanced Kafka cluster.

6.3 Topic

Every message published to a Kafka cluster has a category called a Topic. (Physically, messages of different topics are stored separately; logically, although a topic's messages are stored on one or more brokers, users only need to specify the topic to produce or consume data, without caring where the data is stored.)

A topic is similar to a table name in a database.

6.4 Partition

The data in a topic is split into one or more partitions; every topic has at least one. The data in each partition is stored in multiple segment files. Data within a partition is ordered, but ordering is lost across different partitions, so if a topic has multiple partitions, consumption order cannot be guaranteed. In scenarios where strict consumption ordering is required, set the number of partitions to 1.

6.5 Producer

A producer is the publisher of data; it publishes messages to Kafka topics. When a broker receives a message from a producer, it appends the message to the segment file currently used for appending data. A message sent by a producer is stored in one partition, and the producer can also specify which partition the data is stored in.

6.6 Consumer

Consumers read data from brokers. A consumer can consume data from multiple topics.

6.7 Consumer Group

Every consumer belongs to a specific consumer group (a group name can be specified per consumer; if none is specified, the consumer belongs to the default group).

6.8 Leader

Each partition has multiple replicas, exactly one of which is the leader. The leader is the partition replica currently responsible for reads and writes.

6.9 Follower

Followers follow the leader: all write requests are routed through the leader, data changes are broadcast to all followers, and followers keep their data in sync with the leader. If the leader fails, a new leader is elected from the followers. When a follower crashes, gets stuck, or falls too far behind, the leader removes it from the "in sync replicas" (ISR) list and a new follower is created.

7 Comparison of Common Message Queues

7.1 RabbitMQ

RabbitMQ is an open-source message queue written in Erlang. It supports many protocols out of the box: AMQP, XMPP, SMTP, STOMP. Precisely because of this it is quite heavyweight and better suited to enterprise development. It implements a broker architecture, meaning messages are queued in a central queue before being delivered to clients, and it has good support for routing, load balancing, and data persistence.

7.2 Redis

Redis is a key-value NoSQL database with very active development and maintenance. Although it is a key-value store, it natively supports MQ functionality, so it can be used as a lightweight queue service. In a test where enqueue and dequeue operations were executed 1 million times each on RabbitMQ and Redis, with the execution time recorded every 100,000 operations and payloads of 128 bytes, 512 bytes, 1K, and 10K: for enqueuing, Redis outperforms RabbitMQ when the payload is small, but becomes unbearably slow once the payload exceeds 10K; for dequeuing, Redis shows excellent performance regardless of payload size, while RabbitMQ's dequeue performance is far below Redis.

7.3 ZeroMQ

ZeroMQ claims to be the fastest message queue system, especially for high-throughput scenarios. It can implement the advanced/complex queues that RabbitMQ is not good at, but developers have to assemble multiple technical frameworks themselves, and that technical complexity is a challenge to applying this MQ successfully. ZeroMQ has a unique broker-less model: you do not need to install and run a message server or middleware, because your application plays that role. You simply reference the ZeroMQ library (installable via NuGet) and you can happily send messages between applications. However, ZeroMQ only provides non-persistent queues, meaning data is lost on a crash. Notably, Twitter's Storm used ZeroMQ by default for data transport in versions before 0.9.0 (since 0.9, Storm supports both ZeroMQ and Netty as transport modules).

7.4 ActiveMQ

ActiveMQ is an Apache subproject. Like ZeroMQ, it can implement queues in broker and point-to-point styles; like RabbitMQ, it can efficiently implement advanced scenarios with a small amount of code.

7.5 Kafka/Jafka

Kafka is an Apache subproject: a high-performance, cross-language, distributed publish/subscribe message queue system, and Jafka is incubated on top of Kafka, i.e. an upgraded version of it. It has the following characteristics: fast persistence, with message persistence at O(1) overhead; high throughput, reaching around 100K messages per second on an ordinary server; a fully distributed system, where brokers, producers, and consumers all natively and automatically support distribution and load balancing; and support for parallel data loading into Hadoop, a viable solution for log data and offline analysis systems like Hadoop's that also require real-time processing. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Compared with ActiveMQ, Apache Kafka is a very lightweight messaging system; besides excellent performance, it is also a well-functioning distributed system.

8 Developing with Kafka

8.1 Developing a Simple Kafka Application

Simple producer code

package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
        private static Producer<Integer,String> producer;
        private final Properties props=new Properties();
        public SimpleProducer(){
                //Define the broker list to connect to
                props.put("metadata.broker.list", "192.168.1.216:9092");
                //Define the serializer class; Java objects must be serialized before transfer
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                producer = new Producer<Integer, String>(new ProducerConfig(props));
        }
        public static void main(String[] args) {
                SimpleProducer sp=new SimpleProducer();
                //Define the topic
                String topic="mytopic";

                //Define the message to send to the topic
                String messageStr = "This is a message";

                //Build the message object
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);

                //Push the message to the broker
                producer.send(data);
                producer.close();
        }
}

In a single-node Kafka environment the port is simply the broker port, 9092. The topic here is defined as mytopic; you can name it whatever you like and do not need to worry about whether it already exists on the server. The code above sends a single message at a time; if you have a large volume of data, calling send repeatedly is time-consuming, so it is recommended to group the data into a List and send the whole ArrayList in one call, which improves throughput significantly (see the sketch below).
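As a small illustration of the batching suggestion above (a sketch only, using the same legacy kafka.javaapi.producer API as the example), the old Producer also accepts a List of KeyedMessage, so messages can be accumulated and pushed in a single send call:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchProducerSketch {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put("metadata.broker.list", "192.168.1.216:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

                //Accumulate messages into a list and send them in one call
                List<KeyedMessage<Integer, String>> batch = new ArrayList<KeyedMessage<Integer, String>>();
                for (int i = 0; i < 1000; i++) {
                        batch.add(new KeyedMessage<Integer, String>("mytopic", "message-" + i));
                }
                //The List overload of send() pushes the whole batch
                producer.send(batch);
                producer.close();
        }
}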

Simple consumer code

package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
        private final ConsumerConnector consumer;
        private final String topic;

        public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
                Properties props = new Properties();
                //Define the ZooKeeper connection info
                props.put("zookeeper.connect", zookeeper);
                //Define the group id this consumer belongs to
                props.put("group.id", groupId);
                props.put("zookeeper.session.timeout.ms", "500");
                props.put("zookeeper.sync.time.ms", "250");
                props.put("auto.commit.interval.ms", "1000");
                consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                this.topic = topic;
        }

        public void testConsumer() {
                Map<String, Integer> topicCount = new HashMap<String, Integer>();
                //Map the subscribed topic to the number of streams to create for it
                topicCount.put(topic, new Integer(1));
                //Returns a Map keyed by topic
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
                //Get the message streams for the topic we need
                List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
                for (final KafkaStream stream : streams) {
                        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                        while (consumerIte.hasNext())
                                System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
                }
                if (consumer != null)
                        consumer.shutdown();
        }

        public static void main(String[] args) {
                String topic = "mytopic";
                SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.233.128:2181/kafka", "testgroup", topic);
                simpleHLConsumer.testConsumer();
        }

}

The main logic of the consumer code is simply to process and print the data sent by the producer. Note that the address here is the ZooKeeper address and includes the chroot node /kafka, and that the topic name must match the producer's.

8.2 Developing a Generic Kafka Module

Develop a generic Kafka send-and-receive module; other modules then only need to call this module's unified send interface and implement their own receive logic.

The microservice provider, the subscribed topic, and the subscription group can be configured in a database. After a service receives a message, it automatically invokes the callbacks of the configured receiver classes.

The database table configuration looks like this:

[Figure: subscription configuration table]

This Kafka module lives in the Crazy-SpringCloud scaffold of 瘋狂創客圈, in the module named base-kafka. After startup, its Swagger UI looks like this:

[Figure: base-kafka Swagger UI]

You can use this interface to send a message on a given topic. If a subscription relationship is configured in the database — for example, if provider-name (the microservice name) subscribes to the test topic and a message callback class and method are configured — then the message will be consumed.

The consumption looks like this:

[Figure: consumption log output]
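The source of the base-kafka module is not reproduced here. As a minimal, hedged sketch of what such a Spring Boot integration can look like — assuming spring-kafka with spring.kafka.bootstrap-servers pointing at the broker; the scaffold itself may wire things differently — a service can expose a unified send method around KafkaTemplate and register a listener whose topic and group come from configuration:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoKafkaService {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        //Unified send entry point: other modules only call this method
        public void send(String topic, String message) {
                kafkaTemplate.send(topic, message);
        }

        //Subscription: topic and group are taken from application configuration
        //(demo.kafka.topic and demo.kafka.group are illustrative property names)
        @KafkaListener(topics = "${demo.kafka.topic:test}", groupId = "${demo.kafka.group:provider-demo}")
        public void onMessage(String message) {
                System.out.println("Received from Kafka: " + message);
        }
}

Other modules then depend only on the send method, matching the "unified send interface" idea described above; the topic/group configuration could equally well be loaded from the database table shown earlier.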

9 Kafka Principles

Let's look at the relationships between producers and consumers, topics and groups.

If the diagram confuses you, no problem — let's first go through the related concepts.

  Producer: the producer, i.e. the creator of messages and the entry point for messages.

  Kafka cluster:

  Broker: a broker is a Kafka instance. Each server runs one or more Kafka instances; for simplicity, assume each broker corresponds to one server. Every broker in a Kafka cluster has a unique, non-repeating id, such as broker-0, broker-1, and so on.

  Topic: the subject of a message, which can be understood as the message's category; Kafka data is stored in topics. Multiple topics can be created on each broker.

  Partition: a partition of a topic. Each topic can have multiple partitions, whose purpose is load balancing and improving Kafka's throughput. The data of the same topic in different partitions does not overlap, and a partition materializes as a directory on disk.

  Replication: every partition has multiple replicas, which act as backups. When the primary partition (the leader) fails, a backup (follower) is promoted to become the new leader. In Kafka the default maximum number of replicas is 10, the number of replicas cannot exceed the number of brokers, a follower and its leader are always on different machines, and a given machine stores at most one replica of a given partition (including itself).

  Message: the body of each message that is sent.

  Consumer: the consumer, i.e. the receiving side of messages and the exit point for messages.

  Consumer Group: several consumers can be grouped into a consumer group. In Kafka's design, the data of a given partition can only be consumed by one consumer within a consumer group. Consumers in the same group can consume data from different partitions of the same topic; this, too, improves Kafka's throughput.

  Zookeeper: the Kafka cluster relies on ZooKeeper to store cluster metadata and to ensure system availability.

Key point 1: the data of the same topic in different partitions does not overlap.

Key point 2: the data of one partition can only be consumed by a single consumer within a consumer group.

Workflow Analysis

  The sections above introduced Kafka's basic architecture and concepts. If things still feel fuzzy after reading them, don't worry: next we analyze Kafka's workflow against the architecture diagram above and then review the whole picture once more; I believe you will get more out of it.

Sending Data

  In the architecture diagram above, the producer is the data's entry point. Note the red arrows in the figure: when writing data, the producer always goes to the leader and never writes data directly to a follower. How does it find the leader, and what does the write flow look like? See the figure below:

  [Figure: producer write flow]

The sending flow is already spelled out in the figure, so it is not repeated in text here. One point worth noting is that after a message is written to the leader, the followers actively pull from the leader to synchronize. The producer pushes data to the broker, and each message is appended to a partition and written sequentially to disk, which guarantees that data within the same partition is ordered. The write is illustrated below:

  [Figure: sequential append within a partition]

  As mentioned above, data is written to different partitions. Why does Kafka partition at all? As you can probably guess, the main purposes of partitioning are:
  1. Easier scaling: because a topic can have multiple partitions, we can add machines to cope with ever-growing data volumes.
  2. Higher concurrency: with the partition as the unit of reads and writes, multiple consumers can consume data at the same time, which improves message-processing efficiency.

  Readers familiar with load balancing know that when we send requests to a server, the server may load-balance the traffic across different machines. In Kafka, if a topic has multiple partitions, how does the producer know which partition to send data to? Kafka follows a few rules (sketched in code below):
  1. A partition can be specified explicitly when writing; if it is, the data is written to that partition.
  2. If no partition is specified but a message key is set, a partition is chosen by hashing the key.
  3. If neither a partition nor a key is given, a partition is chosen in round-robin fashion.
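These three routing rules map directly onto the constructors of ProducerRecord in the new producer API. A minimal sketch (topic, key, and value names are illustrative only):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionRoutingExamples {
        public static void main(String[] args) {
                //1. Partition specified explicitly: the record goes to partition 0
                ProducerRecord<String, String> toPartition =
                                new ProducerRecord<String, String>("test", 0, "key-a", "value-1");

                //2. No partition, but a key: the partition is chosen by hashing the key
                ProducerRecord<String, String> byKey =
                                new ProducerRecord<String, String>("test", "key-a", "value-2");

                //3. Neither partition nor key: partitions are picked in round-robin fashion
                ProducerRecord<String, String> roundRobin =
                                new ProducerRecord<String, String>("test", "value-3");

                System.out.println(toPartition + "\n" + byKey + "\n" + roundRobin);
        }
}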

  Not losing messages is a basic guarantee of message-queue middleware, so how does the producer guarantee that messages are not lost when writing to Kafka? As the write-flow diagram above shows, it is done through the ACK acknowledgement mechanism. When writing to the queue, the producer can set a parameter that controls whether to wait for Kafka to confirm receipt; this parameter can be set to 0, 1, or all:
  0 means the producer does not wait for any response from the cluster before sending the next message; delivery is not guaranteed. Lowest safety, highest efficiency.
  1 means the producer waits only for the leader's acknowledgement before sending the next message; only delivery to the leader is guaranteed.
  all means the producer waits until all followers have completed synchronization from the leader before sending the next message, guaranteeing that the leader has the message and all replicas are backed up. Highest safety, lowest efficiency.

  Finally, note what happens when you write to a topic that does not exist: Kafka creates the topic automatically, and with the default configuration both the number of partitions and the number of replicas are 1.
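As a hedged sketch of the ACK setting with the new producer API (the legacy Scala producer used a request.required.acks property instead; this example assumes the org.apache.kafka.clients.producer client and the broker address used earlier):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksExample {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.233.128:9092");
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                //acks=0  : do not wait for any acknowledgement (fastest, least safe)
                //acks=1  : wait for the leader only
                //acks=all: wait until all in-sync replicas have the message (safest, slowest)
                props.put("acks", "all");

                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
                producer.send(new ProducerRecord<String, String>("test", "hello with acks=all"));
                producer.close();
        }
}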

Storing Data

  After the producer writes data to Kafka, the cluster needs to store it. Kafka stores data on disk. In our usual intuition writing to disk is relatively slow and unsuitable for such a high-concurrency component, but Kafka allocates a dedicated chunk of disk space up front and writes data to it sequentially, which is much more efficient than random writes.

Partition Structure
  As mentioned earlier, every topic can be divided into one or more partitions. If a topic feels abstract, a partition is something quite concrete: on the server it materializes as a directory, and under each partition's directory there are multiple groups of segment files. Each group contains a .index file, a .log file, and a .timeindex file (absent in early versions). The .log file is where messages are actually stored, while the .index and .timeindex files are index files used to look messages up.

  [Figure: partition directory with three groups of segment files]

  As shown above, this partition has three groups of segment files. Every .log file has the same size limit, but the number of messages stored is not necessarily equal (messages differ in size). A file is named after the smallest offset in its segment: for example, 000.index covers messages with offsets 0 through 368795. Kafka uses this segmentation-plus-index approach to solve the lookup-efficiency problem.

Message Structure
As noted above, the .log files are where messages are actually stored, and what the producer writes to Kafka is messages, one after another. So what does a message stored in the log look like? A message mainly contains the message body, the message size, the offset, the compression type, and so on. The three things we really need to know are:
  1. offset: an ordered 8-byte id that uniquely determines the position of each message within a partition.
  2. message size: 4 bytes describing the size of the message.
  3. message body: the actual (compressed) message data; the space it occupies varies with the message.

Retention Policy
  Kafka keeps all messages, whether or not they have been consumed. What deletion policies exist for old data?
  1. Time-based: the default configuration is 168 hours (7 days).
  2. Size-based: the default configuration is 1073741824 bytes.
  Note that the time complexity of reading a specific message in Kafka is O(1), so deleting expired files here does not improve Kafka's read performance.

Consuming Data

  Once messages are stored in the log files, consumers can consume them. As with producing messages, consumers go to the leader when pulling messages.

  Multiple consumers can form a consumer group, and each consumer group has a group id. Consumers within the same group can consume data from different partitions of the same topic, but multiple consumers within a group never consume data from the same partition. Sounds a bit convoluted? See the figure below:

  [Figure: consumer group with fewer consumers than partitions]

  The figure shows the case where the consumers in a group are fewer than the partitions, so one consumer ends up consuming multiple partitions, and its consumption speed is lower than that of a consumer handling only one partition.

If a consumer group has more consumers than partitions, will several consumers end up consuming the same partition's data?

As already mentioned above, this does not happen. Note: the extra consumers consume no partitions at all. So in practice it is recommended to keep the number of consumers in a group equal to the number of partitions, and at least not larger than it.

 

How is a partition stored?

A partition is divided into multiple groups of segments, each containing .log, .index, and .timeindex files, and every stored message contains its offset, message size, message body, and so on. We have mentioned segments and offsets several times; how are segment and offset used together when looking up a message? Suppose we now need to find the message with offset 368801 — what does the process look like? Look at the figure first:

[Figure: locating a message by segment and offset]

  1. First find the segment file containing offset 368801 (using binary search); here it is the second segment file.
  2. Open that segment's .index file (namely 368796.index, whose base offset is 368796+1). The offset 368801 we are looking for corresponds to 368796+5=368801 within this index, so the relative offset to look up is 5. Because the file stores the mapping from relative offsets to physical message positions as a sparse index, an entry for relative offset 5 may not exist; binary search is again used to find the largest relative offset that is less than or equal to the target, which here is the entry for relative offset 4.
  3. From the entry for relative offset 4, the physical position of the message is determined to be 256. Open the data file and scan sequentially from position 256 until the message with offset 368801 is found.

  This mechanism is built on offsets being ordered; it combines segments, ordered offsets, sparse indexes, binary search, and sequential scanning to look up data efficiently. At this point the consumer has the data it needs to process.

How does each consumer record its own consumption position?

In early versions, consumers kept their consumed offsets in ZooKeeper, with each consumer reporting at intervals; this easily led to repeated consumption and performed poorly. In newer versions, the offsets consumed by consumers are maintained directly in the Kafka cluster's internal topic __consumer_offsets.
