使用kafka-python客戶端進行kafka kerberos認證

    以前說過python confluent kafka客戶端作kerberos認證的過程,若是使用kafka python客戶端的話一樣也能夠進行kerberos的認證,具體的認證機制這裏再也不描述,主要敘述配置認證的過程python

    須要的模塊有下面這些:bootstrap

    kafka-python:https://pypi.org/project/kafka-python/api

    gssapi:https://pypi.org/project/gssapi/dom

    decorator:https://pypi.org/project/decorator/oop

    six:https://pypi.org/project/six/測試

    kerberos環境ui

    kafka python開啓GSSAPI須要模塊gssapi的支持,而gssapi模塊須要依賴於decorator模塊和six模塊,可是安裝時不會校驗和提示,若是不安裝的話kafka python運行是會提示找不到gssapi lib,真正的緣由仍是由於decorator或者six沒有安裝,這裏要注意. spa

    首先安裝decorator和six這兩個模塊.調試

    而後安裝gssapi模塊,安裝的時候要確保decorator和six模塊正常安裝而且kerberos須要的開發包正常安裝,不然gssapi會編譯失敗,安裝kerberos庫可使用yum命令以下:code

yum install krb5-server krb5-libs krb5-auth-dialog

    而後編譯並安裝gssapi,這裏是gssapi-1.6.1.tar.gz,安裝以下:

tar -xvzf gssapi-1.6.1.tar.gz
cd gssapi-1.6.1
python3 setup.py build
python3 setup.py install
cd ..

    完成以後要退出源碼目錄,由於導入模塊可能會出現衝突,而後進入python解釋器,測試一下模塊的安裝狀況:

from gssapi.raw.misc import GSSError

    若是導入模塊沒問題,則說明gssapi安裝成功. 

    最後直接安裝kafka-python模塊便可. 

    而後能夠開始測試python腳本認證是否正常,注意執行以前要先kinit保證klist有對應的用戶,而後再使用下面的代碼調試:

#!/usr/bin/env python3
# coding=utf-8
import time

from kafka import KafkaProducer
from kafka import KafkaConsumer

def kafka_python_producer_main():
    producer = KafkaProducer(bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='kafkaclient')
    producer.send('testTopic', 'kafka python test'.encode('utf-8'))
    producer.flush()
    producer.close()
    print('done')

def kafka_python_consumer_main():
    consumer = KafkaConsumer('testTopic',
                             bootstrap_servers='192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092',
                             group_id='kafka-test-20191014',
                             auto_offset_reset='earliest',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='GSSAPI',
                             sasl_kerberos_service_name='kafka',
                             sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='kafkaclient')
    for msg in consumer:
        print(msg.value)
        print(msg.partition)

if __name__ == '__main__':
    kafka_python_producer_main()
    time.sleep(1)
    kafka_python_consumer_main()

    而後執行腳本測試,若是生產和消費消息都正常,說明kafka kerberos認證成功. 

相關文章
相關標籤/搜索