I am creating a python script to make Kafka ACL manipulation easier. I am not using zookeeper. I am trying to run this command to remove ACLs from consumer groups:
$KH/kafka-acls.sh --bootstrap-server $BS --command-config $KP --remove --allow-principal User:TEST_CONSUMER --consumer --group TEST_GROUP1.CG --topic TEST_TOPIC.TOPIC
The command asks for confirmation for each ACL and it works when running it manually. However, if I try to run the same command with a python script, it gives this error:
python3 temp.py
Error encountered while running command. Return code: 1
Output:
Are you sure you want to remove ACLs:
(principal=User:TEST_CONSUMER, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:TEST_CONSUMER, host=*, operation=READ, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TOPIC, name=TEST_TOPIC.TOPIC, patternType=LITERAL)`? (y/n)
Are you sure you want to remove ACLs:
(principal=User:TEST_CONSUMER, host=*, operation=READ, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=GROUP, name=TEST_GROUP1.CG, patternType=LITERAL)`? (y/n)
Error while executing ACL command: null
java.lang.NullPointerException
at kafka.admin.AclCommand$.kafka$admin$AclCommand$$confirmAction(AclCommand.scala:480)
at kafka.admin.AclCommand$AdminClientService.$anonfun$removeAcls$3(AclCommand.scala:131)
at kafka.admin.AclCommand$AdminClientService.$anonfun$removeAcls$3$adapted(AclCommand.scala:126)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:896)
at kafka.admin.AclCommand$AdminClientService.$anonfun$removeAcls$1(AclCommand.scala:126)
at kafka.admin.AclCommand$AdminClientService.removeAcls(AclCommand.scala:123)
at kafka.admin.AclCommand$.main(AclCommand.scala:75)
at kafka.admin.AclCommand.main(AclCommand.scala)
This is a test script that just executes the command and that gives the mentioned error:
import subprocess
command = "$KH/kafka-acls.sh --bootstrap-server $BS --command-config $KP --remove --allow-principal User:TEST_CONSUMER --consumer --group TEST_GROUP1.CG --topic TEST_TOPIC.TOPIC"
try:
proc = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
proc.stdin.write('y\n')
proc.stdin.flush()
stdout, stderr = proc.communicate()
print(f"stdout: {stdout}")
print(f"stderr: {stderr}")
except Exception as e:
print(f"Command execution failed: {e}")
sys.exit(1)
What could be the issue? Will appreciate any useful answers!
--force
option, to bypass all prompts - that might avoid the problem, whatever it might actually be.