Python
-
Navigate to your project directory.
-
Create a virtual environment and activate it.
-
Install
boto3
in your virtual environment:
pip install boto3
- Make a connection with the EQS client.
To make the connection with the EQS client , you need to pass configurations and credentials.
Configurations-
region_name- your_region
endpoint_url - eqs_endpoint_url (provided by e2e networks eqs service)
Credentials-
aws_access_key_id - your_access_key_id
aws_secret_key_id - your_secret_key_id
Make connection with EQS
You can make the connection with the EQS client in three ways:
a) You can directly pass the credentials and configurations at the time of making a session with the EQS client.
Hardcoded credentials in the application are not recommended.
import boto3
session = boto3.Session()
eqs_client=session.client(
“sqs”,
aws_access_key_id = your_access_key_id,
aws_secret_access_key = your_secret_key,
region_name = your_region,
endpoint_url = eqs_endpoint_url
)
You can also pass credentials and configurations inside boto3.Session() instead of session.client()
b) you can put credentials and configurations in the config and credentials file , and these files should be located at ~/.aws/config, ~/.aws/credentials location.
put into ~/.aws/config file
[default]
region =REGION
put into ~/.aws/credentials file
[default]
aws_access_key_id = ACCESS_KEY_ID
aws_secret_access_key = SECRET_KEY
import boto3
session = boto3.Session()
eqs_client=session.client(
“sqs”,
endpoint_url = EQS_ENDPOINT_URLl
)
This will include configurations and credentials in the client by default.
c) you can make the connection with the EQS client by passing the credentials and configuration into the environment variables file.
To install the python dotenv package, you can use the command
pip install python-dotenv
using terminal (Linux, OS X or Unix)
$ export AWS_ACCESS_KEY_ID = your_access_key_id
$ export AWS_SECRET_ACCESS_KEY = your_secret_key
$ export AWS_DEFAULT_REGION = your_eqs_region
using terminal (Windows)
> set AWS_ACCESS_KEY_ID = your_access_key_id
> set AWS_SECRET_ACCESS_KEY = your_secret_key
> set AWS_DEFAULT_REGION = your_eqs_region
or, edit your enviourment valiable file
AWS_ACCESS_KEY_ID = your_access_key_id
AWS_SECRET_ACCESS_KEY = your_secret_access_key
AWS_DEFAULT_REGION = your_eqs_region
import boto3
import os
from dotenv import load_dotenv
load_dotenv()
session = boto3.Session(aws_access_key_id = os.environ.get(“AWS_ACCESS_KEY_ID”),
aws_secret_access_key = os.environ.get(“AWS_SECRET_ACCESS_KEY”),
region_name = os.environ.get(“AWS_DEFAULT_REGION”)
)
eqs_client=session.client(
“sqs”,
endpoint_url = EQS_ENDPOINT_URL
)
above eqs_client is an object which has all methods related to queue operations.
response of session.client()
Methods available in eqs_client
Change VisibilityTimeout
Request Syntax
response = eqs_client.change_message_visibility(
QueueUrl='string',
ReceiptHandle='string',
VisibilityTimeout=123
)
Parameters
QueueUrl (string)- [Required]
The URL of the E2E EQS queue whose message's visibility is changed.
ReceiptHandle (string) - [Required]
The receipt handle is associated with the message whose visibility timeout is changed. This parameter is returned by the ReceiveMessage action
VisibilityTimeout (integer) - [Required] The new value for the message's visibility timeout (in seconds)
Response Syntax
None
Change VisibilityTimeout in batch
Request Syntax
response = eqs_client.change_message_visibility_batch(
QueueUrl="string",
Entries=[
{"Id": "string", "ReceiptHandle": "string", "VisibilityTimeout": 123},
],
)
Parameters
QueueUrl (string) - [Required]
The URL of the E2E EQS queue whose message's visibility is changed.
Entries (list) - [Required] (Dictionary)
id [Required]-(string) An identifier for this particular receipt handle used to communicate the result
ReceiptHandle [Required] -(string) A receipt handle
VisibilityTimeout - (string) The new value (in seconds) for the message’s visibility timeout
Response Syntax
{
"Successful": [{"Id": "string"}, ],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}
Close connection
Request Syntax
eqs_client.close()
Response Syntax
None
Create queue
Request Syntax
response = eqs_client.create_queue(
QueueName="string",
Attributes={"string": "string"},
tags={"string": "string"}
)
Parameters
QueueName (string)- [Required] 1 - A queue name can have up to 80 characters
2 - Valid values: alphanumeric characters, hyphens ( -), and underscores ( _).
Attributes (dictionary)- Attributes could be
1 - DelaySeconds - Time(second), to which extent delivery of messages is delayed in the queue
Valid values - 0 to 900 (second)
default - 0 (second)
2 - MaximumMessageSize - The size of the message after which the queue will reject the message
Valid values - 1,024 bytes (1 KiB) to 262,144 bytes (256 KiB)
default - 262,144 bytes(256 KiB)
3 - MessageRetentionPeriod - Time to which extent a queue will retain a message
Valid values - 60 seconds (1 minute) to 1,209,600 seconds (14 days)
default - 345,600 (4 days)
4 - ReceiveMessageWaitTimeSeconds - Time to which extent ReceiveMessage action waits before receiving a message
Valid values - 0 to 20 (seconds)
default - 0 (second)
5 - VisibilityTimeout - The amount of time that a message in a queue is invisible to other consumers after a consumer retrieves it.
Valid values - 0 to 43,200 seconds (12 hours)
default - 30 second
tags (dictionary ) -
1 - Adding more than 50 tags to a queue isn't recommended.
2 - A new tag with a key identical to that of an existing tag overwrites the existing tag.
Response Syntax
{ 'QueueUrl': 'string' }
Delete message
Request Syntax
response = eqs_client.delete_message(
QueueUrl='string',
ReceiptHandle='string'
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are deleted.
ReceiptHandle (string) - [Required] The receipt handle is associated with the message to delete.
Response Syntax
None
Delete message in batch
Request Syntax
response = eqs_client.delete_message_batch(
QueueUrl="string",
Entries=[
{"Id": "string", "ReceiptHandle": "string"},
],
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are deleted.
Entries (list) - [Required] (Dictionary) id [Required]-(string) An identifier for this particular receipt handle used to communicate the result ReceiptHandle [Required] -(string) A receipt handle
Response Syntax
{
"Successful": [{"Id": "string"}, ],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}
Delete queue
Request Syntax
response = eqs_client.delete_queue(
QueueUrl='string'
)
Parameters
QueueUrl (string) - [Required]
The URL of the E2E EQS queue to delete.
Response Syntax
None
Get queue attributes
Request Syntax
response = eqs_client.get_queue_attributes(
QueueUrl="string",
AttributeNames=[ "All", ],
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue from which attributes are get.
AttributeNames (list) - [Optional] A list of attributes for which to retrieve information
All - it specifies, returns all values
you can also specify specific attributes in the list
As - ['VisibilityTimeout' , 'MaximumMessageSize' , 'MessageRetentionPeriod' , 'ApproximateNumberOfMessages' ,'ApproximateNumberOfMessagesNotVisible' , 'CreatedTimestamp' , 'LastModifiedTimestamp' , 'QueueArn' , 'ApproximateNumberOfMessagesDelayed' , 'DelaySeconds' , 'ReceiveMessageWaitTimeSeconds']
Response Syntax
{
"Attributes": {"string": "string"}
}
Get queue url
Request Syntax
response = eqs_client.get_queue_url(QueueName='string',)
Parameters
QueueName (string) -[Required] The name of the queue whose URL must be fetched.
Response Syntax
{'QueueUrl': 'string'}
List queue tags
Request Syntax
response = eqs_client.list_queue_tags(QueueUrl='string')
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue of which all the tags must be listed.
Response Syntax
{
'Tags': {'string': 'string'}
}
List queues
Request Syntax
response = eqs_client.list_queues(
QueueNamePrefix='string',
MaxResults=123
)
Parameters QueueNamePrefix (string) - [Optional] A string to use for filtering the list results. Only those queues whose name begins with the specified string are returned.
MaxResults (integer) - [Optional] The maximum number of results to include in the response. Valid values - 1 to 1000
Response Syntax
{
"QueueUrls": ["string", ],
}
Purge queue
delete all the messages from queue.
Request Syntax
response = eqs_client.purge_queue(QueueUrl='string')
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue of which all the messages must be deleted.
Response Syntax
None
Receive message
Request Syntax
::
response = eqs_client.receive_message( QueueUrl="string", AttributeNames=[All], MessageAttributeNames=[ "string", ], MaxNumberOfMessages=2, VisibilityTimeout=25, WaitTimeSeconds=10, ReceiveRequestAttemptId="string", )
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue from which messages are received.
AttributeNames (list) - A list of attributes that need to be returned along with each message.
All - it specifies, returns all values
you can also specify specific attributes in the list
As - ['VisibilityTimeout' , 'MaximumMessageSize' , 'MessageRetentionPeriod' , 'ApproximateNumberOfMessages' ,'ApproximateNumberOfMessagesNotVisible' , 'CreatedTimestamp' , 'LastModifiedTimestamp' , 'QueueArn' , 'ApproximateNumberOfMessagesDelayed' , 'DelaySeconds' , 'ReceiveMessageWaitTimeSeconds']
MessageAttributeNames (list) - The name of the message attribute.
MaxNumberOfMessages (integer) - The maximum number of messages to return.
Valid range - 1 to 10
default - 1
VisibilityTimeout (integer) - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
WaitTimeSeconds (integer) - The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
Response Syntax
{
"Messages": [
{
"MessageId": "string",
"ReceiptHandle": "string",
"MD5OfBody": "string",
"Body": "string",
"Attributes": {"string": "string"},
"MD5OfMessageAttributes": "string",
"MessageAttributes": {
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
},
]
}
Send message
Request Syntax
response = eqs_client.send_message(
QueueUrl="string",
MessageBody="string",
DelaySeconds=123,
MessageAttributes={
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
MessageSystemAttributes={
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue to which messages are send.
MessageBody (string) - [Required] The message to send. The minimum size is one character. The maximum size is 256 KB.
DelaySeconds (integer) - The length of time, in seconds, for which to delay a specific message.
MessageAttributes (dictionary) - Each message attribute consists of a Name, Type, and Value .
Name, type, value and the message body must not be empty or null .
(string) - name of the message attributes
(dictionary) - Value of message attributes
1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.
2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.
(string) - [Required] Type of message attributes
EQS support the following data type
Binary, Number, String
MessageSystemAttributes (dictionary) - Each message system attribute consists of a Name, Type, and Value.
The name, type, value and message body must not be empty or null.
(string) - the name of the message attributes
(dictionary) - Value of message attributes
1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.
2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.
(string) - Type of message attributes
EQS support the following data type
Binary , Number , String
Response Syntax
{
'MD5OfMessageBody': 'string',
'MD5OfMessageAttributes': 'string',
'MD5OfMessageSystemAttributes': 'string',
'MessageId': 'string',
'SequenceNumber': 'string'
}
Send message in batch
Request Syntax
response = eqs_client.send_message_batch(
QueueUrl="string",
Entries=[
{
"Id": "string",
"MessageBody": "string",
"DelaySeconds": 123,
"MessageAttributes":
{
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
"MessageSystemAttributes":
{
"string": {
"StringValue": "string",
"BinaryValue": b"bytes",
"DataType": "string",
}
},
},
],
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue to which messages are sent.
Entries (list) - [Required] A list of SendMessageBatchRequestEntry items.
(Dict) -
Contains the details of a single E2E EQS message along with an Id.
Id (string) - [REQUIRED] An identifier for a message in this batch is used to communicate the result.
MessageBody (string) - [Required] The message to send. The minimum size is one character. The maximum size is 256 KB.
DelaySeconds (integer) - The length of time, in seconds, for which to delay a specific message.
MessageAttributes (dictionary) - Each message attribute consists of a Name, Type, and Value.
The Name, type, value and message body must not be empty or null.
(string) - The name of the message attributes
(Dictionary) - Value of message attributes
1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.
2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.
(string) -[Required] Type of message attributes
EQS support the following data type
Binary, Number, String
MessageSystemAttributes (dictionary) - Each message system attribute consists of a Name, Type, and Value.
Name, type, value and the message body must not be empty or null.
(string) - The name of the message attributes
(Dictionary) - Value of message attributes
1 - StringValue (string) - Strings are Unicode with UTF-8 binary encoding.
2 - BinaryValue (bytes) - Binary type attributes can store any binary data, such as compressed data, encrypted data, or images.
(string) - Type of message attributes
EQS support following data type
Binary , Number , String
Response Syntax
{
"Successful": [
{
"Id": "string",
"MessageId": "string",
"MD5OfMessageBody": "string",
"MD5OfMessageAttributes": "string",
"MD5OfMessageSystemAttributes": "string",
"SequenceNumber": "string",
},
],
"Failed": [
{
"Id": "string",
"SenderFault": True | False,
"Code": "string",
"Message": "string",
},
],
}
Set queue Attributes
Request Syntax
response = eqs_client.set_queue_attributes(
QueueUrl='string',
Attributes={
'string': 'string'
}
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue of which attributes should be set.
Attributes (Dictionary) : [Required] A map of attributes to set.
Response Syntax
None
Tag queue
Request Syntax
response = eqs_client.tag_queue(
QueueUrl='string',
Tags={'string': 'string', }
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue.
Tags (Dict) - [Required] The list of tags to be added to the specified queue.
Response Syntax
None
Untag queue
Request Syntax
response = eqs_client.untag_queue(
QueueUrl="string",
TagKeys=["string", ],
)
Parameters
QueueUrl (string) - [Required] The URL of the E2E EQS queue.
TagKeys (list) - [Required] The list of tags to be removed from the specified queue.
Response Syntax
None
Reference code
import boto3
ACCESS_KEY = "your_access_key_id"
SECRET_KEY = "your-secret-key"
SERVICE_NAME = "sqs"
REGION_NAME = "your_region"
class EqsClient:
# make a connection with the E2E EQS client
def make_connection(self):
try:
session = boto3.Session()
eqs_client = session.client(
SERVICE_NAME,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
region_name=REGION_NAME,
endpoint_url="your_eqs_endpoint_url",
)
except:
print(
"problem in making connection with eqs client , please check your credentials"
)
return eqs_client
# change message visibility timeout of the message
def change_message_visibility_timeout(
self, eqs_client, queue_url, receipt_handle, visibility_time
):
try:
response = eqs_client.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=visibility_time,
)
except:
print(" error in changing visibility timeout ")
return response
# change message visibility timeout of the messages
def change_message_visibility_timeout_batch(
self, eqs_client, queue_url, receipt_handle1, receipt_handle2
):
try:
response = eqs_client.change_message_visibility_batch(
QueueUrl=queue_url,
Entries=[
{
"Id": "1",
"ReceiptHandle": receipt_handle1,
"VisibilityTimeout": 123,
},
{
"Id": "2",
"ReceiptHandle": receipt_handle2,
"VisibilityTimeout": 221,
},
],
)
except:
print(" error in changing message visibility in batch ")
return response
# close connections with E2E EQS endpoint URL
def close_connection(self, eqs_client):
try:
return eqs_client.close()
except:
print("error in closing connection with eqs")
# delete messages
def delete_message_batch(self, eqs_client, queue_url, rh1, rh2):
try:
response = eqs_client.delete_message_batch(
QueueUrl=queue_url,
Entries=[
{"Id": "1", "ReceiptHandle": rh1},
{"Id": "2", "ReceiptHandle": rh2},
],
)
except:
print("error in deleting messages in batch ")
return response
# delete queue
def delete_queue(self, eqs_client, queue_url):
try:
response = eqs_client.delete_queue(QueueUrl=queue_url)
except:
print("failed to delete queue")
return response
# get queue URL
def get_queue_url(self, eqs_client, queue_name):
try:
response = eqs_client.get_queue_url(QueueName=queue_name)
except:
print(
"error in getting url of the queue , please make sure you are using correct queue name"
)
return response
# list all the queues
def list_queues(self, eqs_client):
try:
response = eqs_client.list_queues()
except:
print("error in listing queues")
return response
# list queue tags based on queue url
def list_queue_tags(self, eqs_client, queue_url):
try:
response = eqs_client.list_queue_tags(QueueUrl=queue_url)
except:
print(
"failed to list queue tags, make sure you are using correct queue url"
)
return response
# make queue
def make_queue(self, eqs_client, queue_name):
try:
response = eqs_client.create_queue(QueueName=queue_name)
except:
print("error in making queue")
return response
# get attributes of a queue based on queue url
def get_all_attributes(self, eqs_client, queue_url):
try:
response = eqs_client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
"All",
],
)
except:
print("error in getting all attributes")
return response
# receive a message and delete it from the queue
def process_messeges(self, eqs_client, queue_url):
try:
response = eqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
VisibilityTimeout=30,
WaitTimeSeconds=0,
)
if "Messages" in response:
message = response["Messages"][0]
print("Received message: %s" % message["Body"])
try:
response = eqs_client.delete_message(
QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
)
except:
print("failed to delete messagae")
else:
print("No messages in queue")
except:
print("failed to process message")
# delete all the messages from the queue
def purge_queue(self, eqs_client, queue_url):
try:
response = eqs_client.purge_queue(QueueUrl=queue_url)
except:
print("failed to delete all messages from queue")
return response
# push message to a queue
def push_message_queue(self, eqs_client, queue_url, message):
try:
response = eqs_client.send_message(QueueUrl=queue_url, MessageBody=message)
except:
print("failed to push message to queue ")
return response
# send messages to a queue
def send_message_batch(self, eqs_client, queue_url):
try:
response = eqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=[
{
"Id": "1",
"MessageBody": "message1",
"DelaySeconds": 13,
"MessageAttributes": {
"name1": {
"StringValue": "string_message",
"BinaryValue": "bytes_message",
"DataType": "String",
}
},
"MessageSystemAttributes": {
"name_system1": {
"StringValue": "string_system_message",
"BinaryValue": b"bytes_system_message",
"DataType": "String",
}
},
},
{
"Id": "2",
"MessageBody": "message2",
"DelaySeconds": 12,
"MessageAttributes": {
"name2": {
"StringValue": "string_message",
"BinaryValue": b"bytes_message",
"DataType": "String",
}
},
"MessageSystemAttributes": {
"name_system2": {
"StringValue": "string_system_message",
"BinaryValue": b"bytes_system_message",
"DataType": "String",
}
},
},
],
)
except:
print("error in sending messages")
return response
# add tag to a queue
def tag_queue(self, eqs_client, queue_url, tags):
try:
response = eqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)
except:
print("failed to tag queue")
return response
# untag tags form a queue
def untag_queue(self, eqs_client, queue_url, tagkey):
try:
response = eqs_client.untag_queue(
QueueUrl=queue_url,
TagKeys=[
tagkey,
],
)
except:
print(
"falied to untag queue , make sure you are using correct tagkey and queue url"
)
return response