fix: add retry on ABORTED errors #286
Conversation
Codecov Report
@@ Coverage Diff @@
## bc/bulk-master #286 +/- ##
====================================================
+ Coverage 72.94% 74.02% +1.07%
- Complexity 1015 1123 +108
====================================================
Files 63 67 +4
Lines 5407 6109 +702
Branches 617 672 +55
====================================================
+ Hits 3944 4522 +578
- Misses 1266 1373 +107
- Partials 197 214 +17
Continue to review full report at Codecov.
    });
  }

  private void bulkCommit(BulkCommitBatch batch) {
What do you think about having the retry logic live in BulkWriter instead of inside UpdateBuilder? I originally wanted to take care of everything in UpdateBuilder, but then I realized that doing so would prohibit creating a new BulkCommitBatch via constructor.
This matches Node so I am fully supportive.
            null,
            ((FirestoreException) e).getStatus(),
            e.getMessage()));
  } else {
I wasn't sure how to handle the case where a non-FirestoreException was thrown. I added an additional field for an Exception in the event that the Exception thrown does not have a Status, but I wonder if this could be cleaner.
The code in bulkCommit should wrap all exceptions in FirestoreException.
Post conversation: We can't wrap some of the Java exceptions that are thrown when getting Futures. However, the exceptions will still be surfaced to the user accordingly.
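For readers skimming this thread, a minimal sketch of the point above, using the two-argument processResults(results, exception) form that appears in a snippet later in this review. The method name awaitAndProcess is hypothetical and this is not the PR's final code:

```java
private void awaitAndProcess(
    BulkCommitBatch batch, ApiFuture<List<BatchWriteResult>> commitFuture) {
  try {
    batch.processResults(commitFuture.get(), null);
  } catch (ExecutionException e) {
    // The cause is often a FirestoreException carrying a Status, but it does not have to be;
    // either way it is reported to the user through processResults.
    Throwable cause = e.getCause();
    batch.processResults(
        new ArrayList<BatchWriteResult>(), cause instanceof Exception ? (Exception) cause : e);
  } catch (InterruptedException e) {
    // Future.get() can also throw InterruptedException, which cannot be wrapped in a
    // FirestoreException with a Status; restore the interrupt flag and surface it as-is.
    Thread.currentThread().interrupt();
    batch.processResults(new ArrayList<BatchWriteResult>(), e);
  }
}
```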
   * Removes all operations not specified in documentsToRetry from the batch. Marks the batch as
   * READY_TO_SEND in order to allow the next batch to be sent.
   */
  void sliceBatchForRetry(final Set<String> documentsToRetry) {
Oh man. I really tried to use the BulkCommitBatch constructor directly to properly imitate Node. However, since the BulkCommitBatch logic lives inside UpdateBuilder, it got pretty ugly. I had to add a bunch of getters, and managing the state on a BulkCommitBatch becomes a tricky problem / potential for future bugs. I think slicing the batch here is the lesser of the two evils, and I'm open to separating out the batch state override.
How about https://gist.github.com/schmidt-sebastian/48ed97084b74bf7e3dd5936eddbfc001, or is this only addressing half of the problem?
I ran into two main issues: first, my current implementation requires the batch in bulkCommit() to be marked final, which means it can't be changed directly, though I guess we could put it in an array. The second is that the completeFuture used to track awaitBulkCommit() will never resolve, unless we pipe it through somehow (is that possible?).
Update: Ended up using the BulkCommitBatch constructor instead of slicing, pending successfulAsList.
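For context on "pending successfulAsList": ApiFutures.successfulAsList (the reason for the api-common 1.10.0 bump mentioned later in this PR) waits for all input futures and substitutes null for any that failed instead of failing the combined future, which is what makes it suitable for waiting on a batch whose individual writes may fail. A self-contained illustration, not the PR's code:

```java
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import java.util.Arrays;
import java.util.List;

public class SuccessfulAsListSketch {
  public static void main(String[] args) throws Exception {
    SettableApiFuture<String> succeeded = SettableApiFuture.create();
    SettableApiFuture<String> failed = SettableApiFuture.create();
    succeeded.set("written");
    failed.setException(new RuntimeException("ABORTED"));

    // Unlike allAsList, the combined future still resolves; the failed entry becomes null.
    ApiFuture<List<String>> combined =
        ApiFutures.successfulAsList(Arrays.asList(succeeded, failed));
    System.out.println(combined.get()); // prints [written, null]
  }
}
```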
google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java (resolved)
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java (resolved)
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java (resolved, outdated)
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java (resolved)
        .execute(
            new Runnable() {
              public void run() {
                bulkCommit(batch);
We previously waited for bulkCommit before removing the batch from the batch queue. Do we not need to do this anymore?
I believe we are still waiting on bulkCommit to finish before removing the batch. We're waiting for attemptBulkCommit in bulkCommit(), which means that bulkCommit() should finish its logic before we remove batches from the batch queue.
Update: N/A now that we are chaining with ApiFutures.
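For readers following the thread, "chaining with ApiFutures" means registering the queue cleanup as a continuation of the commit future instead of blocking on it. A rough sketch under that assumption (batch, batchQueue, and bulkCommit mirror names from the snippets in this PR; the executor choice is illustrative, not the PR's exact code):

```java
// Sketch: remove the batch from the queue only once the commit future completes,
// whether it succeeded or failed.
final ApiFuture<Void> commitFuture = bulkCommit(batch, /* attempt= */ 0);
commitFuture.addListener(
    new Runnable() {
      @Override
      public void run() {
        batchQueue.remove(batch);
      }
    },
    MoreExecutors.directExecutor());
```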
        batch.processResults(commitFuture.get(), null);
      } catch (Exception e) {
        batch.processResults(new ArrayList<BatchWriteResult>(), e);
    MoreExecutors.directExecutor()
MoreExecutors.directExecutor().execute(...) seems equivalent to just running the code in the Runnable directly. Let me know if I am mistaken, maybe by adding a comment.
I thought it was always good practice to execute runnables via an executor, as I haven't seen any Runnables being executed directly. Done.
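To spell out the equivalence discussed above: Guava's MoreExecutors.directExecutor() runs the submitted Runnable synchronously on the calling thread, so wrapping the call in execute(...) does the same work as calling run() directly; the executor form mostly matters where an Executor argument is required (for example ApiFuture.addListener). A small self-contained check:

```java
import com.google.common.util.concurrent.MoreExecutors;

public class DirectExecutorSketch {
  public static void main(String[] args) {
    Runnable task =
        new Runnable() {
          @Override
          public void run() {
            System.out.println("ran on " + Thread.currentThread().getName());
          }
        };

    // Both statements run the task immediately on the current (main) thread.
    task.run();
    MoreExecutors.directExecutor().execute(task);
  }
}
```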
google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java (resolved)
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("Mock batchWrite failed in test"));
      ++opCount;
    }
    assertEquals(1, opCount);

Suggested change:
      Assert.fail(...);
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("Mock batchWrite failed in test"));
    }
  }

  @Test
  public void failsWritesAfterAllRetryAttemptsFail() throws Exception {
    doThrow(
Does the mocking layer allow us to specify that we want this function to be called 10 times?
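Assuming the tests use Mockito (the doThrow above suggests so), the number of invocations can be asserted with verify(mock, times(n)). A generic self-contained sketch, not the actual BulkWriterTest setup; the RpcLayer interface is a hypothetical stand-in:

```java
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class RetryCountSketch {
  // Hypothetical collaborator standing in for whatever the test stubs for batchWrite.
  interface RpcLayer {
    void batchWrite();
  }

  public static void main(String[] args) {
    RpcLayer rpc = mock(RpcLayer.class);
    doThrow(new RuntimeException("Mock batchWrite failed in test")).when(rpc).batchWrite();

    // Code under test would call rpc.batchWrite() once per retry attempt...
    for (int attempt = 0; attempt < 10; attempt++) {
      try {
        rpc.batchWrite();
      } catch (RuntimeException expected) {
        // swallow and retry
      }
    }

    // ...and the test can then pin down the exact number of attempts.
    verify(rpc, times(10)).batchWrite();
  }
}
```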
  }
  Set<Code> codes = FirestoreSettings.newBuilder().batchWriteSettings().getRetryableCodes();
  for (Code code : codes) {
    if (code.toString().equals(status.getCode().toString())) {

Suggested change:
    if (code.equals(Code.valueOf(status.getCode().name()))) {
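The suggestion works because gRPC's Status.Code and GAX's StatusCode.Code use the same constant names, so valueOf(name()) bridges the two enums without string comparison. A small sketch in that spirit (it uses Set.contains instead of the loop above; isRetryable is a hypothetical helper, not code from this PR):

```java
import com.google.api.gax.rpc.StatusCode.Code;
import io.grpc.Status;
import java.util.Set;

public class RetryableCodeSketch {
  // True if the gRPC status maps onto one of the configured retryable GAX codes.
  static boolean isRetryable(Status status, Set<Code> retryableCodes) {
    return retryableCodes.contains(Code.valueOf(status.getCode().name()));
  }
}
```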
Force-pushed from 60dcfbd to 95c099f.
This change should be cleaned up after com.google.cloud:google-cloud-shared-dependencies v0.8.4 is available.
  BatchWriteResult(@Nullable Timestamp timestamp, Status status, String message) {
  BatchWriteResult(
      DocumentReference documentReference, @Nullable Timestamp timestamp, Exception exception) {
Exception should be @Nullable
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java (resolved)
    commitFuture.addListener(
        new Runnable() {
          public void run() {
            System.out.println("removing batch from batch queue. size: " + batchQueue.size());
  }

  private ApiFuture<List<BatchWriteResult>> invokeBulkCommit(final BulkCommitBatch batch) {
    return batch.bulkCommit();
This can be inlined.
  private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) {
    final SettableApiFuture<Void> backoffFuture = SettableApiFuture.create();

    class ProcessBulkCommitCallback implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
Can you move these inner classes out of this method? Static classes within methods feel strange. :)
I originally wanted to avoid adding class members to the callback classes, but readability/style is more important. Thanks!
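For illustration, one way the method-local callback can be hoisted into a nested class, with the values it previously captured from bulkCommit passed through its constructor; the exact fields are assumptions, not the PR's final shape:

```java
// Sketch: the callback as a private nested class instead of a class declared inside
// bulkCommit(). State it used to capture from the enclosing method is now explicit.
private final class ProcessBulkCommitCallback
    implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
  private final BulkCommitBatch batch;
  private final int attempt;

  ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
    this.batch = batch;
    this.attempt = attempt;
  }

  @Override
  public ApiFuture<Void> apply(List<BatchWriteResult> results) {
    batch.processResults(results);
    // ...use attempt to decide whether to retry or fail the remaining operations,
    // as in the snippets further below.
    return ApiFutures.immediateFuture(null);
  }
}
```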
      @Override
      public ApiFuture<Void> apply(Void ignored) {

    // If the BatchWrite RPC fails, map the exception to each individual result.
Move comment closer to the code that does this.
          new BatchWriteResult(
              writes.get(i).documentReference,
              updateTime,
              updateTime == null
Should this be based on the Status?
updateTime is null if the Status is not OK from above, but I can see how using status again is more readable. Changed to an if/else block that hopefully increases readability.
  }

  // Mark the batch as ready to send in order to allow the batch to be retried again.
  state = BatchState.READY_TO_SEND;
Is this now unused?
      batch.processResults(results);
      if (batch.getPendingOperationCount() > 0) {
        logger.log(
            Level.WARNING,
            String.format(
                "Current batch failed at retry #%d. Num failures: %d",
                attempt, batch.getPendingOperationCount()));

        if (attempt < MAX_RETRY_ATTEMPTS) {
          nextAttempt = backoff.createNextAttempt(nextAttempt);
          BulkCommitBatch newBatch =
              new BulkCommitBatch(firestore, batch, batch.getPendingDocuments());
          return bulkCommit(newBatch, attempt + 1);
        } else {
          batch.failRemainingOperations(results);
        }
      }
      return ApiFutures.immediateFuture(null);
This confused me during the review. There is a strange interaction model going on here.
batch.processResults changes the internal state, which reduces the number of pending operations (this is not obvious from the callsite). new BulkCommitBatch then uses this filtered list of pending operations, but also uses the unfiltered list of writes.
While this matches what Node does, a lot of the back and forth in Node happens in the same codepath, and here these interactions all cross into and out of UpdateBuilder.
One improvement suggestion:
Suggested change:
      batch.processResults(results);
      List<DocumentReference> pendingOps = batch.getRemainingOperations();
      if (!pendingOps.isEmpty()) {
        logger.log(
            Level.WARNING,
            String.format(
                "Current batch failed at retry #%d. Num failures: %d",
                attempt, pendingOps.size()));

        if (attempt < MAX_RETRY_ATTEMPTS) {
          nextAttempt = backoff.createNextAttempt(nextAttempt);
          BulkCommitBatch newBatch =
              new BulkCommitBatch(firestore, batch, pendingOps);
          return bulkCommit(newBatch, attempt + 1);
        } else {
          batch.failRemainingOperations(results);
        }
      }
      return ApiFutures.immediateFuture(null);
Thanks for taking the time to write it out and explain it. I remember thinking the code was kind of tricky and had a smell to it, but I couldn't pinpoint what it was. Changed and updated comments for processResults.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java (resolved)
  /**
   * A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent
   * modification errors.
   */

Suggested change:
  /**
   * A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent
   * modification errors (as this list is modified from both the user thread and the network thread).
   */
Done. Thanks for the clarification
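As a concrete illustration of the javadoc above, wrapping the queue with Collections.synchronizedList makes individual add/remove calls safe across the user thread and the network thread, while iteration still needs an explicit synchronized block. This is a generic sketch, not the PR's BulkWriter code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchQueueSketch {
  // Stand-in element type; in the PR this is BulkCommitBatch.
  static final class Batch {}

  // Individual add()/remove() calls are internally synchronized.
  private final List<Batch> batchQueue =
      Collections.synchronizedList(new ArrayList<Batch>());

  void enqueue(Batch batch) {
    batchQueue.add(batch); // e.g. called from the user thread
  }

  void logPending() {
    // Iteration is a compound operation and must hold the list's monitor explicitly,
    // otherwise a concurrent add/remove can still cause ConcurrentModificationException.
    synchronized (batchQueue) {
      for (Batch batch : batchQueue) {
        System.out.println("pending: " + batch);
      }
    }
  }
}
```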
      batch.processResults(results);
      Set<DocumentReference> pendingOps = batch.getPendingDocuments();
The logic is much cleaner now, but still not self-explanatory. I think this can be solved by naming though:
batch.processResults(results);
Set<DocumentReference> remainingOp = batch.getRemainingOperations();
Or:
batch.processResultsAndRemoveSuccesfulResultsFromPendingOperationsBernie2016(results);
Set<DocumentReference> remainingOp = batch.getPendingDocuments();
          @Nullable Timestamp updateTime;
          @Nullable Exception exception;
          if (code == Status.OK) {
            updateTime = Timestamp.fromProto(writeResult.getUpdateTime());
            exception = null;
          } else {
            updateTime = null;
            exception = FirestoreException.serverRejected(code, status.getMessage());
          }
          result.add(
              new BatchWriteResult(writes.get(i).documentReference, updateTime, exception));

Suggested change:
          Timestamp updateTime = null;
          Exception exception = null;
          if (Status.OK.equals(code)) {
            updateTime = Timestamp.fromProto(writeResult.getUpdateTime());
          } else {
            exception = FirestoreException.serverRejected(code, status.getMessage());
          }
          result.add(
              new BatchWriteResult(writes.get(i).documentReference, updateTime, exception));
        convertBatchWriteResult(results.get(i), future);
      } else {
        resultsMap.get(i).setException(error);
  /**
Missing empty line.
  }

  @Test
  public void allWritesCompleteWhenFlushCompletes() throws Exception {
Is this more "flushCompletesWhenAllWritesComplete"?
      assertTrue(e.getMessage().contains("Mock batchWrite failed in test"));
      assertEquals(retryAttempts[0], BulkWriter.MAX_RETRY_ATTEMPTS + 1);
    }
  }
Can we add a test that:
- Adds two writes
- Calls flush
- Fails one write with a retryable error, lets the other succeed
- Retries the failed write, which succeeds
- Test verifies that flush() only finishes now
I am not 100% convinced that this works right now.
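A rough sketch of the requested test against the public BulkWriter surface, assuming flush() returns an ApiFuture as in the current API; firestore, doc1, doc2, map(...), and the response-stubbing helper failFirstWriteOnceThenSucceed() are placeholders for whatever BulkWriterTest actually uses:

```java
@Test
public void flushResolvesOnlyAfterRetriedWriteSucceeds() throws Exception {
  // Hypothetical stubbing: the first batchWrite response fails doc1 with ABORTED
  // (retryable) and succeeds doc2; the retried batchWrite for doc1 then succeeds.
  failFirstWriteOnceThenSucceed();

  BulkWriter bulkWriter = firestore.bulkWriter();
  ApiFuture<WriteResult> result1 = bulkWriter.set(doc1, map("foo", "bar"));
  ApiFuture<WriteResult> result2 = bulkWriter.set(doc2, map("foo", "bar"));

  ApiFuture<Void> flush = bulkWriter.flush();
  flush.get();

  // If flush() has resolved, both user-facing futures must already be done,
  // including the one that needed a retry.
  assertTrue(result1.isDone());
  assertTrue(result2.isDone());
  assertNotNull(result1.get().getUpdateTime());
}
```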
* fix: add retry on ABORTED errors
* WIP: tests pass individually, concurrency error
* WIP: tests passing but hidden concurrency bug
* use BulkCommitBatch constructor, remove completeFuture
* use firestoreExecutor and update rate limiter test
* add todo for successfulAsList
* Update BulkWriterTest to check number of retry attempts
* build: manually bump com.google.api:api-common to v1.10.0. This change should be cleaned up after com.google.cloud:google-cloud-shared-dependencies v0.8.4 is available
* test: add junit timeout rule to BulkWriterTest (#312)
* update to use successfulAsList
* add test to verify successfulAsList
* lint
* resolve comments, fix concurrency issue?
* resolve comments, add additional test for flush

Co-authored-by: BenWhitehead <[email protected]>
Porting over from Node.