kafka client使用kerberos

在kafka server端建立client機princ: addprinc test/192.168.1.124@YLH.COMjava

 

在kafka server端根據client機的ip生成keytab: xst -k ylh.keytab test/192.168.1.124apache

 

拷貝kafka server的/etc/krb5.conf到/Users/yaolihua/Documentsbootstrap

 

生成/Users/yaolihua/Documents/kafka-client-sasl.confide

KafkaClient{ui

com.sun.security.auth.module.Krb5LoginModule required.net

useKeyTab=truedebug

storeKey=trueorm

keyTab="/Users/yaolihua/Documents/ylh124.keytab"server

principal="test/192.168.1.124@YLH.COM";blog

};

 

在kafka server受權producer權限給test

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic ylh-acl-test

 

在kafka server受權consumer權限給test

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --group=ylh-group --topic ylh-acl-test

 

代碼:

public static void main(String args[]) {

System.setProperty("java.security.krb5.conf", "/Users/yaolihua/Documents/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Users/yaolihua/Documents/kafka-client-sasl.conf");

System.setProperty("sun.security.krb5.debug", "true");

testProducer();

testConsumer();

}

 

private static void testProducer() {

Properties props = new Properties();

props.put("security.protocol", "SASL_PLAINTEXT");

// props.put("sasl.mechanism", "PLAIN");

props.put("sasl.mechanism", "GSSAPI");

props.put("sasl.kerberos.service.name", "test");

// props.put("bootstrap.servers", "134.129.98.33:9012");

props.put("bootstrap.servers", "192.168.1.170:9093");

props.put("acks", "all");

props.put("retries", 1);

props.put("batch.size", 1684);

props.put("linger.ms", 0);

props.put("buffer.memory", 33554432); // buffer空間32M

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 

Producer<String, String> producer = new KafkaProducer<String, String>(props);

for (int i = 0; i < 10; i++) {

String dvalue = "hello " + i;

ProducerRecord record = new ProducerRecord<String, String>("ylh-acl-test", null, dvalue);

producer.send(record, new Callback() {

@Override

public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {

if (paramRecordMetadata == null) {

System.out.println("paramRecordMetadata is null ");

paramException.printStackTrace();

return;

}

System.out.println("發送的消息信息 topic:" + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());

}

});

}

producer.close();

}

 

private static void testConsumer() {

Properties properties = new Properties();

properties.put("bootstrap.servers", "192.168.1.170:9093");

properties.put("group.id", "ylh-group");

properties.put("enable.auto.commit", "true");

properties.put("auto.offset.reset", "earliest");

properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// properties.put("auto.offset.reset", "none");

properties.put("security.protocol", "SASL_PLAINTEXT");

// properties.put("sasl.mechanism", "PLAIN");

properties.put("sasl.mechanism", "GSSAPI");

properties.put("sasl.kerberos.service.name", "test");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("ylh-acl-test"));

/* 讀取數據,讀取超時時間爲100ms */

while (true) {

ConsumerRecords<Object, Object> records = consumer.poll(100);

records.forEach(record->{

String format = String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

System.out.println(format);

});

}

}

 

Congratulations!!!

 

安裝kerberos: https://my.oschina.net/u/185335/blog/2963061

kafka sever配置kerberos: https://my.oschina.net/u/185335/blog/2963062

kafka client使用kerberos: https://my.oschina.net/u/185335/blog/2963063

相關文章
相關標籤/搜索