Kafka安全模式之身份认证

一、简介

Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。

二、原理介绍

Kafka身份认证主要分为以下几种:

(1)客户端与broker之间的连接认证

(2)broker与broker之间的连接认证

(3)broker与zookeeper之间的连接认证

日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。

图1 客户端与broker之间认证过程图

目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:

(1)基于Kerberos的GSSAPI

SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。

(2)SASL-PLAIN

SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。

(3)SASL-SCRAM

SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。

对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。

三、具体实现

本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。

Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:

  1. Kerberos Database:储存用户密码以及其他信息
  2. Authentication Service(AS):进行用户身份信息验证,为客户端提供Ticket Granting Tickets(TGT)
  3. Ticket Granting Service(TGS):验证TGT,为客户端提供Service Tickets

我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:

  • 用户名principle:标识客户端的用户身份,也即用于登录的用户名
  • 指定用户名对应的秘钥文件xx.keytab:存储了用户的加密密码
  • 指定安全认证的服务配置文件krb5.conf:客户端根据该文件中的信息去访问KDC

获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:

1、将安全认证文件xx.keytabkrb5.conf放置于某一路径下,确保后续java代码可进行读取

2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录

3、修改Kafka生产者配置,开启安全连接

4、调用认证工具类进行登录认证

LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

/**
 * @ProjectName: stdp-security-demo
 * @Package: 
 * @ClassName: LoginUtil
 * @Author: stdp
 * @Description: ${description}
 */
public class LoginUtil {
	
	public enum Module {
		KAFKA("KafkaClient"), ZOOKEEPER("Client");

		private String name;

		Module(String name) {
			this.name = name;
		}

		public String getName() {
			return name;
		}
	}

	private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);

	/**
	 * line operator string
	 */
	private static final String LINE_SEPARATOR = System.getProperty("line.separator");

	/**
	 * jaas file postfix
	 */
	private static final String JAAS_POSTFIX = ".jaas.conf";

	private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";

	public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";

	private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

	
	private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");

	/**
	 * oracle jdk login module
	 */
	private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";


	public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)
			throws IOException
	{
		// 1.check input parameters
		if ((userPrincipal == null) || (userPrincipal.length() <= 0))
		{
			LOGGER.error("input userPrincipal is invalid.");
			throw new IOException("input userPrincipal is invalid.");
		}

		if ((userKeytabPath == null) || (userKeytabPath.length() <= 0))
		{
			LOGGER.error("input userKeytabPath is invalid.");
			throw new IOException("input userKeytabPath is invalid.");
		}

		if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0))
		{
			LOGGER.error("input krb5ConfPath is invalid.");
			throw new IOException("input krb5ConfPath is invalid.");
		}

		// 2.check file exsits
		File userKeytabFile = new File(userKeytabPath);
		if (!userKeytabFile.exists())
		{
			LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
			throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");
		}
		if (!userKeytabFile.isFile())
		{
			LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
			throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");
		}

		File krb5ConfFile = new File(krb5ConfPath);
		if (!krb5ConfFile.exists())
		{
			LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
			throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");
		}
		if (!krb5ConfFile.isFile())
		{
			LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
			throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");
		}

		// 3.set and check krb5config
		setKrb5Config(krb5ConfFile.getAbsolutePath());

//        LOGGER.info("check zookeeper server Principal =============================================");
        setZookeeperServerPrincipal(userPrincipal);
//        LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");
        setJaasFile(userPrincipal,userKeytabPath);
		LOGGER.info("Login success!!!!!!!!!!!!!!");
	}


	public static void setKrb5Config(String krb5ConfigFile) throws IOException {
		System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);
		String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);
		if (ret == null) {
			LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
			throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");
		}
		if (!ret.equals(krb5ConfigFile)){
			LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
			throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");
		}
	}

	public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {
		String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;
		LOGGER.info("jaasPath = {}",jaasPath);
		//windows路径下分隔符替换
		jaasPath = jaasPath.replace("\\","\\\\");
		userKeytabPath = userKeytabPath.replace("\\","\\\\");
		//删除jaas文件
		deleteJaasFile(jaasPath);
		writeJaasFile(jaasPath,userPrincipal,userKeytabPath);
		System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);
	}

	private static void deleteJaasFile(String jaasPath) throws IOException {
		File jaasFile = new File(jaasPath);
		if (jaasFile.exists()){
			if (!jaasFile.delete()){
				throw new IOException("failed to delete exists jaas file.");
			}
		}
	}

	private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {
		FileWriter writer = new FileWriter(new File(jaasPath));
		try{
			writer.write(getJaasConfContext(userPrincipal,userKeytabPath));
			writer.flush();
		}catch (IOException e){
			throw new IOException("Failed to create jaas.conf File.");
		}finally {
			writer.close();
		}
	}


	private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{
		Module[] allModule = Module.values();
		StringBuffer builder = new StringBuffer();
		for (Module module: allModule){
			String serviceName = null;
			if ("Client".equals(module.getName())){
				serviceName = "zookeeper";
			}else if ("KafkaClient".equals(module.getName())){
				serviceName = "kafka";
			}
			builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));
		}
		return builder.toString();
	}

	private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {
		StringBuffer builder = new StringBuffer();
		if (IS_IBM_JDK){
			builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
			builder.append("credsType=both").append(LINE_SEPARATOR);
			builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
			builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
            builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
			builder.append("debug=true;").append(LINE_SEPARATOR);
			builder.append("};").append(LINE_SEPARATOR);
		}else {
			builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);
			builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);
			builder.append("useKeyTab=true").append(LINE_SEPARATOR);
			builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);
			builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);
            builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);
			builder.append("useTicketCache=false").append(LINE_SEPARATOR);
			builder.append("storeKey=true").append(LINE_SEPARATOR);
			builder.append("debug=true;").append(LINE_SEPARATOR);
			builder.append("};").append(LINE_SEPARATOR);
		}
		return builder.toString();
	}


	public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {
		System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);
		String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);
		if (ret == null) {
			LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
			throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");
		}
		if (!ret.equals(zkServerPrincipal)){
			LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
			throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");
		}
	}
}

经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。

四、常见问题

1、认证文件找不到

这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。

2、 principal和keytab不匹配

不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:

  •  配置文件中出现问题:检查kerberos.principle和kerberos.keytab中的用户名(即hkjj)是否一致。

  •  检查生成的jaas文件中用户名和配置的用户名是否相同

如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。

为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。

3、用户密码keytab更新,导致出现checksum failed

这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab

4jaas文件找不到

该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。

该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。

五、总结

在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。

基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/415084.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

10 Redis之SB整合Redis

7. SB整合Redis Spring Boot 中可以直接使用 Jedis 实现对 Redis 的操作&#xff0c;但一般不这样用&#xff0c;而是使用 Redis操作模板 RedisTemplate 类的实例来操作 Redis。 RedisTemplate 类是一个对 Redis 进行操作的模板类。该模板类中具有很多方法&#xff0c;这些方…

stable-diffusion-webui+sadTalker开启GFPGAN as Face enhancer

接上一篇&#xff1a;在autodl搭建stable-diffusion-webuisadTalker-CSDN博客 要开启sadTalker gfpgan as face enhancer&#xff0c; 需要将 1. stable-diffusion-webui/extensions/SadTalker/gfpgan/weights 目录下的文件拷贝到 :~/autodl-tmp/models/GFPGAN/目录下 2.将G…

杰理-按键多次按下识别多击

杰理-按键多次按下识别多击 #define ALL_KEY_EVENT_CLICK_ONLY 0 //是否全部按键只响应单击事件

ansys计算结果保存

100 &#xff1a; 图片质量 ON&#xff1a;白色背景 右键设置保存图片的背景格式&#xff1a;

基于Python网络爬虫的IT招聘就业岗位数据分析可视化推荐系统

文章目录 基于Python网络爬虫的IT招聘就业岗位数据分析可视化推荐系统项目概述招聘岗位数据爬虫分析系统展示用户注册登录系统首页IT招聘数据开发岗-javaIT招聘数据开发岗-PythonIT招聘数据开发岗-AndroidIT招聘数据开发岗-其它招聘岗位数据分析算法方面运维方面测试方面招聘岗…

redis是单线程,为什么这么快?

redis是纯内存操作&#xff0c;C语言编写&#xff0c;执行速度非常快。 采用单线程&#xff0c;避免不必要的上下文切换&#xff0c;不用考虑线程安全问题。 采用I/O多路复用模型&#xff0c;非阻塞I/O。 例如&#xff1a;bgsave和bgrewriteaof都是在后台执行操作&#xff0…

农业四情监测设备为什么符合高标准农田建设

TH-Q3随着科技的不断进步&#xff0c;智慧农业正逐渐成为现代农业发展的重要方向。其中&#xff0c;农业四情监测系统以其独特的功能和优势&#xff0c;在高标准农田建设中发挥着越来越重要的作用。 一、农业四情监测系统的概念及功能 农业四情监测系统&#xff0c;顾名思义&am…

C++之queue和dqueue

1、queue queue&#xff08;队列&#xff09;&#xff0c;一种数据结构&#xff0c;可以让某些数据结构的操作变得简单。队列&#xff08;queue&#xff09;最大的特点就是先进先出。就是说先放入queue容器的元素一定是要先出队列之后&#xff0c;比它后进入队列的元素才能够出…

算法沉淀——动态规划之回文串问题(上)(leetcode真题剖析)

算法沉淀——动态规划之回文串问题 01.回文子串02.最长回文子串03.分割回文串 IV04.分割回文串 II05.最长回文子序列06.让字符串成为回文串的最少插入次数 01.回文子串 题目链接&#xff1a;https://leetcode.cn/problems/palindromic-substrings/ 给你一个字符串 s &#xf…

08 MyBatis之查询专题(返回对象/Map/List封装Map/Map封装Map)+列名与属性名映射的三种方法

准备: INSERT INTO t_car (id, car_num, brand, guide_price, produce_time, car_type) VALUES (165, 6666, 丰田霸道, 32.00, 2020-11-11, 燃油车); INSERT INTO t_car (id, car_num, brand, guide_price, produce_time, car_type) VALUES (166, 1202, 大众速腾, 30.00, 2020…

IntelliJ IDEA 2023:创新不止步,开发更自由 mac/win版

IntelliJ IDEA 2023激活版是一款强大而智能的集成开发环境(IDE)&#xff0c;为开发者提供了一系列先进的功能和工具&#xff0c;帮助他们更高效地编写、调试和测试代码。 IntelliJ IDEA 2023 软件获取 IntelliJ IDEA 2023继承了其前代版本的优秀基因&#xff0c;并在此基础上进…

基于AMDGPU-ROCm的深度学习环境搭建

在风起云涌的AI江湖&#xff0c;NVIDIA凭借其CUDA生态和优秀的硬件大杀四方&#xff0c;立下赫赫战功&#xff0c;而另一家公司AMD也不甘示弱&#xff0c;带着他的生态解决方案ROCm开始了与不世出的NVIDA的正面硬钢&#xff0c;"ROCm is the answer to CUDA", AMD官网…

177基于matlab的基于S函数的变步长自适应滤波算法

基于matlab的基于S函数的变步长自适应滤波算法&#xff0c;比传统的算法收敛速度更快。传统的LMS算法中&#xff0c;权值向量实时地被更新。这些更新可能会由于噪声的影响而变得不稳定。SVSLMS算法是一种改进的LMS算法&#xff0c;它采用了矢量处理的概念&#xff0c;利用信号和…

分布式锁的应用与疑惑

文章目录 一、为什么需要用分布式锁二、Redis实现分布式锁三、Zookeeper实现分布式锁 一、为什么需要用分布式锁 集群下&#xff0c;普通的锁&#xff0c;无法解决问题 集群下&#xff0c;保证安全需要使用分布式锁 二、Redis实现分布式锁 Redisson内部封装的RedLock实现分…

yolov9,使用自定义的数据训练推理

[源码 &#x1f40b;]( GitHub - WongKinYiu/yolov9: Implementation of paper - YOLOv9: Learning What You Want to Learn Using Programmable Gradient Information) [论文 &#x1f4d8;](arxiv.org/pdf/2402.13616.pdf) 论文摘要&#xff1a;本文介绍了一种新的目标检测…

Web服务器群集:OpenEuler 部署 LAMP(LNMP) 基础服务

目录 一、实验 1.环境 2. 网络配置 3. MobaXterm远程连接 4. apache 2.4.58 源码编译安装 5. php 8.3.1源码编译安装 6.配置httpd 连接 php-fpm 6. nginx 1.24.0源码编译安装 7. mysql 8.0.36安装 二、问题 1.MobaXterm设置右键复制粘贴 2.OpenEuler如何查看CPU的核…

网络防御保护3

一、双击热备 1&#xff0c;根据网段划分配置IP地址和安全区域 2&#xff0c;配置双机热备场景 主备场景配置 抢占延时仅对主设备生效。 hello报文周期时间--- 默认为1S&#xff0c;可以修改&#xff0c;但是&#xff0c;主备设备需要同时修改为相同值。 同步配置 双机热备的…

【网站项目】424学报稿件管理系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

一个注解实现频率控制

1.概述 抹茶项目是一个即时的IM通信项目&#xff0c;并且有着万人大群。但凡有几个人刷屏&#xff0c;那消息爆炸的场景&#xff0c;都不敢想象。如果我们需要对项目特定的接口进行频率控制&#xff0c;不仅是业务上的功能&#xff0c;同样也保护了项目的监控运行。而频控又是…

幻兽帕鲁(1.5.0)可视化管理工具(0.5.7 docker版)安装教程

文章目录 局域网帕鲁服务器部署教程帕鲁服务可视化工具安装配置服务器地址&#xff08;可跳过&#xff09;使用工具管理面板 1.5.0服务端RCON错误1.5.0服务端无法启动RCON端口 解决方法第一步&#xff1a;PalWorldSettings.ini配置第二步&#xff1a;修改PalServer.sh配置 局域…
最新文章