7 Feb 2024
About
Need to quickly delete all empty Kafka topics on a cluster that requires SASL authentication? Here is one (meh) way to do it.
Steps
Extract the keystore
Look in config file for info
Note down every value shown in "<< >>" below (taken from the config file at FIXME PATH); you will need them later.
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<<SOMEUSERNAME>>" password="<<SOMEPASSWORD0>>";
sasl.mechanism=<<SCRAM-SHA-512>>
security.protocol=<<SASL_SSL>>
ssl.keystore.location=<<JKS PATH>>
ssl.keystore.password=<<SOMEPASSWORD1>>
ssl.key.password=<<SOMEPASSWORD2>>
Extract CA/Cert/Key From Keystore
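The keystore is the file at <<JKS PATH>>, i.e. the `ssl.keystore.location` value from the config file. keytool will prompt for passwords: the source keystore password is the `ssl.keystore.password` value, and the password you choose for keystore.p12 is the one openssl asks for afterwards.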
keytool -importkeystore -srckeystore <<JKS PATH>> -destkeystore keystore.p12 -srcstoretype jks -deststoretype pkcs12
openssl pkcs12 -in keystore.p12 -nokeys -out ca.pem
cp ca.pem cert.pem
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out private_key.pem
Install script dependencies
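The script needs the `confluent-kafka` and `kafka-python` Python libraries, plus `certifi` (which the script also imports). Install them via apt, dnf, and/or pip3 (e.g. `pip3 install confluent-kafka kafka-python certifi`; on Debian/Ubuntu also `apt install python3-certifi`). The apt command for the two Kafka libraries is: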
apt install python3-confluent-kafka python3-kafka
The script
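Save the script below to a file named `delete_empty_topcs.py`, replacing every "<< >>" placeholder with your own values. Be careful: it deletes every matching empty topic without asking for confirmation.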
# Delete empty topics
# TODO: Stop using two different clients!
from kafka import KafkaConsumer
from kafka import KafkaAdminClient
from confluent_kafka import Consumer, TopicPartition
from concurrent.futures import ThreadPoolExecutor
import kafka
import ssl
import logging
import os,sys
import certifi
logging.basicConfig(level=logging.INFO)

# Connection settings: replace each "<< >>" placeholder with the value from
# the client config file.
topic = "<<your name or other identifier>>-test"
sasl_mechanism = "<<SCRAM-SHA-512>>"
username = "<<SOMEUSERNAME>>"
password = "<<SOMEPASSWORD0>>"
security_protocol = "<<SASL_SSL>>"
broker = '<<BROKER HOST>>:<<BROKER PORT>>'

# confluent-kafka client; the script uses it to read topic metadata and
# partition watermark offsets. The PEM files are the ones extracted from the
# keystore earlier.
conf = {
    'bootstrap.servers': broker,
    'security.protocol': security_protocol,
    'sasl.mechanisms': sasl_mechanism,
    'sasl.username': username,
    'sasl.password': password,
    'ssl.key.location': 'private_key.pem',
    'ssl.ca.location': 'ca.pem',
    'ssl.certificate.location': 'cert.pem',
    "group.id": "<<your name or other identifier>>-test",
}
consumer = Consumer(conf)

# kafka-python clients: `a` deletes topics, `c` lists them. Both take exactly
# the same connection settings, so they are written down once here.
_kafka_python_settings = dict(
    bootstrap_servers=broker,
    security_protocol=security_protocol,
    sasl_mechanism=sasl_mechanism,
    sasl_plain_username=username,
    sasl_plain_password=password,
    # NOTE(review): this turns off TLS hostname verification; confirm that
    # is acceptable for your cluster.
    ssl_check_hostname=False,
    # NOTE(review): no ssl_certfile is passed alongside this key file; check
    # whether kafka-python actually uses the key on its own.
    ssl_keyfile='private_key.pem',
    # Options the original author tried and left disabled:
    # api_version=(0, 10),
    # ssl_context=context,
    # ssl_cafile='ca.pem',
    # ssl_certfile='cert.pem',
)
a = KafkaAdminClient(**_kafka_python_settings)
c = KafkaConsumer(**_kafka_python_settings)
def get_partition_size(topic_name: str, partition_key: int) -> int:
    """Return the number of retained messages in one topic partition.

    The size is ``high - low`` of the partition's watermark offsets, queried
    through the module-level confluent-kafka ``consumer``.

    Args:
        topic_name: Topic the partition belongs to.
        partition_key: Partition id within the topic.

    Returns:
        ``high_offset - low_offset`` for the partition.

    Raises:
        RuntimeError: if the watermark query returns ``None``, which
            confluent-kafka documents as the timeout result. The caller uses
            this size to decide whether to delete a topic, so an unknown
            size must never be treated as 0.
    """
    watermarks = consumer.get_watermark_offsets(
        TopicPartition(topic_name, partition_key)
    )
    if watermarks is None:
        raise RuntimeError(
            f"Timed out reading watermark offsets for "
            f"{topic_name} partition {partition_key}"
        )
    low_offset, high_offset = watermarks
    return high_offset - low_offset
def get_topic_size(topic_name: str, max_workers: int = 32) -> int:
    """Return the number of retained messages across all partitions of a topic.

    Fetches the topic's metadata through the module-level confluent-kafka
    ``consumer`` and sums ``get_partition_size`` over every partition. The
    per-partition queries run concurrently.

    Args:
        topic_name: Topic to measure.
        max_workers: Upper bound on concurrent watermark queries (at least one
            worker is always used). The actual count is also capped by the
            topic's partition count.

    Returns:
        Sum of ``high - low`` watermark offsets over all partitions.

    Raises:
        RuntimeError: if the broker reports a metadata error for the topic.
        Any exception raised by ``get_partition_size`` is propagated.
    """
    metadata = consumer.list_topics(topic=topic_name)
    topic_metadata = metadata.topics[topic_name]
    # With a metadata error the partition map may be empty. Summing an empty
    # map gives 0, which would make an unreadable topic look "empty" and get
    # it deleted, so refuse to guess.
    if topic_metadata.error is not None:
        raise RuntimeError(
            f"Cannot read metadata for topic {topic_name}: {topic_metadata.error}"
        )
    partition_keys = list(topic_metadata.partitions)
    workers = max(1, min(max_workers, len(partition_keys)))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        sizes = pool.map(
            lambda key: get_partition_size(topic_name, key), partition_keys
        )
        return sum(sizes)
# Only topics whose name starts with this prefix are considered for deletion.
# NOTE(review): an empty prefix would match every topic returned by the
# broker, including any internal ones. Keep it specific.
TOPIC_PREFIX = "<< SOME PREFIX>>"  # Update with topic check here

count_ttl = 0     # topics seen on the cluster
count_tp = 0      # topics matching TOPIC_PREFIX
count_empty = 0   # matching topics found empty (deletion attempted)
count_failed = 0  # size checks or deletions that raised
try:
    for topic_name in c.topics():
        count_ttl += 1
        if not topic_name.startswith(TOPIC_PREFIX):
            continue
        count_tp += 1

        # A topic whose size cannot be determined is skipped, never deleted.
        # Catch broadly here so that one bad topic does not abort the batch.
        try:
            size = get_topic_size(topic_name)
        except Exception:
            count_failed += 1
            logging.exception("Could not determine size of topic %s; skipping", topic_name)
            continue
        if size != 0:
            continue

        count_empty += 1
        print(f"Deleting topic {topic_name}")
        try:
            a.delete_topics(topics=[topic_name])
        except kafka.errors.KafkaError:
            count_failed += 1
            logging.exception("Failed to delete topic %s", topic_name)
            continue
        print(f"Deleted topic {topic_name}")
finally:
    # Release broker connections even if listing topics fails part-way.
    consumer.close()
    a.close()
    c.close()

print(f"Total={count_ttl} TP={count_tp} Empty={count_empty} Failed={count_failed}")
# Exit non-zero so callers and schedulers notice a partial run.
if count_failed:
    sys.exit(1)
Run The Script
python3 delete_empty_topcs.py