在kafka server端建立client機princ: addprinc test/192.168.1.124@YLH.COM
在kafka server端根據client機的ip生成keytab: xst -k ylh.keytab test/192.168.1.124
拷貝kafka server的/etc/krb5.conf到/Users/yaolihua/Documents
生成/Users/yaolihua/Documents/kafka-client-sasl.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/Users/yaolihua/Documents/ylh124.keytab"
principal="test/192.168.1.124@YLH.COM";
};
在kafka server受權producer權限給test
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic ylh-acl-test
在kafka server受權consumer權限給test
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --group=ylh-group --topic ylh-acl-test
代碼:
/**
 * Entry point: sets the JVM-wide Kerberos/JAAS system properties, then runs
 * the producer demo followed by the consumer demo.
 *
 * NOTE(review): the paths are machine-specific; assumes krb5.conf and the
 * JAAS login config exist at these locations — confirm before running elsewhere.
 */
public static void main(String[] args) {
    // Kerberos client configuration (realm / KDC definitions).
    System.setProperty("java.security.krb5.conf", "/Users/yaolihua/Documents/krb5.conf");
    // JAAS login configuration containing the KafkaClient section.
    System.setProperty("java.security.auth.login.config", "/Users/yaolihua/Documents/kafka-client-sasl.conf");
    // Verbose Kerberos debugging output on stdout.
    System.setProperty("sun.security.krb5.debug", "true");
    testProducer();
    testConsumer();
}
/**
 * Sends 10 demo messages to the ACL-protected topic "ylh-acl-test" over
 * SASL_PLAINTEXT with Kerberos (GSSAPI) authentication.
 *
 * Fixes: the raw {@code ProducerRecord} type is now parameterized, and the
 * producer is closed via try-with-resources so it is flushed and released
 * even when construction or send() throws.
 */
private static void testProducer() {
    Properties props = new Properties();
    props.put("security.protocol", "SASL_PLAINTEXT");
    // props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.mechanism", "GSSAPI");
    // Must match the primary of the broker's Kerberos service principal.
    props.put("sasl.kerberos.service.name", "test");
    // props.put("bootstrap.servers", "134.129.98.33:9012");
    props.put("bootstrap.servers", "192.168.1.170:9093");
    props.put("acks", "all");
    props.put("retries", 1);
    props.put("batch.size", 1684);
    props.put("linger.ms", 0);
    props.put("buffer.memory", 33554432); // 32 MB buffer space
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // try-with-resources: Producer extends Closeable, so it is always closed.
    try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
        for (int i = 0; i < 10; i++) {
            String dvalue = "hello " + i;
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("ylh-acl-test", null, dvalue);
            // Callback is a functional interface; a lambda replaces the
            // anonymous inner class (the file already uses lambdas elsewhere).
            producer.send(record, (metadata, exception) -> {
                if (metadata == null) {
                    System.out.println("paramRecordMetadata is null ");
                    exception.printStackTrace();
                    return;
                }
                System.out.println("發送的消息信息 topic:" + metadata.topic() + ", partition:" + metadata.partition());
            });
        }
    }
}
/**
 * Consumes messages from "ylh-acl-test" as consumer group "ylh-group" over
 * SASL_PLAINTEXT with Kerberos (GSSAPI), printing each record's offset,
 * key, and value.
 *
 * Fix: the consumer is now wrapped in try-with-resources so it is closed
 * (letting the group rebalance promptly) if configuration or poll() throws;
 * previously it was never closed at all.
 * NOTE(review): the loop never terminates by design; stop with Ctrl-C.
 */
private static void testConsumer() {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "192.168.1.170:9093");
    properties.put("group.id", "ylh-group");
    properties.put("enable.auto.commit", "true");
    // Start from the earliest offset when the group has no committed offset.
    properties.put("auto.offset.reset", "earliest");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("security.protocol", "SASL_PLAINTEXT");
    // properties.put("sasl.mechanism", "PLAIN");
    properties.put("sasl.mechanism", "GSSAPI");
    // Must match the primary of the broker's Kerberos service principal.
    properties.put("sasl.kerberos.service.name", "test");
    try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Arrays.asList("ylh-acl-test"));
        // Poll with a 100 ms timeout, printing records forever.
        while (true) {
            ConsumerRecords<Object, Object> records = consumer.poll(100);
            records.forEach(record -> {
                String format = String.format("offset = %d, key = %s, value = %s",
                        record.offset(), record.key(), record.value());
                System.out.println(format);
            });
        }
    }
}
Congratulations!!!
安裝kerberos: https://my.oschina.net/u/185335/blog/2963061
kafka sever配置kerberos: https://my.oschina.net/u/185335/blog/2963062
kafka client使用kerberos: https://my.oschina.net/u/185335/blog/2963063