Configuring SASL + ACL for a Kafka Cluster

1. Introduction

Before Kafka 0.9, a Kafka cluster had no security mechanism. A Kafka client application could connect to the ZooKeeper ensemble, e.g. zk1:2181,zk2:2181,zk3:2181, and read the Kafka metadata stored there. With the broker addresses in hand, it could connect to the cluster and operate on every topic. Without any access control, the cluster's core business topics were at risk.

This article secures the cluster with SASL + ACL.

2. Key technical points

Configuration file

Edit the server.properties file the broker starts with; at a minimum you need to set (or change) the following parameters:

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin

 

For the remaining parameters, see:

http://www.javashuo.com/article/p-ybkttuge-hx.html

 

A few key parameters deserve a closer look.

 

By default, if a resource R has no ACL attached, nobody except the super users may access it. To change that behaviour, add the following:

allow.everyone.if.no.acl.found=true

Why does this matter? With ACLs enabled as above, nobody but the super users can access anything. The problem is that within a kafka cluster the brokers need to replicate data from each other, so they must be able to reach one another.

By default such a connection uses the ANONYMOUS principal, and in that case starting the kafka cluster is bound to fail! So this parameter absolutely must be set!

 

listeners=SASL_PLAINTEXT://:9092

This parameter is the address Kafka listens on. It must be set; by default it is commented out and falls back to listeners=PLAINTEXT://:9092, but now that SASL is enabled, clients must connect over the SASL protocol.

Although //:9092 carries no IP address, per the official docs it then listens on all interfaces. Note: only an IP address is allowed here, not a hostname, otherwise the broker fails at startup because it cannot bind.
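
If you do want to bind a single interface explicitly, an IP works fine; a minimal sketch, assuming the broker host's IP is 172.168.0.5:

listeners=SASL_PLAINTEXT://172.168.0.5:9092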

 

advertised.listeners is the address advertised to external clients. It may be a hostname or an IP address; a hostname is recommended, because IPs can change while hostnames stay stable.

That way the address can simply be hard-coded in the Java client. Note: it must be the SASL protocol!
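
On the client side this means bootstrap.servers should carry the same advertised domain plus the SASL settings; a sketch using the hostnames from the deployment below:

bootstrap.servers=kafka-1.default.svc.cluster.local:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN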

 

super.users=User:admin enables the super user admin. Note: this user name must not be changed, otherwise you will hit exceptions in producer mode!

 

Startup script

bin/kafka-server-start.sh is Kafka's startup script; to use ACLs, one extra JVM argument has to be added.

There are 2 ways to do it:

1. Add the KAFKA_OPTS environment variable (recommended)

First look at the last line of the stock bin/kafka-server-start.sh:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

Just add one environment variable on the line above it:

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

 

2. Pass -Djava.security.auth.login.config directly

Change the last line to:

exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"
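
Whichever method you choose, you can verify after startup that the flag actually reached the broker JVM; a quick check, not part of the original setup:

ps -ef | grep java.security.auth.login.config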

 

JAAS file

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};

 

This file is used purely for authentication. Users and passwords follow the format:

user_<username>="<password>"

 

Note: for the super user, these lines are fixed:

username="admin"
password="123456"
user_admin="123456"

This sets the admin user's password to 123456; the password itself can be changed as you like.

The lines after those define the ordinary users. The last user line must end with a semicolon!
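
For example, to add a hypothetical user alice (the name and password here are made up), append one more user_ line, move the semicolon, and restart the brokers:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456"
user_alice="alice-secret";
};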

 

3. Deployment

Environment

The environment used in this article follows this link:

https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0

 

It uses 3 ZooKeeper nodes and 5 Kafka brokers, all running on one server!

The ZooKeeper image needs no changes; start it as-is.

The Kafka image, however, has to be rebuilt, as described below.

 

Building the image

Create an empty directory:

mkdir /opt/kafka_cluster_acl

 

dockerfile

FROM ubuntu:16.04
# Switch the apt sources to the Aliyun mirror
ADD sources.list /etc/apt/sources.list
ADD kafka_2.12-2.1.0.tgz /
ADD kafka_cluster_jaas.conf /
# Install the JDK
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all

EXPOSE 9092
# Add the startup script
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]

 

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};

 

run.sh

#!/bin/bash

if [ -z $broker_id ];then
    echo "the broker_id variable must not be empty"
    exit 1
fi

if [ -z $zookeeper ];then
    echo "the zookeeper variable must not be empty"
    exit 2
fi

if [ -z $advertised_hostname ];then
    echo "the advertised_hostname variable must not be empty"
    exit 3
fi

# Enable the Kafka SASL/ACL settings
echo "

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin

" >> /kafka_2.12-2.1.0/config/server.properties

cd /kafka_2.12-2.1.0
# Set a unique broker id (line 21 of the stock server.properties)
sed -i "21s/0/$broker_id/" /kafka_2.12-2.1.0/config/server.properties
# Set the zookeeper connection string (line 123 of the stock server.properties)
sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties

# Patch the startup script: add the environment variable before the last line
sed -i -e "44"i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh

# Move the JAAS file into place
mv /kafka_cluster_jaas.conf /kafka_2.12-2.1.0/config/

# Temporarily add 5 hosts entries
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts

# Start kafka
bin/kafka-server-start.sh config/server.properties

 

Note: since there is no DNS, 5 hosts entries are added temporarily. The 5 Kafka brokers must be able to reach each other, otherwise you will see errors such as:

WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

 

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

 

The directory tree now looks like this:

./
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list

 

Build the image

docker build -t kafka_cluster_acl /opt/kafka_cluster_acl

 

Starting the containers

Make sure the 3 ZooKeeper containers are already running!

First kafka node:

docker run -it -p 9092:9092 -e broker_id=1 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-1.default.svc.cluster.local  --network br1 --ip=172.168.0.5 kafka_cluster_acl

 

Second kafka node:

docker run -it -p 9093:9092 -e broker_id=2 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-2.default.svc.cluster.local --network br1 --ip=172.168.0.6 kafka_cluster_acl

 

Third kafka node:

docker run -it -p 9094:9092 -e broker_id=3 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-3.default.svc.cluster.local --network br1 --ip=172.168.0.7 kafka_cluster_acl

 

Fourth kafka node:

docker run -it -p 9095:9092 -e broker_id=4 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-4.default.svc.cluster.local --network br1 --ip=172.168.0.8 kafka_cluster_acl

 

Fifth kafka node:

docker run -it -p 9096:9092 -e broker_id=5 -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 -e advertised_hostname=kafka-5.default.svc.cluster.local --network br1 --ip=172.168.0.9 kafka_cluster_acl
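
The five commands differ only in host port, broker_id, hostname and IP, so they can also be issued in one loop; a sketch under the same assumptions (note it uses -d instead of -it, so the containers run in the background):

for i in 1 2 3 4 5; do
    docker run -d -p $((9091 + i)):9092 \
        -e broker_id=$i \
        -e zookeeper=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 \
        -e advertised_hostname=kafka-$i.default.svc.cluster.local \
        --network br1 --ip=172.168.0.$((4 + i)) kafka_cluster_acl
done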

 

Client testing

Shell client

First check the docker processes:

root@jqb-node128:~# docker ps
CONTAINER ID        IMAGE               COMMAND             CREATED              STATUS              PORTS                    NAMES
a5ff3c8f5c2a        kafka_cluster_acl   "/run.sh"           About a minute ago   Up About a minute   0.0.0.0:9096->9092/tcp   gifted_jones
36a4d94054b5        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9095->9092/tcp   modest_khorana
f614d734ac8b        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9094->9092/tcp   tender_kare
29ef9a2edd08        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9093->9092/tcp   reverent_jepsen
d9cd45c62e86        kafka_cluster_acl   "/run.sh"           3 minutes ago        Up 3 minutes        0.0.0.0:9092->9092/tcp   silly_mcclintock
69dba560bc09        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2183->2181/tcp   confident_fermat
d73a01e76949        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2182->2181/tcp   admiring_snyder
7ccab68252e7        zookeeper_cluster   "/run.sh"           4 minutes ago        Up 4 minutes        0.0.0.0:2181->2181/tcp   gifted_wilson

 

Make sure all 5 kafka and 3 zk containers are running.

 

Enter any one of the kafka containers:

root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/

 

Create a new configuration file, kafka_client_jaas.conf:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# apt-get install -y vim
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi config/kafka_client_jaas.conf

 

With the following content:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="123456";
};

 

As with the broker, the JAAS configuration has to be passed to the client JVM, so the client scripts must be modified as well.

Producer

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-producer.sh

Above the last line, add the KAFKA_OPTS variable:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

 

Then append 2 lines to the producer configuration file:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

 

Append them with echo:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/producer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/producer.properties

 

Consumer

Append the same 2 lines to the consumer configuration file, again with echo:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/consumer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/consumer.properties

 

Edit the console consumer script:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-consumer.sh

Above the last line, add the KAFKA_OPTS variable:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

 

Testing the producer

There is no topic yet, so create one first:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test --partitions 1 --replication-factor 1

Created topic "test".
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# 
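
To confirm the topic layout, kafka-topics.sh can also describe it; an optional check, not part of the original walkthrough:

bin/kafka-topics.sh --describe --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test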

 

Start the producer, pointing it at the first kafka broker. Of course, any of the 5 brokers will do!

Type the message fdsa and press Enter:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>fdsa
[2018-12-17 08:45:15,455] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
[2018-12-17 08:45:15,457] ERROR [Producer clientId=console-producer] Connection to node -1 (d9cd45c62e86.br1/172.168.0.5:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)

 

The error shows that the security configuration has taken effect. For ordinary users to read and write messages, ACLs must be configured.

 

Configuring ACLs

Kafka's ACL rules are stored in ZooKeeper, so the tooling only needs a ZooKeeper connection!
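
If you are curious where exactly they live, SimpleAclAuthorizer keeps them under the /kafka-acl znode by default; a hedged peek, assuming the default znode path:

bin/zookeeper-shell.sh 172.168.0.2:2181 ls /kafka-acl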

Topic permissions

Grant the writer user all permissions on all topics:

--operation All means all operations,

--topic=* means all topics.

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --topic=*
Adding ACLs for resource `Topic:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 

Current ACLs for resource `Topic:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 

 

Group permissions

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --group=*
Adding ACLs for resource `Group:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 

Current ACLs for resource `Group:LITERAL:*`: 
     User:writer has Allow permission for operations: All from hosts: * 
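
To double-check everything that was written, you can list all current ACLs with the same authorizer options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --list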

 

Test again:

root@e0bb740ac0ce:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>123
>

 

Note: config/server.properties sets

advertised.listeners=SASL_PLAINTEXT://kafka-1.default.svc.cluster.local:9092

so the connection address must use that exact domain!

 

Open another window and exec into the same container:

root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/

 

Start the consumer:

root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning  --consumer.config config/consumer.properties
123

 

Receiving 123 means it worked!
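
The JAAS file also defines a reader user that goes unused above. As a variation (a sketch, not from the original article), you could grant it read-only access and point kafka_client_jaas.conf at it for consumers; Read on a topic implies Describe, and the group ACL is needed because the console consumer joins a consumer group:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:reader --operation Read --topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:reader --operation Read --group=*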

 

Python client test

Since the real host cannot reach the bridge address 172.168.0.5 directly, the test code has to run inside a container attached to the same br1 network.

Create an empty directory:

mkdir /opt/py_test

 

Put 2 files in it.

sources.list

deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main

deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main

deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe

deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

 

produer_consumer_acl_test.py

#!/usr/bin/env python3
# coding: utf-8
# Note: the topic must be created manually before running this script

import sys
import io

def setup_io():  # force utf-8 encoding on stdout/stderr
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

import time
from kafka import KafkaProducer
from kafka import KafkaConsumer


class KafkaClient(object):  # kafka client
    def __init__(self, kafka_server, port, topic, content, username, password):
        self.kafka_server = kafka_server  # kafka server address
        self.port = port  # kafka port
        self.topic = topic  # topic name
        self.content = content  # message to send
        self.username = username  # user name
        self.password = password  # password

    def producer(self):
        """
        Producer mode
        :return: object
        """

        # connect to the kafka server, e.g. ['192.138.150.193:9092']
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                 security_protocol="SASL_PLAINTEXT",  # SASL security protocol
                                 sasl_mechanism='PLAIN',  # SASL mechanism
                                 sasl_plain_username=self.username,  # authentication user name
                                 sasl_plain_password=self.password,  # password
                                 )

        producer.send(self.topic, self.content)  # send the message; must be bytes
        producer.flush()  # flush makes sure all messages reach the broker
        producer.close()
        return producer

    def consumer(self):
        """
        Consumer mode
        :return: object
        """

        # connect to kafka in the consumer group test_group
        consumer = KafkaConsumer(self.topic, group_id='test_group',
                                 bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                 sasl_mechanism="PLAIN",
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_plain_username=self.username,
                                 sasl_plain_password=self.password,
                                 )

        return consumer
        # for msg in consumer:
        #     recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)

    def main(self):
        startime = time.time()  # start time

        client = KafkaClient(self.kafka_server, self.port, self.topic, self.content, self.username, self.password)  # instantiate a client
        client.producer()  # run the producer
        print('producer done')
        consumer = client.consumer()  # run the consumer
        print('consumer started')
        print('waiting for the result....')
        flag = False
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            # check whether the consumed message matches the produced one
            print(msg.value)
            # print(self.content)
            if msg.value == self.content:
                flag = True
                break

        consumer.close()  # close the consumer
        endtime = time.time()  # end time

        if flag:
            # %.2f keeps 2 decimal places
            return "kafka message verified, time taken", '%.2f s' % (endtime - startime)
        else:
            return "kafka message verification failed, time taken", '%.2f s' % (endtime - startime)


if __name__ == '__main__':
    kafka_server = "kafka-1.default.svc.cluster.local"
    port = "9092"
    topic = "test"
    content = "hello honey".encode('utf-8')

    username = "writer"
    password = "123456"

    client = KafkaClient(kafka_server, port, topic, content, username, password)  # instantiate the client
    print(client.main())

 

The directory tree now looks like this:

./
├── produer_consumer_acl_test.py
└── sources.list

 

Start a container on the bridge network and update the Ubuntu sources:

root@jqb-node128:/opt/py_test# docker run -it -v /opt/py_test:/mnt --network br1 --ip=172.168.0.10 ubuntu:16.04
root@064f2f97aad2:/# cp /mnt/sources.list /etc/apt/ 
root@064f2f97aad2:/# apt-get update  

 

Install python3-pip:

root@064f2f97aad2:/# apt-get install -y python3-pip

 

Install the kafka module (on PyPI the kafka name installed here is an older alias of the maintained kafka-python package; both provide the kafka import used above):

root@064f2f97aad2:/# pip3 install kafka

 

Add the hosts entries:

echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts

 

Run the Python file:

root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_test.py
producer done
consumer started
waiting for the result....
b'hello honey'
('kafka message verified, time taken', '28.59 s')

 

Note: the first run can be very slow. Wait 30 seconds; if hello honey is not printed, kill it and run it again.

Repeat up to 5 times and it will come through!

 

Run it a few more times and it gets fast:

root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_test.py
producer done
consumer started
waiting for the result....
b'hello honey'
('kafka message verified, time taken', '5.37 s')
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_test.py
producer done
consumer started
waiting for the result....
b'hello honey'
('kafka message verified, time taken', '0.43 s')

 

Why the first few runs are slow and the later ones fast, I honestly don't know (presumably first-time consumer group coordination and metadata discovery)!

In any case, once past the slow phase it stays fast!

 

Reference for this article:

http://blog.51cto.com/xiaoyouyou/2061143
