Build Consumer

Kafka clients are available for several languages and APIs, such as Java, C++, Python, Go, .NET, and JMS. This tutorial uses Python. For tutorials on other languages, please refer to this documentation.

Next, create the Python consumer application by pasting the following code into a file named consumer.py.

#!/usr/bin/env python

from confluent_kafka import Consumer

if __name__ == '__main__':

    config = {
        # User-specific properties that you must set
        'bootstrap.servers': '<BOOTSTRAP SERVERS>',
        'sasl.username':     '<USERNAME>',
        'sasl.password':     '<PASSWORD>',

        # Fixed properties
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms':   'SCRAM-SHA-512',
        'group.id':          'my_consumer_group',
        'auto.offset.reset': 'earliest',
        'ssl.endpoint.identification.algorithm': 'none',
    }

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic])

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
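                # Extract the (optional) key and value, and print.
                # key() and value() return None when the field is absent,
                # so fall back to an empty string before formatting.
                key = msg.key().decode('utf-8') if msg.key() is not None else ''
                value = msg.value().decode('utf-8') if msg.value() is not None else ''
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=key, value=value))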
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()

Fill in the appropriate values in the config section.
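If you would rather not hard-code credentials in consumer.py, one option is to read the user-specific values from environment variables. The following is a minimal sketch, not part of the original tutorial: the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_USERNAME, and KAFKA_PASSWORD and the helper build_config are hypothetical, and the fixed properties are copied from the listing above.

```python
import os

# Hypothetical environment variable names (an assumption for this sketch);
# rename them to match your own setup.
REQUIRED_VARS = {
    'bootstrap.servers': 'KAFKA_BOOTSTRAP_SERVERS',
    'sasl.username': 'KAFKA_USERNAME',
    'sasl.password': 'KAFKA_PASSWORD',
}


def build_config(env=None):
    """Return a consumer config dict, taking user-specific values from env."""
    env = os.environ if env is None else env

    # Fail early with a clear message if anything is unset or empty.
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    config = {prop: env[var] for prop, var in REQUIRED_VARS.items()}

    # Fixed properties, copied from consumer.py
    config.update({
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
        'ssl.endpoint.identification.algorithm': 'none',
    })
    return config
```

In consumer.py, the config dictionary could then be replaced with config = build_config() before the Consumer instance is created.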

After that, you can run this file against either the public address (port 9094) or the private IP address (port 9092) to consume messages from the topic.

Example:

  • Using the public IP address:
    • bootstrap.servers = e2e-87-237.ssdcloudindia.net:9094,e2e-87-246.ssdcloudindia.net:9094,e2e-88-5.ssdcloudindia.net:9094

  • Using the private IP address:
    • bootstrap.servers = 10.16.202.6:9092,10.16.202.7:9092,10.16.202.8:9092
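Both examples follow the same pattern: a comma-separated list of host:port pairs with no spaces. As a small illustration, the string can be assembled from a list of brokers; the helper name bootstrap_servers is hypothetical, and the addresses are the private ones from the example above.

```python
def bootstrap_servers(hosts, port):
    """Join hosts into the comma-separated 'host:port' list Kafka expects."""
    return ",".join("{}:{}".format(host, port) for host in hosts)


# Private broker addresses taken from the example above
private_hosts = ["10.16.202.6", "10.16.202.7", "10.16.202.8"]
print(bootstrap_servers(private_hosts, 9092))
# prints 10.16.202.6:9092,10.16.202.7:9092,10.16.202.8:9092
```

The returned string can be assigned directly to the 'bootstrap.servers' entry of the config dictionary.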