Scalable Data Processing with AWS Serverless Scatter-Gather Pattern Implementation
José David Arévalo · 21 April 2023
AWS
Data engineering plays a critical role in managing, processing, and extracting value from substantial volumes of data. The Scatter-Gather pattern has become a popular integration pattern that enables data engineers to efficiently process and analyze data by breaking down sizable tasks into smaller subtasks and executing them concurrently. In this article, we will showcase the implementation of the Scatter-Gather pattern in an AWS Serverless environment, utilizing AWS Lambda, Amazon SQS, and DynamoDB Streams. By adopting this scalable, parallel, and efficient data processing solution, data engineers can effectively address the challenges of handling large volumes of data in various independent applications.
Scatter-Gather Pattern Overview
The Scatter-Gather pattern is a powerful approach in the world of data engineering, designed to enhance the efficiency and scalability of data processing tasks. It works by dividing a larger task into smaller, more manageable subtasks that can be executed concurrently; the parallel execution of these subtasks optimizes both processing time and resource utilization.
In the Scatter phase, the primary task is separated into multiple subtasks that can be handled independently. These subtasks are then distributed across various processing units (in our case, Lambda functions), enabling parallel processing of the data. During the Gather phase, the results produced by each subtask are collected and combined into the final output or status. This output can then be used for further analysis or processing, as the application requires.
The Scatter-Gather pattern offers several advantages, such as improved performance and scalability. By allowing subtasks to be processed in parallel, it significantly reduces the time required to complete the primary task. Furthermore, it enables data engineers to design flexible and cost-effective solutions that adapt to growing data volumes and varying workloads.
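To make the idea concrete before touching any AWS service, here is a minimal local sketch of the pattern (illustrative only, using Python's built-in concurrent.futures rather than anything AWS-specific): the task is scattered into subtasks, the subtasks are processed in parallel, and the partial results are gathered into a single output.

from concurrent.futures import ThreadPoolExecutor

def process(subtask):
    # Placeholder for the real work performed on each subtask
    return subtask * 2

def scatter_gather(task_size):
    subtasks = list(range(task_size))  # Scatter: split the task into subtasks
    with ThreadPoolExecutor() as executor:
        partial_results = list(executor.map(process, subtasks))  # process subtasks concurrently
    return sum(partial_results)  # Gather: combine the partial results

print(scatter_gather(10))  # prints 90

In the AWS implementation below, the executor's role is played by SQS plus Lambda, and the gathering happens asynchronously through DynamoDB Streams instead of an in-process sum.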
AWS Serverless Implementation
To implement the Scatter-Gather pattern using AWS Serverless components, we will utilize AWS Lambda, Amazon SQS, and DynamoDB Streams. These services seamlessly integrate and offer a scalable and cost-effective solution for data engineers aiming to process large volumes of data efficiently. The following illustration depicts the architecture that will be implemented step-by-step in the subsequent sections of this article:
AWS Lambda
AWS Lambda is a compute service that allows you to run your code without provisioning or managing servers. It automatically scales applications in response to the number of incoming requests, providing the flexibility and efficiency required for processing data. In this implementation, Lambda functions will be used to process individual subtasks in parallel during the Scatter, Processor, Aggregator, and Gather phases.
Amazon SQS
Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables decoupling components within a cloud application. In our Scatter-Gather implementation, Amazon SQS will be utilized to enqueue the subtasks and trigger the Processor Lambda function when a message is received. This ensures a reliable and scalable messaging system that can handle high throughput and dynamic workloads.
DynamoDB Streams
DynamoDB Streams is a feature of DynamoDB that captures item-level modifications in a table and outputs a time-ordered stream of change events. In the Aggregator and Gather phases, these streams will be used to collect and aggregate the updates generated by the Lambda functions during the Scatter and Processor phases. The gathered data can then be used for further analysis or processing, as the application requires.
With these AWS Serverless components, we can build a scalable and efficient Scatter-Gather pattern implementation that addresses the challenges of processing large volumes of data.
Hands-On: Implementation Step by Step
This section will walk through the steps to set up the Scatter-Gather architecture and demonstrate its effectiveness in a data engineering context.
As a practical exercise, let’s imagine a situation where we must read data from a dataset and copy it into a new bucket, partitioning it by an index. We want the processing to run in parallel, and we want to know when all processes have finished.
To achieve this, we will develop and deploy our service using AWS Serverless Application Model (SAM), an open-source framework for building serverless applications.
Prerequisites
To follow this step-by-step guide, you need to have:
An AWS account.
AWS CLI installed and configured.
Familiarity with AWS services, such as AWS Lambda, Amazon SQS, and DynamoDB Streams.
Github repo: https://github.com/jdaarevalo/ServerlessScatterGather
Acquiring Data for the Project
To begin a proof of concept or refine technical skills, it’s crucial to obtain an appropriate dataset. The ideal dataset should meet the following criteria: 1) easily replicable by anyone, 2) not too small, 3) free :) , and 4) comprised of real data.
For this project, we have selected the COVID-19 Data Lake’s “us-states” dataset from the Registry of Open Data on AWS. You can access this dataset at the following link: https://registry.opendata.aws/aws-covid19-lake/
One significant advantage of using public data stored in an S3 bucket is that there’s no need to download the data to execute queries. Instead, create a table in AWS Glue and utilize AWS Athena to consume the data.
Here’s a brief overview of how to set up this process:
1. Create an AWS Glue Database
GlueRawDataBase:
  Type: 'AWS::Glue::Database'
  Properties:
    CatalogId: !Ref AWS::AccountId
    DatabaseInput:
      Name: !Ref RawDataBaseName
      Description: Database Raw Data
2. Create an AWS Glue Table
Specify the data’s location in S3 (in our case, “s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/json/us-states”) and the dataset’s column names and types. You can find the metadata description at the following link: https://aws.amazon.com/marketplace/pp/prodview-jmb464qw2yg74?qid=1585594883027&sr=0-1&ref_=srh_res_product_title#overview
GlueRawDataNYTimesCovidTable:
  Type: 'AWS::Glue::Table'
  Properties:
    CatalogId: !Ref AWS::AccountId
    DatabaseName: !Ref GlueRawDataBase
    TableInput:
      Description: "Raw Data on COVID-19 cases from NY Times at US state level."
      TableType: "EXTERNAL_TABLE"
      Retention: 0
      Name: covid_nytimes_states
      Parameters:
        has_encrypted_data: False
        classification: json
        typeOfData: file
      StorageDescriptor:
        Compressed: False
        InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
        Location: !Ref S3RawDataLocationURI
        OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
        StoredAsSubDirectories: false
        SerdeInfo:
          Parameters: {
            "paths": "date,state,fips,cases,deaths"
          }
          SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe"
        Columns:
          - Type: string
            Name: date
            Comment: "reporting date"
          - Type: string
            Name: state
            Comment: ""
          - Type: string
            Name: fips
            Comment: "FIPS code"
          - Type: bigint
            Name: cases
            Comment: "# confirmed cases"
          - Type: bigint
            Name: deaths
            Comment: "# deaths"
3. Create an Amazon S3 Bucket to store the data
ProcessedDataS3Bucket:
  Type: 'AWS::S3::Bucket'
  Properties:
    BucketName: !Join
      - "-"
      - - "scatter-gather-processed-data"
        - !Select
          - 0
          - !Split
            - "-"
            - !Select
              - 2
              - !Split
                - "/"
                - !Ref "AWS::StackId"
By following these steps, you can establish an efficient, cost-effective data acquisition process suitable for data engineers at all experience levels.
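Once this part of the stack is deployed, a quick sanity check helps confirm the Glue table reads the public dataset correctly. The snippet below is a hedged example, assuming awswrangler is installed locally, your credentials can reach Athena, and the database name matches whatever value you passed for the RawDataBaseName parameter:

import awswrangler as wr

# Count reported rows per state to verify the covid_nytimes_states table is queryable
df = wr.athena.read_sql_query(
    "select state, count(*) as total_rows from covid_nytimes_states group by state order by state",
    database="covid_raw_data"  # hypothetical RawDataBaseName value; replace with yours
)
print(df.head())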
Building the Scatter-Gather Application
Now that our data source is in place, let’s walk through the process of building and deploying a Scatter-Gather application using the AWS Serverless Application Model (SAM).
AWS Serverless Application Model (SAM) is an open-source framework that simplifies the process of building serverless applications on AWS. By defining your application’s infrastructure as code, you can easily deploy and manage your application using AWS CloudFormation. SAM provides an extension of AWS CloudFormation that enables you to define serverless resources like Lambda functions, API Gateway APIs, and DynamoDB tables, using a shorthand syntax that’s easy to read and write.
Step 1. Create the Processor Lambda Function
ProcessorFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: src/
    Handler: processor_lambda.lambda_handler
    Runtime: python3.9
    Architectures:
      - x86_64
    Policies:
      - AmazonAthenaFullAccess
      - AmazonS3FullAccess
    Timeout: 180
    MemorySize: 512
    Environment:
      Variables:
        ATHENA_RAW_DATABASE_NAME: !Ref RawDataBaseName
        S3_BUCKET_NAME: !Ref ProcessedDataS3Bucket
    Layers:
      - !Sub 'arn:aws:lambda:${RegionName}:017000801446:layer:AWSLambdaPowertoolsPythonV2:16'
      - !Sub 'arn:aws:lambda:${RegionName}:336392948345:layer:AWSSDKPandas-Python39:5'
    Events:
      MySQSEvent:
        Type: SQS
        Properties:
          Queue: !GetAtt ProcessorSqsQueue.Arn
          BatchSize: 1

ProcessorSqsQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 200
    QueueName: "processor_q"
Key aspects to highlight in the template definition:
Environment variables: Pass in the names of the Athena database and S3 bucket created earlier.
Layers: Use the AWSLambdaPowertoolsPythonV2 layer for writing structured JSON logs (full details at https://awslabs.github.io/aws-lambda-powertools-python/2.14.0/core/logger/). Including well-structured logs is a best practice, regardless of the application's size. Thanks to Oleg for the lessons in this area; it has been a significant and positive change in all my projects.
Additionally, use the AWSSDKPandas (awswrangler) layer for easy integration with Athena, Glue, S3, PostgreSQL, and other services (full details at https://github.com/aws/aws-sdk-pandas).
Policies: Provide access to S3 and Athena tables. Note that giving FullAccess is generally considered a bad security practice, but we’ll use it in this example for simplicity.
Events: Define the SQS queue that will invoke the Lambda function. We want an independent Lambda invocation to process each queue message, so the BatchSize should be 1. This number represents the number of records sent to the function in each batch.
In the SQS::Queue resource, pay attention to the VisibilityTimeout property. This is the period during which Amazon SQS prevents all consumers from receiving and processing a message once it has been picked up. It should equal or exceed the Lambda Timeout (more details at https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html).
Next, write the Processor Lambda function:
import os
import json
import awswrangler as wr
from datetime import datetime
from aws_lambda_powertools import Logger

logger = Logger()

ATHENA_RAW_DATABASE_NAME = os.getenv('ATHENA_RAW_DATABASE_NAME')
S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME')
datetime_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")


@logger.inject_lambda_context
def lambda_handler(event, context):
    # sample event in events/processor/country.json
    logger.info({"action":"invoke_lambda", "payload":{"event":event}})
    json_body = json.loads(event["Records"][0]["body"])
    state_event = json_body.get("state")
    index_event = json_body.get("index")

    # Extract data from Athena
    # - Query data for the specific state
    query = f"""
        select *
            , '{datetime_now}' as processed_time
            , {index_event} as index
        from covid_nytimes_states
        where state = '{state_event}'
    """
    logger.info({"action":"fetch_data", "payload":{"query":query, "db":ATHENA_RAW_DATABASE_NAME}})

    errors = []
    try:
        state_data = wr.athena.read_sql_query(query, database=ATHENA_RAW_DATABASE_NAME)
    except Exception as exception:
        errors.append(exception)
        logger.error({"action":"fetch_data", "payload":{"error":str(exception),
                                                        "query":query, "db":ATHENA_RAW_DATABASE_NAME}})

    # Make your own transformations here

    # Load data into S3 (skip the write if the Athena query failed)
    if not errors:
        wr.s3.to_parquet(df=state_data, path=f"s3://{S3_BUCKET_NAME}/index={index_event}/data.parquet")

    return {
        "statusCode": 200 if not errors else 400,
        "status": "success" if not errors else "error"
    }
This Lambda function receives the event parameter, which contains variables like index (e.g., 12) as an integer and a state (e.g., South Carolina) as a string.
Another best practice is to include a sample of the event that triggers each Lambda, which helps us understand the event structure, run the Lambda locally, and run integration tests. In the repository, the events/processor.json file is included.
At this point, our function is ready to be triggered multiple times in parallel. We can test it by deploying and sending an SQS message. The first pull request can be found at https://github.com/jdaarevalo/ServerlessScatterGather/pull/1/files.
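If you prefer scripting it rather than using the console, a message like the following can be enqueued with boto3 (a sketch, assuming the queue keeps the processor_q name from the template and your credentials point at the right account and region):

import json
import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="processor_q")["QueueUrl"]

# The body mirrors what the Scatter function will later send: a state and its index
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({"state": "Washington", "index": 12})
)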
Step 2. Create the Scatter Lambda Function
In this step, we will create the Scatter Lambda function, which is responsible for sending SQS messages to trigger the Processor for each state. Additionally, it will write items with the state_name and processor status in the DynamoDB scatter_gather_processes table and update items with the count of triggered processes in the scatter_gather_aggregate table.
First, create the DynamoDB tables:
SGProcessesDBTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: !Ref SGProcessesTableName
    ProvisionedThroughput:
      ReadCapacityUnits: 1
      WriteCapacityUnits: 1
    KeySchema:
      -
        AttributeName: "scatter_gather_id"
        KeyType: "HASH"
      -
        AttributeName: "process_id"
        KeyType: "RANGE"
    AttributeDefinitions:
      -
        AttributeName: "scatter_gather_id"
        AttributeType: "S"
      -
        AttributeName: "process_id"
        AttributeType: "S"
    StreamSpecification:
      StreamViewType: "NEW_IMAGE"

SGAggregateDBTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: !Ref SGAggregateTableName
    ProvisionedThroughput:
      ReadCapacityUnits: 1
      WriteCapacityUnits: 1
    KeySchema:
      -
        AttributeName: "scatter_gather_id"
        KeyType: "HASH"
    AttributeDefinitions:
      -
        AttributeName: "scatter_gather_id"
        AttributeType: "S"
    StreamSpecification:
      StreamViewType: "NEW_IMAGE"
For SGProcessesDBTable, define scatter_gather_id as the partition key and process_id as the sort key, allowing multiple process_id values per scatter_gather_id. This setup tracks status changes for each processor. For SGAggregateDBTable, only the scatter_gather_id with the counter attributes is needed. In both tables, include the StreamSpecification property, which represents the DynamoDB Streams configuration. The StreamViewType determines what information is written to the stream when a table item is modified: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, or NEW_AND_OLD_IMAGES. In our case, we want NEW_IMAGE (the full item, as it appears after the modification).
Now, define the Scatter Lambda function in the template.yaml file:
ScatterFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: src/
    Handler: scatter_lambda.lambda_handler
    Runtime: python3.9
    Architectures:
      - x86_64
    Policies:
      - AmazonDynamoDBFullAccess
      - AmazonAthenaFullAccess
      - AmazonS3FullAccess
      - AmazonSQSFullAccess
    Timeout: 180
    MemorySize: 512
    Environment:
      Variables:
        SG_PROCESSES_TABLE_NAME: !Ref SGProcessesTableName
        SG_AGGREGATE_TABLE_NAME: !Ref SGAggregateTableName
        ATHENA_RAW_DATABASE_NAME: !Ref RawDataBaseName
        QUEUE_URL: !GetAtt ProcessorSqsQueue.QueueUrl
    Layers:
      - !Sub 'arn:aws:lambda:${RegionName}:017000801446:layer:AWSLambdaPowertoolsPythonV2:16'
      - !Sub 'arn:aws:lambda:${RegionName}:336392948345:layer:AWSSDKPandas-Python39:5'
Note that we need to pass the DynamoDB table names, the Athena database name, and the QueueUrl used to deliver the messages that trigger the Processor function. Also, verify the included policies.
Scatter Lambda:
import os
import json
import time
import boto3
import awswrangler as wr
from aws_lambda_powertools import Logger
from dynamo_operations import create_item

logger = Logger()

ATHENA_RAW_DATABASE_NAME = os.getenv('ATHENA_RAW_DATABASE_NAME')
SG_PROCESSES_TABLE_NAME = os.getenv('SG_PROCESSES_TABLE_NAME')
SG_AGGREGATE_TABLE_NAME = os.getenv('SG_AGGREGATE_TABLE_NAME')
QUEUE_URL = os.getenv('QUEUE_URL')


@logger.inject_lambda_context
def lambda_handler(event, context):
    logger.info({"action":"invoke_lambda", "payload":{"event":event}})
    timestamp = int(time.time())

    # read the states from the athena table
    query = "select distinct state from covid_nytimes_states"
    unique_states = wr.athena.read_sql_query(query, database=ATHENA_RAW_DATABASE_NAME)

    # write in SG_AGGREGATE_TABLE_NAME DynamoDB table how many states should be executed
    item = {
        "scatter_gather_id" : str(timestamp),
        "total_processes": len(unique_states),
        "finished_processes": 0
    }
    create_item(SG_AGGREGATE_TABLE_NAME, item)

    # For each state, send a message to the queue
    # We want to trigger processor_lambda per state
    for index, row in unique_states.iterrows():
        state = row['state']
        # store dynamo data for each triggered process
        item_state = {
            "scatter_gather_id" : str(timestamp),
            "process_id" : "{}_{}".format(index, state),
            "status": "started"
        }
        create_item(SG_PROCESSES_TABLE_NAME, item_state)
        formatted_message = '{{"state":"{}", "index":{}, "item_state":{}}}'.format(
            state, index, json.dumps(item_state))
        logger.info({"action":"message_queue", "payload":{"message":formatted_message}})
        send_message_queue(formatted_message)

    return {
        "statusCode": 200,
        "status": "success",
        "data": {"states": len(unique_states)}
    }


def send_message_queue(message):
    sqs_client = boto3.client("sqs")
    response = sqs_client.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=str(message)
    )
    logger.info({"action":"send_message_queue", "payload":{"message":message, "response":response}})
The Scatter Lambda function performs the following actions:
Query the Athena table for distinct states, using these names as parameters to trigger our processor function. We used the same table for this list as an example, but other tables or custom loop rules could be used.
Write the total number of states to the DynamoDB SGAggregateDBTable, representing the target processes that should be completed. Define scatter_gather_id as the timestamp and use it as the primary key.
For each state:
Write an item in the DynamoDB SGProcessesDBTable with the scatter_gather_id as the primary key and the combination of index and state_name as the sort key, setting the status attribute as “started”.
Send a message to the SQS to trigger the Processor Function.
We created the dynamo_operations.py library to handle Dynamo operations such as create_item and update_item.
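The exact implementation is in the repository, but a minimal sketch of what these helpers might look like with boto3 (names follow the article; details are illustrative) is:

import boto3
from datetime import datetime

dynamodb = boto3.resource("dynamodb")

def create_item(table_name, item):
    # Insert (or overwrite) an item in the given table
    dynamodb.Table(table_name).put_item(Item=item)

def update_item_finished(table_name, item_state):
    # Mark a single process item as finished and stamp the update time
    dynamodb.Table(table_name).update_item(
        Key={
            "scatter_gather_id": item_state["scatter_gather_id"],
            "process_id": item_state["process_id"]
        },
        UpdateExpression="SET #status = :status, updated_at = :updated_at",
        ExpressionAttributeNames={"#status": "status"},  # status is a DynamoDB reserved word
        ExpressionAttributeValues={
            ":status": "finished",
            ":updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    )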
Include the following lines in the Processor Function:
from dynamo_operations import update_item_finished

SG_PROCESSES_TABLE_NAME = os.getenv('SG_PROCESSES_TABLE_NAME')

@logger.inject_lambda_context
def lambda_handler(event, context):
    # ... existing processor logic ...
    # Update the status item in Dynamo
    item_state = json_body.get("item_state")
    update_item_finished(SG_PROCESSES_TABLE_NAME, item_state)
    logger.info({"action":"update_item", "payload":{"item_state":item_state}})
This way, when a Processor Function finishes, it updates the item status to “finished” in the DynamoDB SGProcessesDBTable.
Remember to update the template.yaml with the necessary Policies and Environment Variables.
Test the integration by deploying to your AWS account and triggering the Scatter Function. After a run, SGProcessesDBTable should contain one item per state and SGAggregateDBTable a single counter item per scatter_gather_id.
You can also check the code in the following PR: https://github.com/jdaarevalo/ServerlessScatterGather/pull/2
In this section, we have demonstrated how to create a Scatter Lambda function that triggers the Processor for each state, writes items in the DynamoDB SGProcessesDBTable, and updates items in the SGAggregateDBTable.
Step 3. Create the Aggregator Lambda Function
In this step, we will create an Aggregator Lambda Function that receives data from the DynamoDB SGProcessesDBTable Stream, checks the status, and performs an upsert operation into the SGAggregateDBTable.
Update the template.yaml file
AggregatorFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: src/
    Handler: aggregator_lambda.lambda_handler
    Runtime: python3.9
    Architectures:
      - x86_64
    Policies:
      - AmazonDynamoDBFullAccess
    Environment:
      Variables:
        SG_AGGREGATE_TABLE_NAME: !Ref SGAggregateTableName
    Layers:
      - !Sub 'arn:aws:lambda:${RegionName}:017000801446:layer:AWSLambdaPowertoolsPythonV2:16'

EventSourceSGProcessesDBTableStream:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    BatchSize: 100
    MaximumBatchingWindowInSeconds: 30
    Enabled: True
    FilterCriteria:
      Filters:
        - Pattern: '{"eventName":["MODIFY"]}'
    EventSourceArn: !GetAtt SGProcessesDBTable.StreamArn
    FunctionName: !GetAtt AggregatorFunction.Arn
    StartingPosition: LATEST
The AWS::Lambda::EventSourceMapping resource establishes a connection between an event source (in our case, the Stream in the SGProcessesDBTable) and an AWS Lambda function. Lambda reads items from the event source and triggers the function.
For each action performed in a DynamoDB table (INSERT, MODIFY, REMOVE), an event is sent via DynamoDB Streams to the Lambda function. In this scenario, we are only interested in MODIFY events. We have added a filter pattern to the Lambda function’s criteria to filter these events.
Filtering evaluates events based on values within the message, which avoids extra conditional logic or multiple if statements inside the Lambda code and reduces the number of Lambda invocations.
Additionally, we configured a batching window of 30 seconds and a batch size of 100. This means the Lambda function is invoked once it has accumulated 100 records or once 30 seconds have elapsed, whichever comes first.
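For reference, each record the Aggregator receives from the stream has roughly this shape (abridged to the fields the function actually reads; values are illustrative):

# Abridged shape of one DynamoDB Streams record delivered to the Aggregator
record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "NewImage": {
            "scatter_gather_id": {"S": "1682071200"},
            "process_id": {"S": "12_Washington"},
            "status": {"S": "finished"}
        }
    }
}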
Create the Lambda Aggregator function
import os
import time
from aws_lambda_powertools import Logger
from dynamo_operations import update_finished_processes

logger = Logger()
timestamp = int(time.time())

SG_AGGREGATE_TABLE_NAME = os.getenv('SG_AGGREGATE_TABLE_NAME')


@logger.inject_lambda_context
def lambda_handler(event, context):
    logger.info({"action":"invoke_lambda", "payload":{"event":event}})
    agg_scatter_gather_id = {}
    for record in event["Records"]:
        new_image = record["dynamodb"]['NewImage']
        status = new_image["status"]['S']
        scatter_gather_id = new_image["scatter_gather_id"]['S']
        if status == "finished":
            agg_scatter_gather_id[scatter_gather_id] = agg_scatter_gather_id.get(scatter_gather_id, 0) + 1
    logger.info({"action":"agg_scatter_gather_id", "payload":{"agg_scatter_gather_id":agg_scatter_gather_id}})

    # update total_finished values in dynamo for each scatter_gather_id
    for scatter_gather_id, value_to_sum in agg_scatter_gather_id.items():
        update_finished_processes(SG_AGGREGATE_TABLE_NAME, scatter_gather_id, value_to_sum)

    return {
        "statusCode": 200,
        "status": "success"
    }
The Aggregator function is responsible for grouping the number of completed processes by scatter_gather_id and updating these values in the SG_AGGREGATE table.
For each record in the Lambda event, we check the new status value and the scatter_gather_id. If the status is “finished”, we increment the counter for that scatter_gather_id. To update the value in the SGAggregateDBTable, we use the update_item method:
table.update_item(
    Key={
        'scatter_gather_id': scatter_gather_id
    },
    UpdateExpression='SET finished_processes = finished_processes + :val, #updated_at = :updated_at',
    ExpressionAttributeValues={
        ':val': value_to_sum,
        ':updated_at': datetime_now
    },
    ExpressionAttributeNames={
        '#updated_at': 'updated_at'
    },
    ReturnValues='ALL_NEW'
)
In this case, our update_item method utilizes the UpdateExpression to update the item with the SET action in two parts:
incrementing the finished_processes attribute by the new value (finished_processes = finished_processes + :val)
and setting the updated_at attribute to the current date and time.
Learn more about UpdateExpression here https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateItem.html
To see the updates, check the following PR: https://github.com/jdaarevalo/ServerlessScatterGather/pull/3
Step 4. Create the Gather Lambda Function
In this last step, we will create the Gather Lambda Function, which is responsible for determining whether finished_processes equals total_processes for each run. To begin, create the necessary resources in the template. You can find the full details in the related PR.
https://github.com/jdaarevalo/ServerlessScatterGather/pull/4
The final Lambda function should have the following basic structure:
import os
import time
from aws_lambda_powertools import Logger

logger = Logger()


@logger.inject_lambda_context
def lambda_handler(event, context):
    logger.info({"action":"invoke_lambda", "payload":{"event":event}})
    finished_sg_ids = []
    for record in event["Records"]:
        event_name = record["eventName"]
        new_image = record["dynamodb"]['NewImage']
        finished_processes = new_image["finished_processes"]["N"]
        total_processes = new_image["total_processes"]["N"]
        if finished_processes == total_processes:
            finished_sg_ids.append(new_image["scatter_gather_id"]['S'])

    # report ScatterGather finished or trigger your next step
    for scatter_gather_id in finished_sg_ids:
        logger.info({"action":"finished_scatter_gather_id", "payload":{"scatter_gather_id":scatter_gather_id}})
Once the Gather function identifies that finished_processes == total_processes, it signifies that the workflow is ready to proceed with the next step. In this example, we simply log every scatter_gather_id that has finished all of its processor tasks. However, you could trigger subsequent steps or incorporate additional logic depending on your specific use case.
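As one illustration of such a next step, the logging loop could be replaced with a notification to downstream consumers. The sketch below assumes a hypothetical SNS topic whose ARN is exposed to the function as an environment variable; it is not part of the repository's template:

import os
import boto3

sns = boto3.client("sns")
SNS_TOPIC_ARN = os.getenv("SNS_TOPIC_ARN")  # hypothetical topic, not defined in the article's template

for scatter_gather_id in finished_sg_ids:
    # Tell downstream consumers that this scatter-gather run has completed
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject="Scatter-Gather run finished",
        Message=f"scatter_gather_id {scatter_gather_id} completed all processes"
    )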
Github repo: https://github.com/jdaarevalo/ServerlessScatterGather
In conclusion, adopting the scatter-gather pattern in AWS Serverless architectures has proven to be a game-changer for data processing and workflow management. By leveraging this pattern, developers can build scalable, efficient, and cost-effective applications, ensuring reliable data handling and improved performance while staying focused on core business logic rather than infrastructure. As more businesses migrate to serverless computing and the ecosystem continues to mature, the scatter-gather pattern will keep playing a significant role in modern data engineering solutions and in meeting the demands of today’s data-driven world.