
Amazon Simple Queue Service (Amazon SQS) is a vital tool for decoupling and integrating distributed software systems. It provides a reliable and scalable way to manage message queues, ensuring that application components can communicate asynchronously without tight coupling. This decoupling is crucial in microservices architectures, where independent services interact without directly depending on each other’s availability or performance.
In my recent work, I needed to implement an SQS listener in Python. While Java offers straightforward annotations to enable SQS listeners, Python lacks an equivalent out-of-the-box solution. To address this, I chose to implement an SQS listener using FastAPI, a modern high-performance web framework for building APIs, and Uvicorn, an ASGI web server that efficiently handles asynchronous tasks.
Amazon SQS excels in maintaining the resilience and scalability of distributed systems. By placing messages in a queue, it allows sending components to operate independently of receiving components, which is crucial for load levelling and message durability. Even if a receiving component is temporarily unavailable, SQS ensures that messages are not lost. Furthermore, SQS integrates seamlessly with other AWS services like Lambda, S3, and SNS, making it a versatile choice for various use cases, including event-driven architectures, data processing pipelines, and serverless applications.
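To make the decoupling concrete, here is a minimal sketch of a producer enqueueing a message with boto3; the producer never needs to know who consumes it. The function name and queue URL are illustrative, not part of any particular codebase:

```python
import json


def send_event(sqs_client, queue_url: str, payload: dict) -> str:
    """Enqueue a JSON payload; the consumer picks it up whenever it is ready."""
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(payload),
    )
    return response["MessageId"]


if __name__ == "__main__":
    import boto3  # only needed when actually talking to AWS

    sqs = boto3.client("sqs")
    # Hypothetical queue URL for illustration
    url = "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue"
    print(send_event(sqs, url, {"event": "file_uploaded"}))
```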
Python’s versatility and extensive library support make it a popular choice for cloud-native applications. FastAPI, in particular, offers a modern and async-friendly approach to building web APIs, which is ideal for I/O-bound tasks like listening to SQS queues. Unlike traditional synchronous frameworks, FastAPI with Uvicorn allows the application to handle multiple requests concurrently without blocking, which is critical when dealing with message queues.
Compared to Java, where SQS listeners can be enabled with simple annotations, building one by hand in Python gives you more flexibility, especially when you need to customize the listener's behavior or integrate it with other async processes.
Before diving into the Python code, you need to set up an SQS queue. Here’s a step-by-step guide:
1. Create an SQS queue: in the AWS Management Console, open Amazon SQS, choose Create queue, pick the queue type (standard or FIFO), and give it a name. Note the queue URL; the listener will read it from an environment variable.
2. Configure IAM roles and permissions: grant the identity running the listener permission to receive and delete messages on the queue (at minimum sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes).
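Both setup steps can also be scripted. The sketch below creates the queue with boto3 and builds the shape of a least-privilege IAM policy for the listener; the queue name, ARN, and helper name are placeholders of my own:

```python
import json


def listener_policy(queue_arn: str) -> str:
    """Build an IAM policy document granting only the actions the listener needs."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
            ],
            "Resource": queue_arn,
        }],
    }
    return json.dumps(policy)


if __name__ == "__main__":
    import boto3  # only needed when actually creating the queue

    sqs = boto3.client("sqs")
    queue_url = sqs.create_queue(QueueName="example-queue")["QueueUrl"]
    print(queue_url)
    # Hypothetical ARN for illustration
    print(listener_policy("arn:aws:sqs:us-east-1:123456789012:example-queue"))
```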
With the queue set up, we can now implement the SQS listener. Here’s a basic example using FastAPI and Uvicorn:
```python
import threading

import uvicorn
from fastapi import FastAPI

from app.services.sqs_listener import listen_sqs

example_api = FastAPI()
SERVICE_NAME = "Example Service"


class BackgroundTasks(threading.Thread):
    def run(self, *args, **kwargs):
        # Infinite loop acting as the SQS listener
        while True:
            listen_sqs()


@example_api.on_event("startup")  # triggered when the application starts up on the Uvicorn server
async def startup_event():
    t = BackgroundTasks()  # add an always-running background task
    t.start()


if __name__ == "__main__":
    # The number of workers should typically be (number of cores * 2) + 1
    uvicorn.run("main:example_api", host="0.0.0.0", port=8080,
                workers=5, timeout_keep_alive=600, reload=False)
```
Now let's turn to the SQS listener itself.
```python
import json
import logging
import os
import time

import boto3

logger = logging.getLogger()


def listen_sqs():
    # Depending on your setup, the client may also need an access key ID and secret key
    sqs = boto3.client('sqs')
    queue_url = os.environ['SQS_URL']  # queue URL from an environment variable
    try:
        logger.info("Waiting for SQS message")
        response = sqs.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )  # tune the receive_message configuration as required
        if 'Messages' in response:
            logger.info(f"Response is: {response}")
            for val in response['Messages']:
                body = val['Body']
                message_receipt_handle = val['ReceiptHandle']
                logger.info(f"Read from SQS: {body}")
                message = json.loads(body)
                # your message-handling code goes here
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message_receipt_handle
                )  # delete each message so it is not processed again
        time.sleep(10)  # pause for 10 seconds before polling again
    except Exception as e:
        logger.error('An error occurred while reading from SQS: %s', str(e))
        time.sleep(10)  # pause for 10 seconds before polling again
```
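Because the listener is ultimately a loop over the SQS client, the per-batch logic can be exercised locally with a stub client before touching AWS. The sketch below factors that logic into a standalone helper; the function and parameter names are mine, not part of the listener above:

```python
import json


def process_batch(sqs_client, queue_url: str, handler) -> int:
    """Receive one batch, pass each decoded body to `handler`, delete on success."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=["All"],
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
    )
    handled = 0
    for msg in response.get("Messages", []):
        handler(json.loads(msg["Body"]))  # delete only if the handler didn't raise
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=msg["ReceiptHandle"],
        )
        handled += 1
    return handled
```

Structuring the loop body this way makes it trivial to unit-test the delete-after-processing behavior with a fake client.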
Once the SQS listener is implemented, the next step is to deploy it on AWS. The deployment process involves containerizing the application and deploying it to Amazon EKS (Elastic Kubernetes Service).
First, create a Dockerfile to containerize the application:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . /app
RUN pip install --no-cache-dir -r requirements.txt
CMD ["uvicorn", "main:example_api", "--host", "0.0.0.0", "--port", "8080"]
```
After building and pushing the Docker image to a container registry (like Amazon ECR), you can deploy it to Amazon EKS. This process typically involves creating an EKS cluster, setting up necessary node groups, and deploying your application using Kubernetes manifests. However, detailed steps for this process, including configuration and scaling considerations, are beyond the scope of this document.
For more information on deploying applications to Kubernetes, consider referring to Kubernetes documentation or relevant AWS resources.
In my project, this SQS listener was a crucial part of a data ingestion pipeline. Files uploaded to an S3 bucket triggered an event that placed a message in the SQS queue. The listener then processed these messages, extracting and transforming the data before saving it to a database. This setup allowed for a robust and scalable data ingestion process, capable of handling large volumes of data with ease.
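In that pipeline, each SQS message body carries an S3 event notification. A small sketch of pulling the bucket and object key out of such a body (the event shape follows S3's standard notification format; the helper name is my own):

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(message_body: str) -> list:
    """Return (bucket, key) pairs from an S3 event notification delivered via SQS."""
    event = json.loads(message_body)
    objects = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # S3 URL-encodes object keys in event notifications (spaces become '+')
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects
```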
Monitoring and logging are vital for ensuring the reliability of your SQS listener. AWS CloudWatch can be used to track metrics such as the number of messages received, processing times, and errors. Integrating CloudWatch logs with your FastAPI application allows you to capture detailed logs for each message processed, which is invaluable for debugging and auditing.
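As one way to feed such metrics, the listener can publish a custom processing-time metric with the CloudWatch client's put_metric_data call; the namespace, metric name, and helper here are illustrative choices, not fixed conventions:

```python
import time


def record_processing_time(cloudwatch_client, started: float) -> None:
    """Publish per-message processing latency as a custom CloudWatch metric."""
    cloudwatch_client.put_metric_data(
        Namespace="SqsListener",  # hypothetical namespace for illustration
        MetricData=[{
            "MetricName": "ProcessingTimeMs",
            "Value": (time.time() - started) * 1000.0,
            "Unit": "Milliseconds",
        }],
    )


if __name__ == "__main__":
    import boto3  # only needed when actually publishing to CloudWatch

    cloudwatch = boto3.client("cloudwatch")
    started = time.time()
    # ... process a message here ...
    record_processing_time(cloudwatch, started)
```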
You can also set a visibility timeout on receive, so that a message being processed stays hidden from other consumers until it is deleted or the timeout expires:

```python
response = sqs.receive_message(
    QueueUrl=queue_url,
    AttributeNames=['All'],
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
    VisibilityTimeout=30  # set visibility timeout to 30 seconds
)  # tune the receive_message configuration as required
```
When throughput matters, messages can be deleted in a single batch call instead of one at a time:

```python
if 'Messages' in response:
    messages = response['Messages']
    entries = [
        {'Id': str(index), 'ReceiptHandle': message['ReceiptHandle']}
        for index, message in enumerate(messages)
    ]
    # Process each message, then delete them all in one batch call
    for message in messages:
        body = message['Body']
        logging.info(f"Read from SQS: {body}")
        # your processing logic here
    sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)
```
Implementing an SQS listener in Python using FastAPI and Uvicorn offers a flexible and scalable solution for processing messages in distributed systems. By leveraging Python’s async capabilities, you can build a responsive and efficient application that handles high-throughput message processing with ease.
This comprehensive approach to implementing an SQS listener not only addresses the immediate need for message processing, but also positions your application for future growth and scalability in a cloud-native environment.
Ayush Sharma is a Software Development Engineer with over 4 years of experience, known for his passion for exploring new challenges in technology and development. With a strong technical background, he continually seeks opportunities to innovate and grow in the ever-evolving tech landscape. Outside of his professional life, Ayush enjoys watching anime and is an avid cricket enthusiast.