Skip to main content

Python

  1. Navigate to your project directory.

  2. Create a virtual environment and activate it.

  3. Install boto3 in your virtual environment:

pip install boto3
  1. Make a connection with the EQS client.

To make the connection with the EQS client , you need to pass configurations and credentials.

Configurations-


region_name- your_region
endpoint_url - eqs_endpoint_url (provided by e2e networks eqs service)


Credentials-


aws_access_key_id - your_access_key_id
aws_secret_key_id - your_secret_key_id

Make connection with EQS

You can make the connection with the EQS client in three ways:

a) You can directly pass the credentials and configurations at the time of making a session with the EQS client.

note

Hardcoded credentials in the application are not recommended.


import boto3

session = boto3.Session()

eqs_client=session.client(
“sqs”,
aws_access_key_id = your_access_key_id,
aws_secret_access_key = your_secret_key,
region_name = your_region,
endpoint_url = eqs_endpoint_url
)

You can also pass credentials and configurations inside boto3.Session() instead of session.client()

b) you can put credentials and configurations in the config and credentials file , and these files should be located at ~/.aws/config, ~/.aws/credentials location.

put into ~/.aws/config file

[default]
region =REGION

put into ~/.aws/credentials file


[default]
aws_access_key_id = ACCESS_KEY_ID
aws_secret_access_key = SECRET_KEY

import boto3

session = boto3.Session()

eqs_client=session.client(
“sqs”,
endpoint_url = EQS_ENDPOINT_URLl
)

This will include configurations and credentials in the client by default.

c) you can make the connection with the EQS client by passing the credentials and configuration into the environment variables file.

To install the python dotenv package, you can use the command


pip install python-dotenv

using terminal (Linux, OS X or Unix)


$ export AWS_ACCESS_KEY_ID = your_access_key_id
$ export AWS_SECRET_ACCESS_KEY = your_secret_key
$ export AWS_DEFAULT_REGION = your_eqs_region

using terminal (Windows)


> set AWS_ACCESS_KEY_ID = your_access_key_id
> set AWS_SECRET_ACCESS_KEY = your_secret_key
> set AWS_DEFAULT_REGION = your_eqs_region

or, edit your enviourment valiable file


AWS_ACCESS_KEY_ID = your_access_key_id
AWS_SECRET_ACCESS_KEY = your_secret_access_key
AWS_DEFAULT_REGION = your_eqs_region

import boto3
import os
from dotenv import load_dotenv

load_dotenv()

session = boto3.Session(aws_access_key_id = os.environ.get(“AWS_ACCESS_KEY_ID”),
aws_secret_access_key = os.environ.get(“AWS_SECRET_ACCESS_KEY”),
region_name = os.environ.get(“AWS_DEFAULT_REGION”)
)

eqs_client=session.client(
“sqs”,
endpoint_url = EQS_ENDPOINT_URL
)

above eqs_client is an object which has all methods related to queue operations.

response of session.client()

Make Connection Boto3

Methods available in eqs_client

Change VisibilityTimeout

Request Syntax

response = eqs_client.change_message_visibility(
QueueUrl='string',
ReceiptHandle='string',
VisibilityTimeout=123
)

Parameters

QueueUrl (string)- [Required]
The URL of the E2E EQS queue whose message's visibility is changed.

ReceiptHandle (string) - [Required]
The receipt handle is associated with the message whose visibility timeout is changed. This parameter is returned by the ReceiveMessage action

VisibilityTimeout (integer) - [Required] The new value for the message's visibility timeout (in seconds)

Response Syntax


None

Tag Queue Boto3

Change VisibilityTimeout in batch

Request Syntax


response = eqs_client.change_message_visibility_batch(
QueueUrl="string",
Entries=[
{"Id": "string", "ReceiptHandle": "string", "VisibilityTimeout": 123},
],
)

Parameters

QueueUrl (string) - [Required]
The URL of the E2E EQS queue whose message's visibility is changed.

Entries (list) - [Required] (Dictionary)

id [Required]-(string) An identifier for this particular receipt handle used to communicate the result

ReceiptHandle [Required] -(string) A receipt handle

VisibilityTimeout - (string) The new value (in seconds) for the message’s visibility timeout

Response Syntax

	{
"Successful": [{"Id": "string"}, ],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}

Change visibility

Close connection

Request Syntax


eqs_client.close()

Response Syntax


None

Create queue

Request Syntax


response = eqs_client.create_queue(
QueueName="string",
Attributes={"string": "string"},
tags={"string": "string"}
)

Parameters

QueueName (string)- [Required] 1 - A queue name can have up to 80 characters

2 - Valid values: alphanumeric characters, hyphens ( -), and underscores ( _).

Attributes (dictionary)- Attributes could be

1 - DelaySeconds - Time(second), to which extent delivery of messages is delayed in the queue

Valid values - 0 to 900 (second)

default - 0 (second)

2 - MaximumMessageSize - The size of the message after which the queue will reject the message

Valid values - 1,024 bytes (1 KiB) to 262,144 bytes (256 KiB)

default - 262,144 bytes(256 KiB)

3 - MessageRetentionPeriod - Time to which extent a queue will retain a message

Valid values - 60 seconds (1 minute) to 1,209,600 seconds (14 days)

default - 345,600 (4 days)

4 - ReceiveMessageWaitTimeSeconds - Time to which extent ReceiveMessage action waits before receiving a message

Valid values - 0 to 20 (seconds)

default - 0 (second)

5 - VisibilityTimeout - The amount of time that a message in a queue is invisible to other consumers after a consumer retrieves it.

Valid values - 0 to 43,200 seconds (12 hours)

default - 30 second

tags (dictionary ) -

1 - Adding more than 50 tags to a queue isn't recommended.

2 - A new tag with a key identical to that of an existing tag overwrites the existing tag.

Response Syntax


{ 'QueueUrl': 'string' }

make queue boto3

Delete message

Request Syntax


response = eqs_client.delete_message(
QueueUrl='string',
ReceiptHandle='string'
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are deleted.

ReceiptHandle (string) - [Required] The receipt handle is associated with the message to delete.

Response Syntax


None

delete_message_boto3

Delete message in batch

Request Syntax


response = eqs_client.delete_message_batch(
QueueUrl="string",
Entries=[
{"Id": "string", "ReceiptHandle": "string"},
],
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are deleted.

Entries (list) - [Required] (Dictionary) id [Required]-(string) An identifier for this particular receipt handle used to communicate the result ReceiptHandle [Required] -(string) A receipt handle

Response Syntax


{
"Successful": [{"Id": "string"}, ],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}

delete_message_queue_batch

Delete queue

Request Syntax


response = eqs_client.delete_queue(
QueueUrl='string'
)

Parameters

QueueUrl (string) - [Required]

The URL of the E2E EQS queue to delete.

Response Syntax


None

Get queue attributes

Request Syntax


response = eqs_client.get_queue_attributes(
QueueUrl="string",
AttributeNames=[ "All", ],
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue from which attributes are get.

AttributeNames (list) - [Optional] A list of attributes for which to retrieve information

All - it specifies, returns all values

you can also specify specific attributes in the list

As - ['VisibilityTimeout' , 'MaximumMessageSize' , 'MessageRetentionPeriod' , 'ApproximateNumberOfMessages' ,'ApproximateNumberOfMessagesNotVisible' , 'CreatedTimestamp' , 'LastModifiedTimestamp' , 'QueueArn' , 'ApproximateNumberOfMessagesDelayed' , 'DelaySeconds' , 'ReceiveMessageWaitTimeSeconds']

Response Syntax


{
"Attributes": {"string": "string"}
}

get_queue_attributes_boto3

Get queue url

Request Syntax

	response = eqs_client.get_queue_url(QueueName='string',)

Parameters

QueueName (string) -[Required] The name of the queue whose URL must be fetched.

Response Syntax


{'QueueUrl': 'string'}

queue_url_boto3

List queue tags

Request Syntax


response = eqs_client.list_queue_tags(QueueUrl='string')

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue of which all the tags must be listed.

Response Syntax


{
'Tags': {'string': 'string'}
}

list_queue_tags_boto3

List queues

Request Syntax


response = eqs_client.list_queues(
QueueNamePrefix='string',
MaxResults=123
)

Parameters QueueNamePrefix (string) - [Optional] A string to use for filtering the list results. Only those queues whose name begins with the specified string are returned.

MaxResults (integer) - [Optional] The maximum number of results to include in the response. Valid values - 1 to 1000

Response Syntax


{
"QueueUrls": ["string", ],
}

list_queues_boto3

Purge queue

delete all the messages from queue.

Request Syntax


response = eqs_client.purge_queue(QueueUrl='string')

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue of which all the messages must be deleted.

Response Syntax

	None

purge_queue_boto3.png

Receive message

Request Syntax

::

response = eqs_client.receive_message( QueueUrl="string", AttributeNames=[All], MessageAttributeNames=[ "string", ], MaxNumberOfMessages=2, VisibilityTimeout=25, WaitTimeSeconds=10, ReceiveRequestAttemptId="string", )

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are received.

AttributeNames (list) - A list of attributes that need to be returned along with each message.

All - it specifies, returns all values

you can also specify specific attributes in the list

As - ['VisibilityTimeout' , 'MaximumMessageSize' , 'MessageRetentionPeriod' , 'ApproximateNumberOfMessages' ,'ApproximateNumberOfMessagesNotVisible' , 'CreatedTimestamp' , 'LastModifiedTimestamp' , 'QueueArn' , 'ApproximateNumberOfMessagesDelayed' , 'DelaySeconds' , 'ReceiveMessageWaitTimeSeconds']

MessageAttributeNames (list) - The name of the message attribute.

MaxNumberOfMessages (integer) - The maximum number of messages to return.

Valid range - 1 to 10

default - 1

VisibilityTimeout (integer) - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.

WaitTimeSeconds (integer) - The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.

Response Syntax


{
"Messages": [
{
"MessageId": "string",
"ReceiptHandle": "string",
"MD5OfBody": "string",
"Body": "string",
"Attributes": {"string": "string"},
"MD5OfMessageAttributes": "string",
"MessageAttributes": {
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
},
]
}

receive_message_boto3

Send message

Request Syntax


response = eqs_client.send_message(
QueueUrl="string",
MessageBody="string",
DelaySeconds=123,
MessageAttributes={
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
MessageSystemAttributes={
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue to which messages are send.

MessageBody (string) - [Required] The message to send. The minimum size is one character. The maximum size is 256 KB.

DelaySeconds (integer) - The length of time, in seconds, for which to delay a specific message.

MessageAttributes (dictionary) - Each message attribute consists of a Name, Type, and Value .

Name, type, value and the message body must not be empty or null .

(string) - name of the message attributes

(dictionary) - Value of message attributes

1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.

2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

(string) - [Required] Type of message attributes

EQS support the following data type

Binary, Number, String

MessageSystemAttributes (dictionary) - Each message system attribute consists of a Name, Type, and Value.

The name, type, value and message body must not be empty or null.

(string) - the name of the message attributes

(dictionary) - Value of message attributes

1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.

2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

(string) - Type of message attributes

EQS support the following data type

Binary , Number , String

Response Syntax


{
'MD5OfMessageBody': 'string',
'MD5OfMessageAttributes': 'string',
'MD5OfMessageSystemAttributes': 'string',
'MessageId': 'string',
'SequenceNumber': 'string'
}

receive_message_boto3

Send message in batch

Request Syntax


response = eqs_client.send_message_batch(
QueueUrl="string",
Entries=[
{
"Id": "string",
"MessageBody": "string",
"DelaySeconds": 123,
"MessageAttributes":
{
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
"MessageSystemAttributes":
{
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
},
],
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue to which messages are sent.

Entries (list) - [Required] A list of SendMessageBatchRequestEntry items.

(Dict) -

Contains the details of a single E2E EQS message along with an Id.

Id (string) - [REQUIRED] An identifier for a message in this batch is used to communicate the result.

MessageBody (string) - [Required] The message to send. The minimum size is one character. The maximum size is 256 KB.

DelaySeconds (integer) - The length of time, in seconds, for which to delay a specific message.

MessageAttributes (dictionary) - Each message attribute consists of a Name, Type, and Value.

The Name, type, value and message body must not be empty or null.

(string) - The name of the message attributes

(Dictionary) - Value of message attributes

1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.

2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

(string) -[Required] Type of message attributes

EQS support the following data type

Binary, Number, String

MessageSystemAttributes (dictionary) - Each message system attribute consists of a Name, Type, and Value.

Name, type, value and the message body must not be empty or null.

(string) - The name of the message attributes

(Dictionary) - Value of message attributes

1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.

2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.

(string) - Type of message attributes

EQS support following data type

Binary , Number , String

Response Syntax

	{
"Successful": [
{
"Id": "string",
"MessageId": "string",
"MD5OfMessageBody": "string",
"MD5OfMessageAttributes": "string",
"MD5OfMessageSystemAttributes": "string",
"SequenceNumber": "string",
},
],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}

send_message_batch_boto3

Set queue Attributes

Request Syntax


response = eqs_client.set_queue_attributes(
QueueUrl='string',
Attributes={
'string': 'string'
}
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue of which attributes should be set.

Attributes (Dictionary) : [Required] A map of attributes to set.

Response Syntax


None

tag_queue_boto3

Tag queue

Request Syntax

	response = eqs_client.tag_queue(
QueueUrl='string',
Tags={'string': 'string', }
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue.

Tags (Dict) - [Required] The list of tags to be added to the specified queue.

Response Syntax


None

tag_queue_boto3

Untag queue

Request Syntax


response = eqs_client.untag_queue(
QueueUrl="string",
TagKeys=["string", ],
)

Parameters

QueueUrl (string) - [Required] The URL of the E2E EQS queue.

TagKeys (list) - [Required] The list of tags to be removed from the specified queue.

Response Syntax


None

untag_queue_boto3

Reference code


import boto3


ACCESS_KEY = "your_access_key_id"
SECRET_KEY = "your-secret-key"
SERVICE_NAME = "sqs"
REGION_NAME = "your_region"


class EqsClient:
# make a connection with the E2E EQS client
def make_connection(self):
try:
session = boto3.Session()
eqs_client = session.client(
SERVICE_NAME,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
region_name=REGION_NAME,
endpoint_url="your_eqs_endpoint_url",
)
except:
print(
"problem in making connection with eqs client , please check your credentials"
)
return eqs_client

# change message visibility timeout of the message
def change_message_visibility_timeout(
self, eqs_client, queue_url, receipt_handle, visibility_time
):
try:
response = eqs_client.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=visibility_time,
)
except:
print(" error in changing visibility timeout ")
return response

# change message visibility timeout of the messages
def change_message_visibility_timeout_batch(
self, eqs_client, queue_url, receipt_handle1, receipt_handle2
):
try:
response = eqs_client.change_message_visibility_batch(
QueueUrl=queue_url,
Entries=[
{
"Id": "1",
"ReceiptHandle": receipt_handle1,
"VisibilityTimeout": 123,
},
{
"Id": "2",
"ReceiptHandle": receipt_handle2,
"VisibilityTimeout": 221,
},
],
)
except:
print(" error in changing message visibility in batch ")
return response

# close connections with E2E EQS endpoint URL
def close_connection(self, eqs_client):
try:
return eqs_client.close()
except:
print("error in closing connection with eqs")

# delete messages
def delete_message_batch(self, eqs_client, queue_url, rh1, rh2):
try:
response = eqs_client.delete_message_batch(
QueueUrl=queue_url,
Entries=[
{"Id": "1", "ReceiptHandle": rh1},
{"Id": "2", "ReceiptHandle": rh2},
],
)
except:
print("error in deleting messages in batch ")
return response

# delete queue
def delete_queue(self, eqs_client, queue_url):
try:
response = eqs_client.delete_queue(QueueUrl=queue_url)
except:
print("failed to delete queue")
return response

# get queue URL
def get_queue_url(self, eqs_client, queue_name):
try:
response = eqs_client.get_queue_url(QueueName=queue_name)
except:
print(
"error in getting url of the queue , please make sure you are using correct queue name"
)
return response

# list all the queues
def list_queues(self, eqs_client):
try:
response = eqs_client.list_queues()
except:
print("error in listing queues")
return response

# list queue tags based on queue url
def list_queue_tags(self, eqs_client, queue_url):
try:
response = eqs_client.list_queue_tags(QueueUrl=queue_url)
except:
print(
"failed to list queue tags, make sure you are using correct queue url"
)
return response

# make queue
def make_queue(self, eqs_client, queue_name):
try:
response = eqs_client.create_queue(QueueName=queue_name)
except:
print("error in making queue")
return response

# get attributes of a queue based on queue url
def get_all_attributes(self, eqs_client, queue_url):
try:
response = eqs_client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
"All",
],
)

except:
print("error in getting all attributes")
return response

# receive a message and delete it from the queue
def process_messeges(self, eqs_client, queue_url):
try:
response = eqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
VisibilityTimeout=30,
WaitTimeSeconds=0,
)
if "Messages" in response:
message = response["Messages"][0]
print("Received message: %s" % message["Body"])
try:
response = eqs_client.delete_message(
QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
)
except:
print("failed to delete messagae")
else:
print("No messages in queue")

except:
print("failed to process message")

# delete all the messages from the queue
def purge_queue(self, eqs_client, queue_url):
try:
response = eqs_client.purge_queue(QueueUrl=queue_url)
except:
print("failed to delete all messages from queue")
return response

# push message to a queue
def push_message_queue(self, eqs_client, queue_url, message):
try:
response = eqs_client.send_message(QueueUrl=queue_url, MessageBody=message)
except:
print("failed to push message to queue ")
return response

# send messages to a queue
def send_message_batch(self, eqs_client, queue_url):
try:
response = eqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=[
{
"Id": "1",
"MessageBody": "message1",
"DelaySeconds": 13,
"MessageAttributes": {
"name1": {
"StringValue": "string_message",
"BinaryValue": "bytes_message",
"DataType": "String",
}
},
"MessageSystemAttributes": {
"name_system1": {
"StringValue": "string_system_message",
"BinaryValue": b"bytes_system_message",
"DataType": "String",
}
},
},
{
"Id": "2",
"MessageBody": "message2",
"DelaySeconds": 12,
"MessageAttributes": {
"name2": {
"StringValue": "string_message",
"BinaryValue": b"bytes_message",
"DataType": "String",
}
},
"MessageSystemAttributes": {
"name_system2": {
"StringValue": "string_system_message",
"BinaryValue": b"bytes_system_message",
"DataType": "String",
}
},
},
],
)
except:
print("error in sending messages")
return response

# add tag to a queue
def tag_queue(self, eqs_client, queue_url, tags):
try:
response = eqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)
except:
print("failed to tag queue")
return response

# untag tags form a queue
def untag_queue(self, eqs_client, queue_url, tagkey):
try:
response = eqs_client.untag_queue(
QueueUrl=queue_url,
TagKeys=[
tagkey,
],
)
except:
print(
"falied to untag queue , make sure you are using correct tagkey and queue url"
)
return response