欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Apache Kafka API AdminClient Scram账户的创建与删除

发布时间:2023/12/14 37 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Apache Kafka API AdminClient Scram账户的创建与删除 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

前言

由于Apache官方一直没有提供AdminClient中对账户这一块的操作,因此这部分大多数时候都是用命令行去操作的,但是命令行毕竟不是很方便。为了解决这部分问题,笔者去读了Kafka Scala的源码,从中梳理出来这部分内容供给大家参考。重要:如果你的版本升级到2.7.0及其以上,请参考【Apache Kafka API AdminClient Scram账户的操作(增删改查)】。更多内容请点击【Apache Kafka API AdminClient 目录】。

Scala版本

为了操作Scala源码,必须有相应版本的包,怎么看你的Scala版本呢?这个就是在Kafka核心包的<artifactId>键值对里面,如下kafka_2.13后面对应的2.13就是Scala的版本,这个2.13版本同样也是Kafka官方推荐使用的版本,因此我们也就以这个版本为例子去操作账户的创建与删除。要提醒的是如果你使用的版本是Scala 2.12大概率会报错。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId> <!--此处对应的就是Scala版本--><version>2.7.0</version> </dependency>

获取JAAS认证文件

在开始之前首先得知道什么是Scram账户认证,如果不清楚建议参考【Kafka 如何给集群配置Scram账户认证】,这篇帖子里对Scram认证以及配置有很详细的介绍,这里就不多说废话了,我们已经有了一个kafka-broker-jaas.conf文件用来登陆Zookeeper。

Client{org.apache.zookeeper.server.auth.DigestLoginModule requiredusername="kafka"password="kafka1234"; };

为什么要登陆Zookeeper呢?因为Kafka把认证机制做到了Zookeeper里,而要操作这些需要一个Zookeeper的登陆认证。在这里Zookeeper相当于一个分布式注册中心,随者Kafka的不断升级,Kafka官方也在不断地减少对Zookeeper的依赖。截止到2.7.0版本,当使用命令行创建账户的时候就会收到提示说未来版本可能会下线--zookeeper参数,转而使用参数--broker.server,但是目前还是兼容的。而且Kafka API的2.7.0版本,似乎也引入了操作Scram的API,但是之前的版本还是需要操作Zookeeper,因此我们还是需要这样进行账号的操作。希望2.7.0版本的Scram API会比较好用吧,等笔者弄明白了再分享出来。言归正传,我们可以通过下面几行代码先行把登陆Zookeeper的认证文件kafka-broker-jaas.conf加载到系统中来。

static {//获取文件路径,这里笔者使用的是项目路径,也可以用绝对路径,目的是访问到文件,什么方法自由选择String path = KafkaCreateUser.class.getClass().getResource("/").getPath();//拿到文件对象File f = new File(path+"kafka-broker-jaas.conf");//存储到系统参数对象中,以备后续使用System.setProperty("java.security.auth.login.config", f.getAbsolutePath()); }

使用Scala方法创建账户

获取了文件对象以后,就可以调用Scala中的方法了,我们主要调用的方法是AdminZkClient.changeConfigs(entityType: String, entityName: String, configs: Properties)方法,这个语法是Scala中的语法,有点类似Java,我们直接用就可以了,Sample如下。

public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象,这里的/kafka是笔者建了一个zk上的目录,如果直接用192.168.33.101:2181ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties); }

使用Scala方法查询存在的账户

和创建一样也需要使用Scala中的方法,这次使用的是依然是AdminZkClient类中的方法,一个是fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String)去查找指定的账户信息,其次是用fetchAllEntityConfigs(entityType: String)查找Kafka服务器中所有的账户信息,Sample如下。

public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看结果,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");//构建一个接收参数Map<String, Properties> propertiesAll=new HashMap<>();//查询所有信息propertiesAll =adminZkClient.fetchAllEntityConfigs(ConfigType.User()); }

使用Scala方法删除账户

说完创建和查找,那就剩删除了。删除对象其实分为两步,第一步清空Kafka集群上保存的信息,第二部删除Zookeeper上对应的节点。清空信息用的还是changeConfigs()方法,删除节点用的则是Zookeeper包里的delete(final String path, int version)方法,Sample如下。

public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号//注意"/config/users/"这个路径是固定的,Zk里面就是这样存的,"kaf_aaa"是自己拼的账号名字zooKeeper.delete("/config/users/" +"kaf_aaa", -1); }

总结

到此Kafka Scram账户相关的操作告一段落,如果想要删除账户同时清除账号下的权限,可以参考【Apache Kafka API AdminClient 账号对Topic权限赋予与移除】,自己做一个循环删除即可。

附:完整的Sample和注释

public class KafkaUserOperation {//加载zookeeper sasl机制授权登陆的配置文件static {String path = KafkaUserOperation.class.getClass().getResource("/").getPath();File f = new File(path+"zk-client-jaas.conf");System.setProperty("java.security.auth.login.config", f.getAbsolutePath());}public void createAccount() throws NoSuchAlgorithmException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//构造PropertiesProperties properties=new Properties();//构造Scram认证机制ScramMechanism为SCRAM_SHA_512ScramMechanism scramMechanism=ScramMechanism.valueOf("SCRAM_SHA_512");//构造Scram证书credential,这里"password_1234"就是真实的密码ScramCredential credential = new ScramFormatter(scramMechanism).generateCredential("password_1234", scramMechanism.minIterations());//转化为认证串String credentialString=ScramCredentialUtils.credentialToString(credential);//添加到properties中备用properties.put(scramMechanism.mechanismName(),credentialString);//创建名为kaf_aaa的账户,并且把properties传递进去,那么kaf_aaa账户的密码就是上面设置的password_1234字符串adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",properties);}public void findAccount() {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);//为了查看是否成功,构造一个Properties对象用来承接返回值Properties properties=new Properties();//指定账号查询properties=adminZkClient.fetchEntityConfig(ConfigType.User(),"kaf_aaa");Map<String, Properties> proAll=new HashMap<>();//查询所有信息proAll=adminZkClient.fetchAllEntityConfigs(ConfigType.User());}public void deleteAccount() throws InterruptedException, KeeperException {//获取ZookeeperClient对象ZooKeeperClient zooKeeperClient=new ZooKeeperClient("192.168.33.101:2181/kafka",30000, 30000, Int.MaxValue(), Time.SYSTEM,"","");//通过ZookeeperClient对象和JDK自带的JaasUtils加载Sasl认证规则KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSaslEnabled(), Time.SYSTEM);//获取Kafka Scala AdminZkClient对象AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);adminZkClient.changeConfigs(ConfigType.User(),"kaf_aaa",new Properties()); //获取Zookeeper对象ZooKeeper zooKeeper=kafkaZkClient.currentZooKeeper();//调用delete方法把对应的节点删除,版本设置为-1是不检测版本号zooKeeper.delete("/config/users/"+"kaf_aaa", -1);} }

总结

以上是生活随笔为你收集整理的Apache Kafka API AdminClient Scram账户的创建与删除的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。