Build Producer

Kafka supports multiple client languages such as Java, C++, Python, Go, .NET, and JMS. In this tutorial, we will focus on using Kafka with Python. For tutorials on other languages, please refer to this documentation.

Let’s create the Python producer application by pasting the following code into a file producer.py.

#!/usr/bin/env python

from random import choice
from confluent_kafka import Producer

if __name__ == '__main__':

    config = {
        # User-specific properties that you must set
        'bootstrap.servers': '<BOOTSTRAP SERVERS>',
        'sasl.username':     '<USERNAME>',
        'sasl.password':     '<PASSWORD>',

        # Fixed properties
        'security.protocol': 'sasl_ssl',
        'sasl.mechanisms':   'SCRAM-SHA-512',
        'ssl.ca.location': '<PATH/TO/CERTIFICATE>'
        'ssl.endpoint.identification.algorithm': 'none',
    }

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):
        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()

Fill approriate values in the config section .

After that, you can execute this file on the public IP (port 9094) and the private IP (port 9092) to produce messages to the topic.

Example:

  • Using Public Ip address :
    • bootstrap.servers = e2e-87-237.ssdcloudindia.net:9094,e2e-87-246.ssdcloudindia.net:9094,e2e-88-5.ssdcloudindia.net:9094

  • Using Private Ip address :
    • bootstrap.servers = 10.16.202.6:9092,10.16.202.7:9092,10.16.202.8:9092