Kubernetes 部署kafka ACL(單機版)

1、概述

在Kafka0.9版本以前,Kafka集羣時沒有安全機制的。Kafka Client應用能夠經過鏈接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。來獲取存儲在Zookeeper中的Kafka元數據信息。拿到Kafka Broker地址後,鏈接到Kafka集羣,就能夠操做集羣上的全部主題了。因爲沒有權限控制,集羣核心的業務主題時存在風險的。html

 

權限控制類型

kafka權限控制總體能夠分爲三種類型:java

  • 基於SSL
  • 基於Kerberos(此認證通常基於CDH,本文不與討論)
  • 基於acl的

第一種類型,須要建立ca,給證書籤名,server和client配置SSL通信。實現比較麻煩!node

第二種類型,須要搭建一臺Kerberos認證服務器,實現較複雜!python

第三種類型,是kakfa內置的,實現簡單。docker

 

本文將重點介紹基於ACL的認證明現。apache

身份認證

Kafka的認證範圍包含以下:bootstrap

  • Client與Broker之間
  • Broker與Broker之間
  • Broker與Zookeeper之間

 

當前Kafka系統支持多種認證機制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。ubuntu

本文所使用的是基於SASL,認證範圍主要是Client與Broker之間。vim

 

SASL認證流程

在Kafka系統中,SASL機制包含三種,它們分別是Kerberos、PLAIN、SCRAM。api

以PLAIN認證爲示例,下面給你們介紹PLAIN認證流程。

先來簡述一下核心步驟,請勿操做!

 

配置Server

要配置SASL和ACL,咱們須要在broker端進行兩個方面的設置。首先是建立包含全部認證用戶信息的JAAS文件。本例中,咱們假設有3個用戶:admin, reader和writer,其中admin是管理員,reader用戶讀取Kafka集羣中topic數據,而writer用戶則負責向Kafka集羣寫入消息。咱們假設這3個用戶的密碼分別與用戶名相同(在實際場景中,管理員須要單獨把密碼發給各自的用戶),所以咱們能夠這樣編寫JAAS文件:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};

 

保存該文件爲kafka_cluster_jaas.conf,以後咱們須要把該文件的完整路徑做爲一個JVM參數傳遞給Kafka的啓動腳本。不過因爲bin/kafka-server-start.sh只接收server.properties的位置,再也不接收其餘任何參數,故咱們須要修改該啓動腳本。具體作法以下:

vim bin/kafka-server-start.sh

把該文件中的這行:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改成下面這行,而後保存退出

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/path/kafka_cluster_jaas.conf kafka.Kafka "$@"

 

配置好JAAS文件後,咱們開始修改broker啓動所需的server.properties文件,你至少須要配置(或修改)如下這些參數:

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
# 指定SASL安全協議
security.inter.broker.protocol= SASL_PLAINTEXT
# 配置SASL機制
sasl.mechanism.inter.broker.protocol=PLAIN
# 啓用SASL機制
sasl.enabled.mechanisms=PLAIN
# 設置本例中admin爲超級用戶
super.users=User:admin

 

Ok,如今咱們能夠啓動broker了(當前確定要先啓動Zookeeper)

bin/ kafka-server-start.sh ../config/server.properties

 

可見,Kafka broker已經成功啓動了。不過當前該broker只會接收已認證client發來的請求。下面咱們繼續clients端的配置。

 

Client端配置

當Kafka Server端配置啓用了SASL/PLAIN,那麼Client鏈接的時候須要配置認證信息,Client配置一個kafka_client_jaas.conf文件,內容以下:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};

 

而後,在producer.properties和consumer.properties文件中設置認證協議,內容以下:

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN

 

最後,在kafka-console-producer.sh腳本和kafka-console-producer.sh腳本中添加JAAS文件的路徑,內容以下:

把該文件中的這行:

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

修改成下面這行,而後保存退出

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/path/writer_jaas.conf kafka.tools.ConsoleProducer "$@"

 

ACL操做

在配置好SASL後,啓動Zookeeper集羣和Kafka集羣以後,就可使用kafka-acls.sh腳原本操做ACL機制。

(1)查看:在kafka-acls.sh腳本中傳入list參數來查看ACL受權新

kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper_server:2181

 

(2)建立:建立待受權主題以前,在kafka-acls.sh腳本中指定JAAS文件路徑,而後在執行建立操做

kafka-topics.sh --create --zookeeper zookeeper_server:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic

 

(3)生產者受權:對生產者執行受權操做

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*

 

(4)消費者受權:對生產者執行受權後,經過消費者來進行驗證

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*

 

(5)組受權:容許只讀用戶的全部組操做

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*

 

2、環境說明

操做系統 服務器地址 K8s角色 服務
ubuntu-16.04.5-server-amd64 192.168.0.121 master ks8主控端
ubuntu-16.04.5-server-amd64 192.168.0.88 node_1 etcd
ubuntu-16.04.5-server-amd64 192.168.0.89 node_2 docker私有庫

 

 

 

 

 

每臺服務器的硬件配置爲,1核3G,20G硬盤。請確保有2G的可用內存!

請確保已經安裝好了k8s集羣,關於k8s的安裝,請參考鏈接:

http://www.javashuo.com/article/p-tyrkcqwa-ke.html

裏面有詳細的過程,使用一鍵腳本便可。本文就是在這個環境上,操做的!

 

架構圖:

 

只須要在Kafka_server 設置ACL規則就能夠了。主要針對topic 作權限驗證!建立讀寫用戶進行驗證。

客戶端能夠隨意建立topic,可是向topic裏面讀寫內容,就須要作驗證了!

 

3、安裝zookeeper(docker)

登陸到node2服務器

mkdir /opt/zookeeper

 

目錄結構以下:

./
├── dockerfile
├── run.sh
├── sources.list
├── zoo.cfg
└── zookeeper-3.4.13.tar.gz

 

具體文件內容,請參考連接:

https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-2-2-0

 

記住,先不要把docker run起來。後面會用k8s 啓動鏡像。

 

4、安裝kafka_server(docker)

登陸到node2服務器

mkdir /opt/kafka_server

 

目錄結構以下:

./
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list

 

具體文件內容,請參考連接:

https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0

 

記住,先不要把docker run起來。後面會用k8s 啓動鏡像。

 

5、安裝kafka_client(docker)

登陸到node2服務器

mkdir /opt/kafka_client

 

目錄結構以下:

./
├── consumer.config
├── dockerfile
├── kafka_2.12-2.1.0.tgz
├── producer.config
├── reader_jaas.conf
├── run.sh
├── sources.list
└── writer_jaas.conf

 

具體文件內容,請參考連接:

https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-4-6-0

 

記住,先不要把docker run起來。後面會用介紹如何啓動鏡像。

 

6、推送鏡像到私有倉庫

登陸到node2 服務器,將zookeeper和kafka_server鏡像推送到私有倉庫

docker tag zookeeper 192.168.0.89:5000/zookeeper_v1
docker push 192.168.0.89:5000/zookeeper_v1

docker tag kafka_server 192.168.0.89:5000/kafka_server_v1
docker push 192.168.0.89:5000/kafka_server_v1

 

7、使用k8s部署服務

zookeeper

登陸到k8s主控制服務器,新建zookeeper.yaml

apiVersion: extensions/v1beta1
kind: Deployment 
metadata: 
  name: zookeeper-1
spec: 
  replicas: 1
  template: 
    metadata: 
      labels: 
        name: zookeeper-1 
    spec: 
      containers: 
        - name: zookeeper-1
          image: 192.168.0.89:5000/zookeeper_v1
          ports:
          - containerPort: 2128

---
apiVersion: v1 
kind: Service 
metadata: 
  name: zookeeper-1
  labels:
    name: zookeeper-1
spec:
  #type: NodePort
  ports:
  - name: client
    port: 2181
    protocol: TCP
    #nodePort: 12182
  - name: followers
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  - name: jmx
    port: 7071
    protocol: TCP
    #nodePort: 17072
  selector:
    name: zookeeper-1
View Code

 

kafka_server

新建文件kafka_server.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: kafka-server-1
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: kafka-server-1
    spec:
      containers:
        - name: kafka-server-1
          image: 192.168.0.89:5000/kafka_server_v1
          env:
          - name: zookeeper
            value: "zookeeper-1.default.svc.cluster.local"
          - name: kafka
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          ports:
          - containerPort: 9092

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-server-1
  labels:
    name: kafka-server-1
spec:
  type: NodePort
  ports:
    targetPort: 9092
    protocol: TCP
    nodePort: 9092
  selector:
    name: kafka-server-1
View Code

 

注意:這裏的kafka_server的listeners地址由kafka變量決定,它是pod ip。

在以前的文章,連接以下:

https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0

 

啓動kafka時,依賴2個變量。一個是zookeeper地址,一個是kafka監聽地址。

看下面這段,就是制定了2個變量,分別是zookeeper和kafka。它對應就是run.sh中的2個變量

env:
    - name: zookeeper
    value: "zookeeper-1.default.svc.cluster.local"
    - name: kafka
    valueFrom:
      fieldRef:
        fieldPath: status.podIP

 

env表示環境變量。

kafka_server.yaml沒法直接獲取zookeeper的pod ip。因此使用 zookeeper-1.default.svc.cluster.local 來獲取。其中zookeeper-1對應的是zookeeper.yaml中的name,後面的值,是固定的。

要想獲取kafka_server的pod id,須要使用這種寫法

valueFrom:
  fieldRef:
    fieldPath: status.podIP

 

建立應用

kubectl create -f zookeeper.yaml --validate
kubectl create -f kafka_server.yaml --validate

 

等待1分鐘,查看狀態

root@k8s-master001:~# kubectl get pods -o wide
NAME                              READY     STATUS    RESTARTS   AGE       IP                NODE
kafka-server-1-5c58954d49-kxgj6   1/1       Running   0          2h        192.138.150.193   k8s-node001
zookeeper-1-f84745dd8-84xr8       1/1       Running   0          2h        192.138.6.129     k8s-node002

 

若是啓動失敗,使用如下命令查看日誌

kubectl describe po zookeeper-1-f84745dd8-84xr8

 

8、客戶端測試

Shell客戶端測試

使用docker run一個鏡像

docker run -it -e zookeeper=192.169.6.131 -e kafka=192.169.150.195 kafka_client

 

注意:-e 參數後面的ip地址要正確,就是pod ip

 

進入容器

docker exec -it ada31484e3d6 /bin/bash

 

建立一個測試topic,名爲test,單分區,副本因子是1

cd /kafka_2.12-2.1.0/
bin/kafka-topics.sh --create --zookeeper 192.169.6.131:2181 --topic test --partitions 1 --replication-factor 1

 

配置ACL來讓writer用戶有權限寫入全部topic

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*

 

爲reader用戶設置全部topic的讀取權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*

 

而後設置reader用戶訪問group的權限,-group=* 表示容許全部組

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*

 

登陸到kafka_client,再開一個窗口。

第一個窗口進入生產者模式,輸入342

bin/writer-kafka-console-producer.sh --broker-list 192.138.150.193:9092 --topic test --producer.config config/producer.config
>342

 

第二個窗口,運行消費者

cd /kafka_2.12-2.1.0/
bin/reader-kafka-console-consumer.sh --bootstrap-server 192.138.150.193:9092 --topic test --from-beginning --consumer.config config/consumer.config

 

這個時候會接收到

342

 

Shell腳本的客戶端,測試完成。

若是須要給writer 用戶全部權限,可使用如下命令:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation All --topic=*

 

Java 客戶端測試

在使用java 客戶端測試以前,確保客戶端能直接鏈接k8s 中的 pod ip。

登陸k8s 主控端,增長一條iptables規則。192.138.0.0/16是pod網段

iptables -t nat -I POSTROUTING -s 192.168.0.0/24 -d 192.138.0.0/16 -o tunl0 -j MASQUERADE

 

客戶端是window 10電腦,增長一條路由,確保有管理權限

route add 192.138.0.0 MASK 255.255.0.0 192.168.0.121

 

測試是否可以ping通 kafka_server的ip地址

ping 192.138.150.193

 

使用 java客戶端的測試,代碼以下:

public void send() {
    String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate, "writer", "writer");
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.138.150.193:9092");
    props.put("acks", "all");
    props.put("batch.size", 16384);
    props.put("buffer.memory", 33554432);
    
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", jaasCfg);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 20; i++) {
        producer.send(new ProducerRecord<String, String>("test", "game", Integer.toString(i))); 
    }

    producer.close();
}


public void receive() {
    String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
    String jaasCfg = String.format(jaasTemplate1, "reader", "reader");

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.138.150.193:9092");
    props.put("group.id", "xxx");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", jaasCfg);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
}
View Code

 

若是輸出0~19,則測試生產者和消費者正常。

使用Python代碼測試

先安裝模塊,本文使用的python版本爲3.5.2

pip3 install kafka

 

新建文件kafka_client.py,代碼以下:

#!/usr/bin/env python3
# coding: utf-8

from kafka import KafkaProducer
from kafka import KafkaConsumer


class KafkaClient(object):  # kafka客戶端程序
    def __init__(self, kafka_server, port, topic):
        self.kafka_server = kafka_server  # kafka服務器ip地址
        self.port = port  # kafka端口
        self.topic = topic  # topic名

    def producer(self, username, password, content):
        """
        生產者模式
        :param username: 用戶名 
        :param password: 密碼
        :param content: 發送內容
        :return: object
        """
        
        # 鏈接kafka服務器,好比['192.138.150.193:9092']
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                 security_protocol="SASL_PLAINTEXT",  # 指定SASL安全協議
                                 sasl_mechanism='PLAIN',  # 配置SASL機制
                                 sasl_plain_username=username,  # 認證用戶名
                                 sasl_plain_password=password,  # 密碼
                                 )

        producer.send(self.topic, content.encode('utf-8'))  # 發送消息,必須是二進制
        producer.flush()  # flush確保全部meg都傳送給broker
        # producer.close()
        return producer

    def consumer(self, username, password):
        """
        消費者模式
        :param username: 用戶名 
        :param password: 密碼
        :return: object
        """
        
        # 鏈接kafka,指定組爲test_group
        consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)],
                                 sasl_mechanism="PLAIN",
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_plain_username=username,
                                 sasl_plain_password=password,
                                 )
        return consumer
        # for msg in consumer:
        #     recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        #     print(recv)


kafka_server = "192.138.150.193"  
port = "9092"
topic = "test"

### 生產者######################################################
username = "writer"
password = "writer"
kafka_client = KafkaClient(kafka_server, port, topic)
result = kafka_client.producer(username, password, "hello")  # 發送消息hello
print("生產者執行完畢!")

### 消費者######################################################
username = "reader"
password = "reader"
consumer = kafka_client.consumer(username, password)  # 消費消息
print("消費者已執行,等待輸出結果:")
for msg in consumer:  # 遍歷結果
    # 輸出topic,partition,offset,key,value
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
View Code

 

執行代碼,輸出:

生產者執行完畢!
消費者已執行,等待輸出結果:
test:0:218: key=None value=b'hello'

 

若是出現hello,表示成功!

相關文章
相關標籤/搜索