|
3 | 3 |
|
4 | 4 | import pytest
|
5 | 5 | from botocore.exceptions import ClientError
|
6 |
| -from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer |
| 6 | +from localstack_snapshot.snapshots.transformer import KeyValueBasedTransformer, SortingTransformer |
7 | 7 |
|
8 | 8 | from localstack import config
|
9 | 9 | from localstack.aws.api.lambda_ import InvalidParameterValueException, Runtime
|
|
15 | 15 | from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events
|
16 | 16 | from tests.aws.services.lambda_.functions import FUNCTIONS_PATH, lambda_integration
|
17 | 17 | from tests.aws.services.lambda_.test_lambda import (
|
| 18 | + TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, |
18 | 19 | TEST_LAMBDA_PYTHON,
|
19 | 20 | TEST_LAMBDA_PYTHON_ECHO,
|
20 | 21 | TEST_LAMBDA_PYTHON_ECHO_VERSION_ENV,
|
@@ -1042,6 +1043,182 @@ def test_sqs_event_source_mapping(
|
1042 | 1043 | rs = aws_client.sqs.receive_message(QueueUrl=queue_url_1)
|
1043 | 1044 | assert rs.get("Messages", []) == []
|
1044 | 1045 |
|
| 1046 | + @pytest.mark.parametrize("batch_size", [15, 100, 1_000, 10_000]) |
| 1047 | + @markers.aws.validated |
| 1048 | + def test_sqs_event_source_mapping_batch_size( |
| 1049 | + self, |
| 1050 | + create_lambda_function, |
| 1051 | + sqs_create_queue, |
| 1052 | + sqs_get_queue_arn, |
| 1053 | + lambda_su_role, |
| 1054 | + snapshot, |
| 1055 | + cleanups, |
| 1056 | + aws_client, |
| 1057 | + batch_size, |
| 1058 | + ): |
| 1059 | + snapshot.add_transformer(snapshot.transform.sqs_api()) |
| 1060 | + snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1) |
| 1061 | + |
| 1062 | + destination_queue_name = f"destination-queue-{short_uid()}" |
| 1063 | + function_name = f"lambda_func-{short_uid()}" |
| 1064 | + source_queue_name = f"source-queue-{short_uid()}" |
| 1065 | + mapping_uuid = None |
| 1066 | + |
| 1067 | + destination_queue_url = sqs_create_queue(QueueName=destination_queue_name) |
| 1068 | + create_lambda_function( |
| 1069 | + func_name=function_name, |
| 1070 | + handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, |
| 1071 | + runtime=Runtime.python3_12, |
| 1072 | + envvars={"SQS_QUEUE_URL": destination_queue_url}, |
| 1073 | + role=lambda_su_role, |
| 1074 | + ) |
| 1075 | + |
| 1076 | + queue_url = sqs_create_queue(QueueName=source_queue_name) |
| 1077 | + queue_arn = sqs_get_queue_arn(queue_url) |
| 1078 | + |
| 1079 | + create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( |
| 1080 | + EventSourceArn=queue_arn, |
| 1081 | + FunctionName=function_name, |
| 1082 | + MaximumBatchingWindowInSeconds=10 if is_aws_cloud() else 2, |
| 1083 | + BatchSize=batch_size, |
| 1084 | + ) |
| 1085 | + mapping_uuid = create_event_source_mapping_response["UUID"] |
| 1086 | + cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)) |
| 1087 | + snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response) |
| 1088 | + _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid) |
| 1089 | + |
| 1090 | + reponse_batch_send_10 = aws_client.sqs.send_message_batch( |
| 1091 | + QueueUrl=queue_url, |
| 1092 | + Entries=[{"Id": f"{i}-0", "MessageBody": f"{i}-0-message-{i}"} for i in range(10)], |
| 1093 | + ) |
| 1094 | + snapshot.match("send-message-batch-result-10", reponse_batch_send_10) |
| 1095 | + |
| 1096 | + reponse_batch_send_5 = aws_client.sqs.send_message_batch( |
| 1097 | + QueueUrl=queue_url, |
| 1098 | + Entries=[{"Id": f"{i}-1", "MessageBody": f"{i}-1-message-{i}"} for i in range(5)], |
| 1099 | + ) |
| 1100 | + snapshot.match("send-message-batch-result-5", reponse_batch_send_5) |
| 1101 | + |
| 1102 | + batches = [] |
| 1103 | + |
| 1104 | + def get_msg_from_q(): |
| 1105 | + messages_to_delete = [] |
| 1106 | + receive_message_response = aws_client.sqs.receive_message( |
| 1107 | + QueueUrl=destination_queue_url, |
| 1108 | + MaxNumberOfMessages=10, |
| 1109 | + VisibilityTimeout=120, |
| 1110 | + WaitTimeSeconds=5 if is_aws_cloud() else 1, |
| 1111 | + ) |
| 1112 | + messages = receive_message_response.get("Messages", []) |
| 1113 | + for message in messages: |
| 1114 | + received_batch = json.loads(message["Body"]) |
| 1115 | + batches.append(received_batch) |
| 1116 | + messages_to_delete.append( |
| 1117 | + {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} |
| 1118 | + ) |
| 1119 | + |
| 1120 | + aws_client.sqs.delete_message_batch( |
| 1121 | + QueueUrl=destination_queue_url, Entries=messages_to_delete |
| 1122 | + ) |
| 1123 | + assert sum([len(batch) for batch in batches]) == 15 |
| 1124 | + return [message for batch in batches for message in batch] |
| 1125 | + |
| 1126 | + events = retry(get_msg_from_q, retries=15, sleep=5) |
| 1127 | + snapshot.match("Records", events) |
| 1128 | + |
| 1129 | + # FIXME: this fails due to ESM not correctly collecting and sending batches |
| 1130 | + # where size exceeds 10 messages. |
| 1131 | + @markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"]) |
| 1132 | + @markers.aws.validated |
| 1133 | + def test_sqs_event_source_mapping_batching_reserved_concurrency( |
| 1134 | + self, |
| 1135 | + create_lambda_function, |
| 1136 | + sqs_create_queue, |
| 1137 | + sqs_get_queue_arn, |
| 1138 | + lambda_su_role, |
| 1139 | + snapshot, |
| 1140 | + cleanups, |
| 1141 | + aws_client, |
| 1142 | + ): |
| 1143 | + snapshot.add_transformer(snapshot.transform.sqs_api()) |
| 1144 | + snapshot.add_transformer(SortingTransformer("Records", lambda s: s["body"]), priority=-1) |
| 1145 | + |
| 1146 | + destination_queue_name = f"destination-queue-{short_uid()}" |
| 1147 | + function_name = f"lambda_func-{short_uid()}" |
| 1148 | + source_queue_name = f"source-queue-{short_uid()}" |
| 1149 | + mapping_uuid = None |
| 1150 | + |
| 1151 | + destination_queue_url = sqs_create_queue(QueueName=destination_queue_name) |
| 1152 | + create_lambda_function( |
| 1153 | + func_name=function_name, |
| 1154 | + handler_file=TEST_LAMBDA_EVENT_SOURCE_MAPPING_SEND_MESSAGE, |
| 1155 | + runtime=Runtime.python3_12, |
| 1156 | + envvars={"SQS_QUEUE_URL": destination_queue_url}, |
| 1157 | + role=lambda_su_role, |
| 1158 | + ) |
| 1159 | + |
| 1160 | + # Prevent more than 2 Lambdas from being spun up at a time |
| 1161 | + put_concurrency_resp = aws_client.lambda_.put_function_concurrency( |
| 1162 | + FunctionName=function_name, ReservedConcurrentExecutions=2 |
| 1163 | + ) |
| 1164 | + snapshot.match("put_concurrency_resp", put_concurrency_resp) |
| 1165 | + |
| 1166 | + queue_url = sqs_create_queue(QueueName=source_queue_name) |
| 1167 | + queue_arn = sqs_get_queue_arn(queue_url) |
| 1168 | + |
| 1169 | + create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( |
| 1170 | + EventSourceArn=queue_arn, |
| 1171 | + FunctionName=function_name, |
| 1172 | + MaximumBatchingWindowInSeconds=10, |
| 1173 | + BatchSize=20, |
| 1174 | + ScalingConfig={ |
| 1175 | + "MaximumConcurrency": 2 |
| 1176 | + }, # Prevent more than 2 concurrent SQS workers from being spun up at a time |
| 1177 | + ) |
| 1178 | + mapping_uuid = create_event_source_mapping_response["UUID"] |
| 1179 | + cleanups.append(lambda: aws_client.lambda_.delete_event_source_mapping(UUID=mapping_uuid)) |
| 1180 | + snapshot.match("create-event-source-mapping-response", create_event_source_mapping_response) |
| 1181 | + _await_event_source_mapping_enabled(aws_client.lambda_, mapping_uuid) |
| 1182 | + |
| 1183 | + for b in range(3): |
| 1184 | + aws_client.sqs.send_message_batch( |
| 1185 | + QueueUrl=queue_url, |
| 1186 | + Entries=[{"Id": f"{i}-{b}", "MessageBody": f"{i}-{b}-message"} for i in range(10)], |
| 1187 | + ) |
| 1188 | + |
| 1189 | + batches = [] |
| 1190 | + |
| 1191 | + def get_msg_from_q(): |
| 1192 | + messages_to_delete = [] |
| 1193 | + receive_message_response = aws_client.sqs.receive_message( |
| 1194 | + QueueUrl=destination_queue_url, |
| 1195 | + MaxNumberOfMessages=10, |
| 1196 | + VisibilityTimeout=120, |
| 1197 | + WaitTimeSeconds=5, |
| 1198 | + ) |
| 1199 | + messages = receive_message_response.get("Messages", []) |
| 1200 | + for message in messages: |
| 1201 | + received_batch = json.loads(message["Body"]) |
| 1202 | + batches.append(received_batch) |
| 1203 | + messages_to_delete.append( |
| 1204 | + {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]} |
| 1205 | + ) |
| 1206 | + |
| 1207 | + if messages_to_delete: |
| 1208 | + aws_client.sqs.delete_message_batch( |
| 1209 | + QueueUrl=destination_queue_url, Entries=messages_to_delete |
| 1210 | + ) |
| 1211 | + assert sum([len(batch) for batch in batches]) == 30 |
| 1212 | + return [message for batch in batches for message in batch] |
| 1213 | + |
| 1214 | + events = retry(get_msg_from_q, retries=15, sleep=5) |
| 1215 | + |
| 1216 | + # We expect to receive 2 batches where each batch contains some proportion of the |
| 1217 | + # 30 messages we sent through, divided by the 20 ESM batch size. How this is split is |
| 1218 | + # not determinable a priori so rather just snapshots the events and the no. of batches. |
| 1219 | + snapshot.match("batch_info", {"total_batches_received": len(batches)}) |
| 1220 | + snapshot.match("Records", events) |
| 1221 | + |
1045 | 1222 | @markers.aws.validated
|
1046 | 1223 | @pytest.mark.parametrize(
|
1047 | 1224 | "filter, item_matching, item_not_matching",
|
|
0 commit comments