kerberos简单介绍
kerberos这一名词来源于希腊神话“三个头的狗---地狱之门守护者”后来沿用作为安全认证的概念,该系统设计上
采用客户端/服务器结构与des(data encryption standard标准加密技术),aes(advanced encryption standerd
高级加密技术)等加密技术,并且能够进行相互认证,即客户端和服务端均可对对方进行身份认证。可以防止窃听、
防止replay攻击、保护数据完整性等场合,是一种应对对称密钥体制进行密钥管理的系统。
基本概念
票据授权票(tgt ticket granting):
用于应用程序与kdc(key distribution center 密钥分发中心)服务器建立安全会话的票据,票据授权票存在有效期,
当票据授权票失效后,应用侧需要重新建立与kdc服务器的安全会话。会话有效期为24小时,不可配置。
服务票据(st service ticket):
用于应用程序与服务端建立安全会话的票据,服务票据存在有效期,当服务票据失效后,应用侧需要重新建立于
服务端的安全会话。默认有效期5分钟,不可配置。
kerberos 配置
详见网络说明在这里不再赘述:http://orchome.com/500
kafka kerberos producer客户端配置
1)使用配置文件kafka-console-producer.sh生产数据
cat kafka_client_jaas.conf 文件配置
kafkaclient {
com.sun.security.auth.module.krb5loginmodule required
useticketcache=false
usekeytab=true
keytab="/hbase/test.keytab"
principal="test@kerberos.test"
servicename="kafka"
client=true;
};
配置环境变量kafka_opts 举例说明
export kafka_opts="-djava.security.krb5.conf=/etc/krb5.conf -djava.security.auth.login.config=/kafka_client_jaas_acl.conf"
cat producer.properties
security.protocol=sasl_plaintext
sasl.mechanism=gssapi
sasl.kerberos.service.name=kafka
生产脚本kafka-console-producer.sh
./kafka-console-producer.sh --broker-list hostname:9092 --topic test_acl1 --producer.config producer.properties
kafka kerberos producer客户端java代码设置
import org.apache.kafka.clients.producer.*;
import java.util.properties; /**
* created by administrator on 2018-05-24.
*/
public class mykafkaprocucerwithkerberos { private static final string broker_list = "127.0.0.1:9093"; public static void main(string[] args) throws interruptedexception { string rootpath = system.getproperty("user.dir");
system.setproperty("java.security.krb5.conf", rootpath "/src/main/resources/krb5.conf");
system.setproperty("java.security.auth.login.config",rootpath "/src/main/resources/kafka_client_jaas.conf"); properties props1 = new properties();
producerproducer1=null;
props1.put("bootstrap.servers", broker_list);
props1.put("security.protocol","sasl_plaintext");
props1.put("sasl.mechanism","gssapi");
props1.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props1.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); producer1= new kafkaproducer(props1);
int line = 1;
while (line <=4) {
producerrecordmessage1= new producerrecord ("test_acl1","test_topic_data");
producer1.send(message1, new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
if( e!=null){
e.printstacktrace();
system.out.println("failed");
}else {
system.out.println(recordmetadata.topic());
}
}
});
line ;
}
producer1.close();
}
}