Python Kafka with Kerberos Authentication: Producing and Consuming

[toc]

Installation

pykafka (GitHub)

$ pip install pykafka

$ conda install -c conda-forge pykafka

Note: only Kafka versions 1.1, 1.0, 0.11, 0.10, 0.9, and 0.8 are supported (as of 2019-02).

Kerberos support was added to kafka-python in https://github.com/dpkp/kafka-python/pull/1152.

Kerberos authentication

| Java / config-file parameter | kafka-python parameter | Description |
| --- | --- | --- |
| security.protocol | security_protocol | security protocol |
| kerberos.domain.name | sasl_kerberos_domain_name | domain name |
| sasl.kerberos.service.name | sasl_kerberos_service_name | service name |
| sasl.enabled.mechanisms & sasl.mechanism.inter.broker.protocol | sasl_mechanism | authentication mechanism |
| principal | sasl_plain_username | user / tenant name |
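As an illustration of the mapping above, a small helper can translate a Java-style properties dict into kafka-python keyword arguments. This is only a sketch: the helper name and structure are mine, not part of any library; the property names come from the table.

```python
# Sketch: rename Java-style Kafka security properties to kafka-python
# keyword arguments, following the mapping table above.
JAVA_TO_PYTHON = {
    "security.protocol": "security_protocol",
    "kerberos.domain.name": "sasl_kerberos_domain_name",
    "sasl.kerberos.service.name": "sasl_kerberos_service_name",
    "sasl.enabled.mechanisms": "sasl_mechanism",
    "principal": "sasl_plain_username",
}

def to_kafka_python_kwargs(java_props):
    """Keep only the recognized security properties and rename them."""
    return {JAVA_TO_PYTHON[k]: v for k, v in java_props.items() if k in JAVA_TO_PYTHON}

kwargs = to_kafka_python_kwargs({
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.kerberos.service.name": "kafka",
    "sasl.enabled.mechanisms": "GSSAPI",
})
print(kwargs)
```

The resulting dict can be splatted straight into the `KafkaProducer`/`KafkaConsumer` constructors with `**kwargs`.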

Kerberos basics

The configuration usually lives in consumer.properties.
Breaking down a principal:
xianglei/dmp-master1.hadoop@HADOOP.COM
A complete principal consists of three parts:

username/FQDN (Fully Qualified Domain Name) of the host@REALM (the protected realm, all uppercase)

The username must exist as a Linux user.

The FQDN is the fully qualified domain name, i.e. the hostname.domain form. If your hosts have no domain configured, the domain part can be left out; the point is to use the full hostname plus domain, where one exists. Strictly speaking, Kerberos does not call this part the hostname but the instance; it need not be the hostname of any actual server, but for ease of understanding it helps to treat it as a hostname at first.

The REALM is the name of the domain protected by Kerberos: a set of servers whose services Kerberos protects, comparable to a domain in Windows. Because a single KDC can protect several realms at once (for example, a HADOOP server group and a MYSQL server group on the same KDC), realm names are used to tell them apart.

If your hostnames include a domain name, you must write the second part of the principal in full; otherwise the KDC cannot verify the host, because the encrypted TGT carries the hostname.

Finally, note that the domain in the second part and the realm in the third part translate to the same word in Chinese, but they are entirely different English terms with entirely different meanings. Since a Kerberos realm is usually also written in domain-name form, this can be confusing; in practice you can think of the realm as something like a Windows workgroup or homegroup. The name is arbitrary and need not be your real DNS domain; it is just a label distinguishing different sets of services.
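The three-part structure described above is easy to pull apart in code. Here is a minimal sketch (the function name is mine, not a Kerberos API) that splits a principal into primary, instance, and realm:

```python
# Sketch: split a Kerberos principal "primary/instance@REALM" into its
# three parts as described above. The instance part is optional.
def parse_principal(principal):
    primary_instance, _, realm = principal.rpartition("@")
    primary, _, instance = primary_instance.partition("/")
    return primary, instance or None, realm

print(parse_principal("xianglei/dmp-master1.hadoop@HADOOP.COM"))
# ('xianglei', 'dmp-master1.hadoop', 'HADOOP.COM')
```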

Usage

References

I used this to connect to Huawei's Kafka; Kerberos authentication passed in my tests. I won't paste my own code here; the examples below are quoted from other authors, with thanks.

# coding=utf-8

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["xxxx:9200"],
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="GSSAPI",
                         sasl_kerberos_service_name="kafka")
print("connect success.")
future = producer.send("xxxx", b"test")  # values must be bytes
result = future.get(timeout=60)
print("send success.")

More sample code

Original post

Kafka introduction (excerpted from Baidu Baike)

1. Introduction:
See https://blog.csdn.net/Beyond_F4/article/details/80310507

2. Installation
See https://blog.csdn.net/beyond_f4/article/details/80095689

3. Run a first application, following the official sample
1) Producer:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])  # multiple brokers also work: ['0.0.0.1:9092', '0.0.0.2:9092', '0.0.0.3:9092']

for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg.encode('utf-8'))  # values must be bytes
producer.close()
 
2) Consumer (simple demo):
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

Once both are started, the producer and consumer exchange messages normally.
 
3) Consumer (consumer group)
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='my-group', bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

Start several consumers with the same group_id and only one of them receives each message, as required; consumer groups scale out horizontally to increase processing capacity.
 
4) Consumer (reading from the earliest available message)
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest', bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

auto_offset_reset: where to reset the offset; 'earliest' moves to the oldest available message, 'latest' to the newest. The default is 'latest'.
Defined in the source as: {'smallest': 'earliest', 'largest': 'latest'}
 
5) Consumer (setting the offset manually)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])

print(consumer.partitions_for_topic("test"))  # partition info for the 'test' topic
print(consumer.topics())  # list of topics
print(consumer.subscription())  # topics this consumer is subscribed to
print(consumer.assignment())  # TopicPartitions assigned to this consumer
print(consumer.beginning_offsets(consumer.assignment()))  # earliest offsets this consumer can read
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset the offset: consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
                                          
6) Consumer (subscribing to multiple topics)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))  # topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current offset for this partition
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
                                          
7) Consumer (polling manually)
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from Kafka
    print(msg)
    time.sleep(1)
    
8) Consumer (pausing and resuming)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test',))  # note the trailing comma: ('test') is just a string
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")

After pause() is called, the consumer stops reading from that partition until resume() restores it.

Sample code 2

Original post

"""生產者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個消費者消費一個topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個偏移量消費
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個消費者訂閱多個topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
        
"""消費者(手動拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

Sample code 3

Original post

# coding=utf-8
from pykafka import KafkaClient
import codecs
import logging
logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="172.16.82.163:9091")

# Produce string messages to Kafka
def produce_kafka_data(kafka_topic):
  with kafka_topic.get_sync_producer() as producer:
    for i in range(4):
      producer.produce(('test message' + str(i ** 2)).encode('utf-8'))

# Consume Kafka messages
def consume_simple_kafka(kafka_topic, timeout):
  consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=timeout)
  for message in consumer:
    if message is not None:
      print(message.offset, message.value)

# When several consumers share a topic, get_balanced_consumer() is recommended,
# but it does not work here yet.
# Problem: kazoo.handlers.threading.KazooTimeoutError: Connection time-out
def consume_kafka(kafka_topic, zkhost):
  balanced_consumer = kafka_topic.get_balanced_consumer(
    consumer_group="testgroup",
    auto_commit_enable=False,
    zookeeper_connect=zkhost,
    # zookeeper=zkhost,
    zookeeper_connection_timeout_ms=6000,
    consumer_timeout_ms=10000,
  )
  for message in balanced_consumer:
    if message is not None:
      print(message.offset, message.value)

# Push the contents of a file to Kafka, line by line
def produce_kafka_file(filename, kafka_topic):
  with kafka_topic.get_sync_producer() as producer:
    with codecs.open(filename, "r", "utf8") as rf:
      for line in rf:
        line = line.strip()
        if not line:
          continue
        producer.produce(line.encode('utf-8'))

# ===========================================================

topic = client.topics["mytest"]

# If nothing arrives within consumer_timeout_ms, stop consuming
consumer = topic.get_simple_consumer(consumer_timeout_ms=10000)
cnt = 0
for message in consumer:
  if message is not None:
    print(message.offset, message.value)
  cnt += 1
print(cnt)
