Build Consumer
Kafka clients are available for many languages and APIs, including Java, C++, Python, Go, .NET, and JMS. This tutorial uses Python. For other languages, please refer to this documentation.
Next, create the Python consumer application by pasting the following code into a file named consumer.py.
#!/usr/bin/env python
from confluent_kafka import Consumer

if __name__ == '__main__':
    config = {
        # User-specific properties that you must set
        'bootstrap.servers': '<BOOTSTRAP SERVERS>',
        'sasl.username': '<USERNAME>',
        'sasl.password': '<PASSWORD>',
        # Fixed properties
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
        'ssl.endpoint.identification.algorithm': 'none',
    }

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic])

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                # Either may be None, so decode only when present.
                key = msg.key().decode('utf-8') if msg.key() is not None else ''
                value = msg.value().decode('utf-8') if msg.value() is not None else ''
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=key, value=value))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
Fill in the appropriate values in the config section.
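If you would rather not hard-code credentials in consumer.py, one option is to read the user-specific properties from environment variables. The sketch below is only an illustration: the function name load_kafka_config and the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, and KAFKA_PASSWORD are assumptions made for this example, not names defined by Kafka or by this tutorial. The fixed properties are copied from the config section of consumer.py.

```python
import os


def load_kafka_config(env=os.environ):
    """Build the consumer config dict from environment variables.

    The environment variable names are hypothetical; pick any names you like.
    """
    required = {
        'bootstrap.servers': 'KAFKA_BOOTSTRAP_SERVERS',
        'sasl.username': 'KAFKA_USERNAME',
        'sasl.password': 'KAFKA_PASSWORD',
    }
    # Fail early with a clear message if anything is unset or empty.
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    config = {prop: env[var] for prop, var in required.items()}
    # Fixed properties, same values as in consumer.py
    config.update({
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
        'ssl.endpoint.identification.algorithm': 'none',
    })
    return config
```

The returned dictionary can be passed to Consumer(...) in place of the literal config shown in consumer.py.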
After that, you can run this file to consume messages from the topic, connecting either through the private IP addresses (port 9092) or through the public IP addresses (port 9094).
Example:
Using private IP addresses:
bootstrap.servers = 10.16.202.6:9092,10.16.202.7:9092,10.16.202.8:9092
Using public IP addresses:
bootstrap.servers = e2e-87-237.ssdcloudindia.net:9094,e2e-87-246.ssdcloudindia.net:9094,e2e-88-5.ssdcloudindia.net:9094
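The bootstrap.servers value is a comma-separated list of host:port pairs. As a small sketch, the helper below assembles that string from a list of hosts and a port. The function name bootstrap_servers and the two port constants are assumptions introduced for this example; the hosts and ports are taken from the examples above.

```python
# Ports as used in the examples above (constant names are hypothetical).
PRIVATE_PORT = 9092
PUBLIC_PORT = 9094


def bootstrap_servers(hosts, port):
    """Join hosts into the comma-separated host:port list Kafka expects."""
    return ",".join("{}:{}".format(host, port) for host in hosts)


private_hosts = ["10.16.202.6", "10.16.202.7", "10.16.202.8"]
public_hosts = [
    "e2e-87-237.ssdcloudindia.net",
    "e2e-87-246.ssdcloudindia.net",
    "e2e-88-5.ssdcloudindia.net",
]

print(bootstrap_servers(private_hosts, PRIVATE_PORT))
print(bootstrap_servers(public_hosts, PUBLIC_PORT))
```

Either result can be assigned to the 'bootstrap.servers' key of the consumer config, depending on whether the client runs inside or outside the private network.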