Clearwateranalytics.com is now cwan.com. Your Clearwater and Enfusion credentials have not changed.
Blog
6 min read

Implementing an SQS listener in Python using FastAPI and Uvicorn

huge
By Ayush Sharma

Introduction

Amazon Simple Queue Service (Amazon SQS) is a vital tool for decoupling and integrating distributed software systems. It provides a reliable and scalable way to manage message queues, ensuring that application components can communicate asynchronously without tight coupling. This decoupling is crucial in microservices architectures, where independent services interact without directly depending on each other’s availability or performance.

In my recent work, I needed to implement an SQS listener in Python. While Java offers straightforward annotations to enable SQS listeners, Python lacks an equivalent out-of-the-box solution. To address this, I chose to implement an SQS listener using FastAPI, a modern high-performance web framework for building APIs, and Uvicorn, an ASGI web server that efficiently handles asynchronous tasks.

Amazon SQS excels in maintaining the resilience and scalability of distributed systems. By placing messages in a queue, it allows sending components to operate independently of receiving components, which is crucial for load levelling and message durability. Even if a receiving component is temporarily unavailable, SQS ensures that messages are not lost. Furthermore, SQS integrates seamlessly with other AWS services like Lambda, S3, and SNS, making it a versatile choice for various use cases, including event-driven architectures, data processing pipelines, and serverless applications.

Choosing Python and FastAPI for SQS listeners

Python’s versatility and extensive library support make it a popular choice for cloud-native applications. FastAPI, in particular, offers a modern and async-friendly approach to building web APIs, which is ideal for I/O-bound tasks like listening to SQS queues. Unlike traditional synchronous frameworks, FastAPI with Uvicorn allows the application to handle multiple requests concurrently without blocking, which is critical when dealing with message queues.

Compared to Java, where SQS listeners can be implemented using simple annotations, Python provides more flexibility, especially when you need to customize the behavior of the listener or integrate it with other async processes.

Setting up and configuring the SQS queue

Before diving into the Python code, you need to set up an SQS queue. Here’s a step-by-step guide:

1. Create an SQS queue:

  • Log in to the AWS Management Console.
  • Navigate to the SQS service.
  • Click on “Create Queue.”
  • Choose the “Standard” queue type, as it supports high-throughput and at-least-once delivery.
  • Name the queue and leave the default configurations unless specific requirements dictate otherwise.
  • Click “Create Queue” to finalize the setup.

2. Configure IAM Roles and Permissions:

  • Ensure that your Python application has the necessary permissions to access the SQS queue.
  • Create an IAM role with sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions.
  • Attach this role to the instance or container running your Python application.

Implementing the SQS listener in Python

With the queue set up, we can now implement the SQS listener. Here’s a basic example using FastAPI and Uvicorn:

import threading 
import uvicorn as uvicorn 
from fastapi import FastAPI 
 
from app.services.sqs_listener import listen_sqs 
from build.lib.app.main import asofdate_api, init_service 
 
example_api = FastAPI() 
SERVICE_NAME = "Example Service" 
 
 
class BackgroundTasks(threading.Thread): 
    def run(self, *args, **kwargs): 
        while True:  # an infinite loop acting as a listener for SQS 
            listen_sqs() 
 
 
if __name__ == "__main__": 
    uvicorn.run("main:example_api", host='0.0.0.0', port=8080, workers=5, timeout_keep_alive=600, 
                reload=False)  # here the number of workers specified should be number of cores * 2 +1 
 
 
@asofdate_api.on_event("startup")  # this will get triggered when the application has started up on uvicorn server 
async def startup_event(): 
    t = BackgroundTasks()  # this will add a always running background task 
    t.start()

Now coming to SQS listener.

import json 
import logging 
import os 
import time 
 
import boto3 
 
logger = logging.getLogger() 
 
 
def listen_sqs(): 
    sqs = boto3.client('sqs')  # here to make a sqs client you might need access id and key too 
    queue_url = os.environ['SQS_URL']  # getting the queue URL from environment variable 
    try: 
        logger.info("Waiting for sqs message") 
        response = sqs.receive_message( 
            QueueUrl=queue_url, 
            AttributeNames=['All'], 
            MaxNumberOfMessages=10, 
            WaitTimeSeconds=20 
        )  # the recieve message config can be set as required 
        if 'Messages' in response: 
            logger.info(f"Response is: {response}") 
            message_receipt_handle = '' 
            body = '' 
            messages = response['Messages'] 
            for val in messages: 
                body = val['Body'] 
                message_receipt_handle = val['ReceiptHandle'] 
                logging.info(f"Read from SQS: {body}") 
                message = json.loads(body) 
                # your code how to handle the message goes here 
                sqs.delete_message( 
                    QueueUrl=queue_url, 
                    ReceiptHandle=message_receipt_handle 
                )  # deleting the message so that it does not process again 
        time.sleep(10)  # making the loop for 10 seconds 
    except Exception as e: 
        logging.info('An error occurred while reading from sqs: %s' % str(e)) 
        time.sleep(10)  # making the loop for 10 seconds

Deploying the listener on AWS

Once the SQS listener is implemented, the next step is to deploy it on AWS. The deployment process involves containerizing the application and deploying it to Amazon EKS (Elastic Kubernetes Service).

Containerization with docker

First, create a Dockerfile to containerize the application:

FROM python:3.11-slim
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Deploying to Amazon EKS

After building and pushing the Docker image to a container registry (like Amazon ECR), you can deploy it to Amazon EKS. This process typically involves creating an EKS cluster, setting up necessary node groups, and deploying your application using Kubernetes manifests. However, detailed steps for this process, including configuration and scaling considerations, are beyond the scope of this document.

For more information on deploying applications to Kubernetes, consider referring to Kubernetes documentation or relevant AWS resources.

Real-world applications

In my project, this SQS listener was a crucial part of a data ingestion pipeline. Files uploaded to an S3 bucket triggered an event that placed a message in the SQS queue. The listener then processed these messages, extracting and transforming the data before saving it to a database. This setup allowed for a robust and scalable data ingestion process, capable of handling large volumes of data with ease.

Monitoring and logging

Monitoring and logging are vital for ensuring the reliability of your SQS listener. AWS CloudWatch can be used to track metrics such as the number of messages received, processing times, and errors. Integrating CloudWatch logs with your FastAPI application allows you to capture detailed logs for each message processed, which is invaluable for debugging and auditing.

Best practices

  • Visibility Timeout: Ensure that the visibility timeout for your SQS messages is appropriately set to allow enough time for processing before a message is re-queued.
response = sqs.receive_message( 
    QueueUrl=queue_url, 
    AttributeNames=['All'], 
    MaxNumberOfMessages=10, 
    WaitTimeSeconds=20, 
    VisibilityTimeout=30  # Set visibility timeout to 30 seconds 
)  # the recieve message config can be set as required
  • Batching: If your use case allows, process messages in batches to improve efficiency and reduce the number of API calls.
if 'Messages' in response: 
    messages = response['Messages'] 
    entries = [{'Id': str(index), 'ReceiptHandle': message['ReceiptHandle']} for index, message in 
               enumerate(messages)] 
 
    # Process messages and delete them in batches 
    for message in messages: 
        body = message['Body'] 
        logging.info(f"Read from SQS: {body}") 
        # Your processing logic here 
 
    # Delete messages in batch 
    sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)
  • Error Handling: Implement robust error handling and use Dead Letter Queues (DLQs) to manage failed messages.

Future considerations

  • Advanced Features: Consider adding message filtering to your SQS setup, allowing your listener to handle specific messages based on content.
  • Service Integration: Explore integrating the SQS listener with other AWS services, such as SNS for notifications or Lambda for serverless processing.

Conclusion

Implementing an SQS listener in Python using FastAPI and Uvicorn offers a flexible and scalable solution for processing messages in distributed systems. By leveraging Python’s async capabilities, you can build a responsive and efficient application that handles high-throughput message processing with ease.

This comprehensive approach to implementing an SQS listener not only addresses the immediate need for message processing, but also positions your application for future growth and scalability in a cloud-native environment.


About the author

Ayush Sharma is a Software Development Engineer with over 4 years of experience, known for his passion for exploring new challenges in technology and development. With a strong technical background, he continually seeks opportunities to innovate and grow in the ever-evolving tech landscape. Outside of his professional life, Ayush enjoys watching anime and is an avid cricket enthusiast.