Skip to content

Commit 29ff9d3

Browse files
authored
feat(GCS+gRPC): treat some kAlreadyExists errors as transient (#9564)
The service is (we think incorrectly) returning `kAlreadyExists` for some uploads. We need a workaround to unblock some testing. The GCS team is working on a fix in parallel.
1 parent 70d7f2d commit 29ff9d3

File tree

2 files changed

+85
-28
lines changed

2 files changed

+85
-28
lines changed

google/cloud/storage/internal/retry_client.cc

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,25 @@ Status ValidateCommittedSize(UploadChunkRequest const& request,
135135
return {};
136136
}
137137

138+
// For resumable uploads over gRPC we need to treat some non-retryable errors
139+
// as retryable.
140+
bool UploadChunkOnFailure(RetryPolicy& retry_policy, int& count,
141+
Status const& status) {
142+
// TODO(#9273) - use ErrorInfo when it becomes available
143+
if (status.code() == StatusCode::kAborted &&
144+
absl::StartsWith(status.message(), "Concurrent requests received.")) {
145+
return retry_policy.OnFailure(Status(
146+
StatusCode::kUnavailable, "TODO(#9273) - workaround service problems"));
147+
}
148+
// TODO(#9563) - kAlreadyExist is sometimes spurious
149+
if (status.code() == StatusCode::kAlreadyExists &&
150+
status.message() == "Requested entity already exists" && ++count == 1) {
151+
return retry_policy.OnFailure(Status(
152+
StatusCode::kUnavailable, "TODO(#9563) - workaround service problems"));
153+
}
154+
return retry_policy.OnFailure(status);
155+
}
156+
138157
} // namespace
139158

140159
std::shared_ptr<RetryClient> RetryClient::Create(
@@ -516,24 +535,17 @@ StatusOr<QueryResumableUploadResponse> RetryClient::UploadChunk(
516535
auto const expected_committed_size =
517536
request.offset() + request.payload_size();
518537

538+
int count_workaround_9563 = 0;
539+
519540
while (!retry_policy->IsExhausted()) {
520541
auto result = (*operation)(committed_size);
521542
if (!result) {
522543
// On a failure we preserve the error, then query if retry policy allows
523544
// retrying. If so, we backoff, and switch to calling
524545
// QueryResumableUpload().
525546
last_status = std::move(result).status();
526-
// For resumable uploads over gRPC some kAborted errors are retryable.
527-
// TODO(#9273) - use ErrorInfo when it becomes available
528-
auto constexpr kConcurrentMessagePrefix = "Concurrent requests received.";
529-
auto const is_concurrent_write =
530-
last_status.code() == StatusCode::kAborted &&
531-
absl::StartsWith(last_status.message(), kConcurrentMessagePrefix);
532-
auto const is_retryable =
533-
is_concurrent_write
534-
? retry_policy->OnFailure(Status(StatusCode::kUnavailable, ""))
535-
: retry_policy->OnFailure(last_status);
536-
if (!is_retryable) {
547+
if (!UploadChunkOnFailure(*retry_policy, count_workaround_9563,
548+
last_status)) {
537549
return return_error(std::move(last_status), *retry_policy, __func__);
538550
}
539551

google/cloud/storage/internal/retry_client_test.cc

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -189,22 +189,10 @@ TEST(RetryClientTest, UploadChunkHandleTransient) {
189189
.WillOnce(
190190
Return(QueryResumableUploadResponse{2 * quantum, absl::nullopt}));
191191

192-
// Repeat the failure with kAborted. This error code is only retryable for
193-
// resumable uploads.
194-
EXPECT_CALL(*mock, UploadChunk)
195-
.WillOnce(Return(
196-
Status(StatusCode::kAborted, "Concurrent requests received.")));
197-
EXPECT_CALL(*mock, QueryResumableUpload)
198-
.WillOnce(
199-
Return(QueryResumableUploadResponse{2 * quantum, absl::nullopt}));
200-
EXPECT_CALL(*mock, UploadChunk)
201-
.WillOnce(
202-
Return(QueryResumableUploadResponse{3 * quantum, absl::nullopt}));
203-
204192
// An even simpler scenario: only UploadChunk() is called, and it succeeds.
205193
EXPECT_CALL(*mock, UploadChunk)
206194
.WillOnce(
207-
Return(QueryResumableUploadResponse{4 * quantum, absl::nullopt}));
195+
Return(QueryResumableUploadResponse{3 * quantum, absl::nullopt}));
208196

209197
auto response = client->UploadChunk(
210198
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
@@ -220,11 +208,68 @@ TEST(RetryClientTest, UploadChunkHandleTransient) {
220208
UploadChunkRequest("test-only-session-id", 2 * quantum, {{payload}}));
221209
ASSERT_STATUS_OK(response);
222210
EXPECT_EQ(3 * quantum, response->committed_size.value_or(0));
211+
}
223212

224-
response = client->UploadChunk(
225-
UploadChunkRequest("test-only-session-id", 3 * quantum, {{payload}}));
226-
ASSERT_STATUS_OK(response);
227-
EXPECT_EQ(4 * quantum, response->committed_size.value_or(0));
213+
// TODO(#9293) - fix this test to use ErrorInfo
214+
Status TransientAbortError() {
215+
return Status(StatusCode::kAborted, "Concurrent requests received.");
216+
}
217+
218+
/// @test Verify that `kAborted` "Concurrent requests received." errors are
/// retried, and that the original error is returned when every attempt fails.
219+
TEST(RetryClientTest, UploadChunkAbortedMaybeIsTransient) {
220+
// The RetryClient under test wraps a mocked raw client.
auto mock = std::make_shared<testing::MockClient>();
221+
auto client = RetryClient::Create(std::shared_ptr<internal::RawClient>(mock));
222+
// Test policies with the strict idempotency policy; presumably the span makes
// them the current options for the calls below - TODO confirm.
google::cloud::internal::OptionsSpan const span(
223+
BasicTestPolicies().set<IdempotencyPolicyOption>(
224+
StrictIdempotencyPolicy().clone()));
225+
226+
// The payload is exactly one upload quantum, filled with '0' characters.
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
227+
std::string const payload(quantum, '0');
228+
229+
// Verify that the workaround for transient `kAborted` errors results
230+
// in calls to QueryResumableUpload().
// NOTE(review): this comment previously cited #9563, which elsewhere in this
// change tags the kAlreadyExists workaround; the kAborted handling is tagged
// #9273 - confirm the intended issue number.
231+
// Every UploadChunk() attempt fails with the abort error.
// NOTE(review): 4 presumably matches the attempt limit configured by
// BasicTestPolicies() - confirm.
EXPECT_CALL(*mock, UploadChunk)
232+
.Times(4)
233+
.WillRepeatedly(Return(TransientAbortError()));
234+
// Each query reports that no bytes have been committed.
EXPECT_CALL(*mock, QueryResumableUpload)
235+
.Times(AtLeast(2))
236+
.WillRepeatedly(Return(QueryResumableUploadResponse{0, absl::nullopt}));
237+
238+
auto response = client->UploadChunk(
239+
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
240+
// The caller must see the original kAborted error.
EXPECT_THAT(response, StatusIs(StatusCode::kAborted,
241+
HasSubstr("Concurrent requests received.")));
242+
}
243+
244+
// TODO(#9563) - remove this test once it is not needed
245+
Status Error9563() {
246+
return Status(StatusCode::kAlreadyExists, "Requested entity already exists");
247+
}
248+
249+
/// @test Verify the #9563 workaround: the first `kAlreadyExists` "Requested
/// entity already exists" error is retried, the second one is returned to the
/// caller.
249+
TEST(RetryClientTest, UploadChunkWorkaround9563) {
250+
// The RetryClient under test wraps a mocked raw client.
auto mock = std::make_shared<testing::MockClient>();
251+
auto client = RetryClient::Create(std::shared_ptr<internal::RawClient>(mock));
252+
// Test policies with the strict idempotency policy; presumably the span makes
// them the current options for the calls below - TODO confirm.
google::cloud::internal::OptionsSpan const span(
253+
BasicTestPolicies().set<IdempotencyPolicyOption>(
254+
StrictIdempotencyPolicy().clone()));
255+
256+
// The payload is exactly one upload quantum, filled with '0' characters.
auto const quantum = UploadChunkRequest::kChunkSizeQuantum;
257+
std::string const payload(quantum, '0');
258+
259+
// Verify that the workaround for "transients" (as defined in #9563) results
260+
// in calls to QueryResumableUpload().
261+
// The expectations below must be matched in the order they are declared.
::testing::InSequence sequence;
262+
// The first attempt fails with the error targeted by the workaround.
EXPECT_CALL(*mock, UploadChunk).WillOnce(Return(Error9563()));
263+
// The query reports that no bytes have been committed.
EXPECT_CALL(*mock, QueryResumableUpload)
264+
.WillOnce(Return(QueryResumableUploadResponse{0, absl::nullopt}));
265+
// The second error should be a permanent failure
266+
EXPECT_CALL(*mock, UploadChunk).WillOnce(Return(Error9563()));
267+
268+
auto response = client->UploadChunk(
269+
UploadChunkRequest("test-only-session-id", 0, {{payload}}));
270+
// The caller must see the original kAlreadyExists error.
EXPECT_THAT(response, StatusIs(StatusCode::kAlreadyExists,
271+
HasSubstr("Requested entity already exists")));
228273
}
229274

230275
/// @test Verify that we can restore a session and continue writing.

0 commit comments

Comments
 (0)