[toc]
```shell
$ pip install pykafka
$ conda install -c conda-forge pykafka
```

Note: the supported Kafka broker versions are 1.1, 1.0, 0.11, 0.10, 0.9 and 0.8 (as of 2019-02). Kerberos support was added to kafka-python by the author of https://github.com/dpkp/kafka-python/pull/1152.
| Java / config-file name | kafka-python parameter | Description |
|---|---|---|
| security.protocol | security_protocol | security protocol |
| kerberos.domain.name | sasl_kerberos_domain_name | Kerberos domain name |
| sasl.enabled.mechanisms & sasl.mechanism.inter.broker.protocol | sasl_mechanism | authentication mechanism |
| sasl.kerberos.service.name | sasl_kerberos_service_name | service name |
| principal | sasl_plain_username | user (tenant) name |
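The mapping above can be sketched as a small translation helper (a hypothetical illustration, not part of kafka-python; the dict and the function name `to_kafka_python_kwargs` are my own):

```python
# Hypothetical helper: translate Java/consumer.properties-style Kerberos
# settings into kafka-python keyword arguments, following the table above.
JAVA_TO_PYTHON = {
    "security.protocol": "security_protocol",
    "kerberos.domain.name": "sasl_kerberos_domain_name",
    "sasl.enabled.mechanisms": "sasl_mechanism",
    "sasl.kerberos.service.name": "sasl_kerberos_service_name",
}

def to_kafka_python_kwargs(props):
    """Keep only the keys we know how to map; ignore everything else."""
    return {JAVA_TO_PYTHON[k]: v for k, v in props.items() if k in JAVA_TO_PYTHON}

kwargs = to_kafka_python_kwargs({
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.kerberos.service.name": "kafka",
    "group.id": "testgroup",   # not in the table, so it is dropped
})
print(kwargs)
```

The resulting dict could then be splatted into `KafkaConsumer(**kwargs)`.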
These settings usually live in `consumer.properties`.

Dissecting a principal such as `xianglei/dmp-master1.hadoop@HADOOP.COM`: a complete principal has three parts, `username/FQDN@REALM` (the realm, the protected domain, is written in upper case). The username should be a user that actually exists on the Linux host.

FQDN (Fully Qualified Domain Name) means the `hostname.domain` form; if your host has no domain, the bare hostname is acceptable, but whenever a domain exists you must write the full hostname plus domain. Strictly speaking, Kerberos does not call this part a hostname at all: it is the *instance*, and it need not be the hostname of any real server. Still, treating it as a hostname is a convenient way to understand it at first.

REALM is the name of the domain protected by Kerberos: a collection of servers protected by one set of Kerberos services, comparable to a Windows domain. Because a single KDC can protect several realms at once (for example, a HADOOP server group and a MYSQL server group on the same KDC), realms are usually distinguished by a domain-style name.

If your hostname includes a domain name, you must write it out in full in the second part of the principal; otherwise the KDC cannot validate the host, since the encrypted TGT carries the host information.

Note especially that the domain in the second part and the realm in the third part are written with the same word in Chinese, but they are entirely different English terms with entirely different meanings. Because a realm is conventionally written in domain-name style, this is easy to confuse; in practice you can think of the realm as something like a workgroup or home domain in Windows. The name is arbitrary and need not be your real DNS domain; it is just a label that distinguishes different collections of services.
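The three-part structure just described can be illustrated with a tiny parser (my own sketch; the sample principal is the one from the text, and the instance part is optional):

```python
def parse_principal(principal):
    """Split a Kerberos principal into (primary, instance, realm).

    The instance is optional: "user@REALM" yields an empty instance.
    """
    rest, _, realm = principal.rpartition("@")
    primary, _, instance = rest.partition("/")
    return primary, instance, realm

print(parse_principal("xianglei/dmp-master1.hadoop@HADOOP.COM"))
# → ('xianglei', 'dmp-master1.hadoop', 'HADOOP.COM')
```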
References
我是用來鏈接華爲kafka的,測試能夠經過kerberos驗證。具體代碼就不貼了,引用一下其餘做者的,感謝 #coding=utf8 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=["xxxx:9200"], security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka") print "connect success." future = producer.send("xxxx", "test") result = future.get(timeout=60) print "send success."
Original post
Kafka overview (excerpted from Baidu Baike)

1. Introduction: see https://blog.csdn.net/Beyond_F4/article/details/80310507
2. Installation: see https://blog.csdn.net/beyond_f4/article/details/80095689
3. Following the official examples, a first application:

(1) Producer:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092'])
# bootstrap_servers may list several brokers:
# ['0.0.0.1:9092', '0.0.0.2:9092', '0.0.0.3:9092']
for i in range(3):
    msg = ("msg%d" % i).encode()
    producer.send('test', msg)
producer.close()
```

(2) Consumer (simple demo):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

With both started, the producer and consumer work as expected.

(3) Consumer (consumer group):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', group_id='my-group',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

Start several such consumers and each message is delivered to only one of them, as required; consumer groups scale out horizontally to increase processing capacity.

(4) Consumer (reading from the earliest available message):

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', auto_offset_reset='earliest',
                         bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

auto_offset_reset resets the offset: 'earliest' moves to the earliest available message, 'latest' (the default) to the newest. The source maps the old names: {'smallest': 'earliest', 'largest': 'latest'}.

(5) Consumer (setting the offset manually):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test', bootstrap_servers=['172.21.10.136:9092'])
print(consumer.partitions_for_topic("test"))  # partition info for topic "test"
print(consumer.topics())                      # list of topics
print(consumer.subscription())                # topics this consumer subscribes to
print(consumer.assignment())                  # topic/partition assignment
print(consumer.beginning_offsets(consumer.assignment()))  # earliest consumable offsets
consumer.seek(TopicPartition(topic='test', partition=0), 5)  # reset offset, consume from offset 5
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

(6) Consumer (subscribing to several topics):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))  # topics to consume
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0)))  # current offset for this topic
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key, message.value))
```

(7) Consumer (polling manually):

```python
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test', 'test0'))
while True:
    msg = consumer.poll(timeout_ms=5)  # fetch messages from kafka
    print(msg)
    time.sleep(1)
```

(8) Consumer (pausing and resuming):

```python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test',))  # note the trailing comma: ('test') is just a string
consumer.topics()
consumer.pause(TopicPartition(topic='test', partition=0))
num = 0
while True:
    print(num)
    print(consumer.paused())  # partitions currently paused
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
```

After pause() the consumer reads nothing until resume() is called.
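One caveat when running these demos under Python 3: kafka-python expects keys and values to be bytes, so plain strings must be encoded or a serializer supplied. A minimal sketch of a JSON value serializer (the function name `serialize` is my own) that could be passed as `value_serializer` to `KafkaProducer`:

```python
import json

# Hypothetical helper: turn a Python object into JSON bytes.
# Usage sketch: KafkaProducer(bootstrap_servers=[...], value_serializer=serialize)
def serialize(value):
    return json.dumps(value).encode("utf-8")

print(serialize({"id": 1, "msg": "hello"}))
```

The matching consumer side would pass `value_deserializer=lambda v: json.loads(v.decode("utf-8"))`.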
Original post
```python
"""Producer"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" % i
        producer.send(self.topic, key=str(i).encode(), value=msg.encode())
    producer.close()

"""One consumer consuming one topic"""
def consume(self):
    # consumer = KafkaConsumer(self.topic, auto_offset_reset='earliest',
    #                          group_id="testgroup", bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic, bootstrap_servers=self.bootstrap_servers)
    print(consumer.partitions_for_topic(self.topic))  # partition info for the topic
    print(consumer.topics())          # list of topics
    print(consumer.subscription())    # topics this consumer subscribes to
    print(consumer.assignment())      # topic/partition assignment
    print(consumer.beginning_offsets(consumer.assignment()))  # earliest consumable offsets
    consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  # reset offset, consume from offset 1
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))

"""One consumer subscribing to several topics"""
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
    consumer.subscribe(topics=('TEST', 'TEST2'))  # topics to consume
    print(consumer.topics())
    print(consumer.position(TopicPartition(topic='TEST', partition=0)))  # current offset for this topic
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))

"""Consumer (polling manually)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup", max_poll_records=3,
                             bootstrap_servers=['192.168.124.201:9092'])
    consumer.subscribe(topics=('TEST', 'TEST2'))
    while True:
        message = consumer.poll(timeout_ms=5)  # fetch messages from kafka
        if message:
            print(message)
        time.sleep(1)
```
Original post
```python
# coding=utf-8
from pykafka import KafkaClient
import codecs
import logging

logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="172.16.82.163:9091")

# Produce kafka data from strings
def produce_kafka_data(kafka_topic):
    with kafka_topic.get_sync_producer() as producer:
        for i in range(4):
            producer.produce(('test message' + str(i ** 2)).encode())

# Consume kafka data
def consume_simple_kafka(kafka_topic, timeout):
    consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=timeout)
    for message in consumer:
        if message is not None:
            print(message.offset, message.value)

# When several consumers share one topic, get_balanced_consumer() is recommended,
# but it does not work for me yet:
# kazoo.handlers.threading.KazooTimeoutError: Connection time-out
def consume_kafka(kafka_topic, zkhost):
    balanced_consumer = kafka_topic.get_balanced_consumer(
        consumer_group="testgroup",
        auto_commit_enable=False,
        zookeeper_connect=zkhost,
        # zookeeper=zkhost,
        zookeeper_connection_timeout_ms=6000,
        consumer_timeout_ms=10000,
    )
    for message in balanced_consumer:
        if message is not None:
            print(message.offset, message.value)

# Push a file into kafka, line by line
def produce_kafka_file(filename, kafka_topic):
    with kafka_topic.get_sync_producer() as producer:
        with codecs.open(filename, "r", "utf8") as rf:
            for line in rf:
                line = line.strip()
                if not line:
                    continue
                producer.produce(line.encode())

# ===========================================================
topic = client.topics["mytest"]
# Stop receiving if nothing arrives within consumer_timeout_ms
consumer = topic.get_simple_consumer(consumer_timeout_ms=10000)
cnt = 0
for message in consumer:
    if message is not None:
        print(message.offset, message.value)
        cnt += 1
print(cnt)
```
Kafka hands-on tutorial