Skip to content

Commit 55295bf

Browse files
authored
[ESM] Fix polling of SQS queue when batch size exceeds 10 (#11945)
1 parent 1bd7dca commit 55295bf

File tree

6 files changed

+2248
-4
lines changed

6 files changed

+2248
-4
lines changed

localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ def poll_events(self) -> None:
6666
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling
6767
response = self.source_client.receive_message(
6868
QueueUrl=self.queue_url,
69-
MaxNumberOfMessages=self.sqs_queue_parameters["BatchSize"],
69+
MaxNumberOfMessages=min(
70+
self.sqs_queue_parameters["BatchSize"], DEFAULT_MAX_RECEIVE_COUNT
71+
), # BatchSize cannot exceed 10
7072
MessageAttributeNames=["All"],
7173
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
7274
)

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55
from botocore.exceptions import ClientError
6-
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer
6+
from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer, SortingTransformer
77

88
from localstack import config
99
from localstack.aws.api.lambda_ import InvalidParameterValueException, Runtime
@@ -15,6 +15,7 @@
1515
from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events
1616
from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration
1717
from tests.aws.services.lambda_.test_lambda import (
18+
TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
1819
TEST_LAMBDA_PYTHON,
1920
TEST_LAMBDA_PYTHON_ECHO,
2021
TEST_LAMBDA_PYTHON_ECHO_VERSION_ENV,
@@ -1042,6 +1043,182 @@ def test_sqs_event_source_mapping(
10421043
rs = aws_client.sqs.receive_message(QueueUrl=queue_url_1)
10431044
assert rs.get("Messages", []) == []
10441045

1046+
@pytest.mark.parametrize("batch_size", [15, 100, 1_000, 10_000])
@markers.aws.validated
def test_sqs_event_source_mapping_batch_size(
    self,
    create_lambda_function,
    sqs_create_queue,
    sqs_get_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
    aws_client,
    batch_size,
):
    """Ensure an ESM with BatchSize > 10 still polls and delivers all SQS messages.

    SQS ReceiveMessage caps MaxNumberOfMessages at 10, so the poller must clamp
    the configured BatchSize instead of failing. 15 messages (a batch of 10 plus
    a batch of 5) are sent to the source queue; the Lambda forwards each invoked
    batch to a destination queue, where all 15 records must eventually arrive.
    """
    snapshot.add_transformer(snapshot.transform.sqs_api())
    # Records arrive split across batches in nondeterministic order; sort for stable snapshots.
    snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1)

    destination_queue_name = f"destination-queue-{short_uid()}"
    function_name = f"lambda_func-{short_uid()}"
    source_queue_name = f"source-queue-{short_uid()}"
    mapping_uuid = None

    destination_queue_url = sqs_create_queue(QueueName=destination_queue_name)
    create_lambda_function(
        func_name=function_name,
        handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
        runtime=Runtime.python3_12,
        envvars={"SQS_QUEUE_URL": destination_queue_url},
        role=lambda_su_role,
    )

    queue_url = sqs_create_queue(QueueName=source_queue_name)
    queue_arn = sqs_get_queue_arn(queue_url)

    create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        # Shorter batching window locally to keep the test fast; AWS needs more headroom.
        MaximumBatchingWindowInSeconds=10 if is_aws_cloud() else 2,
        BatchSize=batch_size,
    )
    mapping_uuid = create_event_source_mapping_response["UUID"]
    cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
    snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response)
    _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

    response_batch_send_10 = aws_client.sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=[{"Id": f"{i}-0", "MessageBody": f"{i}-0-message-{i}"} for i in range(10)],
    )
    snapshot.match("send-message-batch-result-10", response_batch_send_10)

    response_batch_send_5 = aws_client.sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=[{"Id": f"{i}-1", "MessageBody": f"{i}-1-message-{i}"} for i in range(5)],
    )
    snapshot.match("send-message-batch-result-5", response_batch_send_5)

    batches = []

    def get_msg_from_q():
        """Drain the destination queue; each message body is one JSON-encoded batch."""
        messages_to_delete = []
        receive_message_response = aws_client.sqs.receive_message(
            QueueUrl=destination_queue_url,
            MaxNumberOfMessages=10,
            VisibilityTimeout=120,
            WaitTimeSeconds=5 if is_aws_cloud() else 1,
        )
        messages = receive_message_response.get("Messages", [])
        for message in messages:
            received_batch = json.loads(message["Body"])
            batches.append(received_batch)
            messages_to_delete.append(
                {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
            )

        # Guard against empty polls: DeleteMessageBatch with no entries raises
        # EmptyBatchRequest, which would abort the retry loop with a ClientError
        # instead of retrying until all 15 records have arrived.
        if messages_to_delete:
            aws_client.sqs.delete_message_batch(
                QueueUrl=destination_queue_url, Entries=messages_to_delete
            )
        assert sum([len(batch) for batch in batches]) == 15
        return [message for batch in batches for message in batch]

    events = retry(get_msg_from_q, retries=15, sleep=5)
    snapshot.match("Records", events)
1129+
# FIXME: this fails due to ESM not correctly collecting and sending batches
# where size exceeds 10 messages.
@markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"])
@markers.aws.validated
def test_sqs_event_source_mapping_batching_reserved_concurrency(
    self,
    create_lambda_function,
    sqs_create_queue,
    sqs_get_queue_arn,
    lambda_su_role,
    snapshot,
    cleanups,
    aws_client,
):
    """Verify ESM batching when Lambda concurrency is capped at 2.

    Sends 30 messages through an ESM configured with BatchSize=20 while both the
    function's reserved concurrency and the ESM ScalingConfig limit concurrency
    to 2 workers. All 30 records must reach the destination queue; the exact
    split into batches is nondeterministic, so only the batch count is snapshotted
    (and skipped from verification, see FIXME above).
    """
    snapshot.add_transformer(snapshot.transform.sqs_api())
    snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1)

    destination_queue_name = f"destination-queue-{short_uid()}"
    function_name = f"lambda_func-{short_uid()}"
    source_queue_name = f"source-queue-{short_uid()}"
    mapping_uuid = None

    destination_queue_url = sqs_create_queue(QueueName=destination_queue_name)
    create_lambda_function(
        func_name=function_name,
        handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE,
        runtime=Runtime.python3_12,
        envvars={"SQS_QUEUE_URL": destination_queue_url},
        role=lambda_su_role,
    )

    # Prevent more than 2 Lambdas from being spun up at a time
    put_concurrency_resp = aws_client.lambda_.put_function_concurrency(
        FunctionName=function_name, ReservedConcurrentExecutions=2
    )
    snapshot.match("put_concurrency_resp", put_concurrency_resp)

    queue_url = sqs_create_queue(QueueName=source_queue_name)
    queue_arn = sqs_get_queue_arn(queue_url)

    create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        MaximumBatchingWindowInSeconds=10,
        BatchSize=20,
        ScalingConfig={
            "MaximumConcurrency": 2
        },  # Prevent more than 2 concurrent SQS workers from being spun up at a time
    )
    mapping_uuid = create_event_source_mapping_response["UUID"]
    cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid))
    snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response)
    _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid)

    # Push 3 batches of 10 messages each (30 total) into the source queue.
    for batch_index in range(3):
        aws_client.sqs.send_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": f"{i}-{batch_index}", "MessageBody": f"{i}-{batch_index}-message"}
                for i in range(10)
            ],
        )

    batches = []

    def get_msg_from_q():
        """Drain the destination queue; each message body is one JSON-encoded batch."""
        poll_response = aws_client.sqs.receive_message(
            QueueUrl=destination_queue_url,
            MaxNumberOfMessages=10,
            VisibilityTimeout=120,
            WaitTimeSeconds=5,
        )
        received = poll_response.get("Messages", [])
        for msg in received:
            batches.append(json.loads(msg["Body"]))

        if received:
            aws_client.sqs.delete_message_batch(
                QueueUrl=destination_queue_url,
                Entries=[
                    {"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]}
                    for m in received
                ],
            )
        assert sum(len(batch) for batch in batches) == 30
        return [record for batch in batches for record in batch]

    events = retry(get_msg_from_q, retries=15, sleep=5)

    # We expect to receive 2 batches where each batch contains some proportion of the
    # 30 messages we sent through, divided by the 20 ESM batch size. How this is split is
    # not determinable a priori so rather just snapshots the events and the no. of batches.
    snapshot.match("batch_info", {"total_batches_received": len(batches)})
    snapshot.match("Records", events)
10451222
@markers.aws.validated
10461223
@pytest.mark.parametrize(
10471224
"filter, item_matching, item_not_matching",

0 commit comments

Comments (0)