Skip to content

Commit c06ae8a

Browse files
Brian Chen and BenWhitehead authored
fix: add retry on ABORTED errors (#286)
* fix: add retry on ABORTED errors
* WIP: tests pass individually, concurrency error
* WIP: tests passing but hidden concurrency bug
* use BulkCommitBatch constructor, remove completeFuture
* use firestoreExecutor and update rate limiter test
* add todo for successfulAsList
* Update BulkWriterTest to check number of retry attempts
* build: manually bump com.google.api:api-common to v1.10.0 — this change should be cleaned up after com.google.cloud:google-cloud-shared-dependencies v0.8.4 is available
* test: add junit timeout rule to BulkWriterTest (#312)
* update to use successfulAsList
* add test to verify successfulAsList
* lint
* resolve comments, fix concurrency issue
* resolve comments, add additional test for flush

Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
1 parent 05c59dc commit c06ae8a

File tree

6 files changed

+534
-170
lines changed

6 files changed

+534
-170
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,37 @@
1717
package com.google.cloud.firestore;
1818

1919
import com.google.cloud.Timestamp;
20-
import io.grpc.Status;
2120
import javax.annotation.Nullable;
2221

2322
/**
2423
* A BatchWriteResult wraps the write time and status returned by Firestore when making
2524
* BatchWriteRequests.
2625
*/
2726
public final class BatchWriteResult {
27+
private final DocumentReference documentReference;
2828
@Nullable private final Timestamp writeTime;
29-
private final Status status;
30-
private final String message;
29+
@Nullable private final Exception exception;
3130

32-
BatchWriteResult(@Nullable Timestamp timestamp, Status status, String message) {
31+
BatchWriteResult(
32+
DocumentReference documentReference,
33+
@Nullable Timestamp timestamp,
34+
@Nullable Exception exception) {
35+
this.documentReference = documentReference;
3336
this.writeTime = timestamp;
34-
this.status = status;
35-
this.message = message;
37+
this.exception = exception;
38+
}
39+
40+
public DocumentReference getDocumentReference() {
41+
return documentReference;
3642
}
3743

3844
@Nullable
3945
public Timestamp getWriteTime() {
4046
return writeTime;
4147
}
4248

43-
public Status getStatus() {
44-
return status;
45-
}
46-
47-
public String getMessage() {
48-
return message;
49+
@Nullable
50+
public Exception getException() {
51+
return exception;
4952
}
5053
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 169 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.ApiAsyncFunction;
1920
import com.google.api.core.ApiFuture;
2021
import com.google.api.core.ApiFutures;
22+
import com.google.api.core.CurrentMillisClock;
2123
import com.google.api.core.SettableApiFuture;
24+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
25+
import com.google.api.gax.retrying.TimedAttemptSettings;
2226
import com.google.cloud.firestore.UpdateBuilder.BatchState;
2327
import com.google.common.annotations.VisibleForTesting;
2428
import com.google.common.base.Preconditions;
@@ -28,6 +32,9 @@
2832
import java.util.ArrayList;
2933
import java.util.List;
3034
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.CopyOnWriteArrayList;
37+
import java.util.concurrent.ScheduledExecutorService;
3138
import java.util.concurrent.TimeUnit;
3239
import java.util.logging.Level;
3340
import java.util.logging.Logger;
@@ -41,6 +48,29 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
4148
super(firestore, maxBatchSize);
4249
}
4350

51+
BulkCommitBatch(
52+
FirestoreImpl firestore,
53+
BulkCommitBatch retryBatch,
54+
final Set<DocumentReference> docsToRetry) {
55+
super(firestore);
56+
this.writes.addAll(
57+
FluentIterable.from(retryBatch.writes)
58+
.filter(
59+
new Predicate<WriteOperation>() {
60+
@Override
61+
public boolean apply(WriteOperation writeOperation) {
62+
return docsToRetry.contains(writeOperation.documentReference);
63+
}
64+
})
65+
.toList());
66+
67+
Preconditions.checkState(
68+
retryBatch.state == BatchState.SENT,
69+
"Batch should be SENT when creating a new BulkCommitBatch for retry");
70+
this.state = retryBatch.state;
71+
this.pendingOperations = retryBatch.pendingOperations;
72+
}
73+
4474
ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
4575
return result;
4676
}
@@ -55,6 +85,8 @@ public class BulkWriter {
5585
/** The maximum number of writes that can be in a single batch. */
5686
public static final int MAX_BATCH_SIZE = 500;
5787

88+
public static final int MAX_RETRY_ATTEMPTS = 10;
89+
5890
/**
5991
* The starting maximum number of operations per second as allowed by the 500/50/5 rule.
6092
*
@@ -85,8 +117,12 @@ public class BulkWriter {
85117
/** The maximum number of writes that can be in a single batch. */
86118
private int maxBatchSize = MAX_BATCH_SIZE;
87119

88-
/** A queue of batches to be written. */
89-
private final List<BulkCommitBatch> batchQueue = new ArrayList<>();
120+
/**
121+
* A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent
122+
* modification errors (as this list is modified from both the user thread and the network
123+
* thread).
124+
*/
125+
private final List<BulkCommitBatch> batchQueue = new CopyOnWriteArrayList<>();
90126

91127
/** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */
92128
private boolean closed = false;
@@ -96,8 +132,19 @@ public class BulkWriter {
96132

97133
private final FirestoreImpl firestore;
98134

135+
private final ScheduledExecutorService firestoreExecutor;
136+
137+
private final ExponentialRetryAlgorithm backoff;
138+
private TimedAttemptSettings nextAttempt;
139+
99140
BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
100141
this.firestore = firestore;
142+
this.backoff =
143+
new ExponentialRetryAlgorithm(
144+
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
145+
this.nextAttempt = backoff.createFirstAttempt();
146+
this.firestoreExecutor = firestore.getClient().getExecutor();
147+
101148
if (enableThrottling) {
102149
rateLimiter =
103150
new RateLimiter(
@@ -444,12 +491,13 @@ public ApiFuture<WriteResult> update(
444491
public ApiFuture<Void> flush() {
445492
verifyNotClosed();
446493
final SettableApiFuture<Void> flushComplete = SettableApiFuture.create();
447-
List<ApiFuture<Void>> writeFutures = new ArrayList<>();
494+
List<SettableApiFuture<WriteResult>> writeFutures = new ArrayList<>();
448495
for (BulkCommitBatch batch : batchQueue) {
449-
writeFutures.add(batch.awaitBulkCommit());
496+
batch.markReadyToSend();
497+
writeFutures.addAll(batch.getPendingFutures());
450498
}
451499
sendReadyBatches();
452-
ApiFutures.allAsList(writeFutures)
500+
ApiFutures.successfulAsList(writeFutures)
453501
.addListener(
454502
new Runnable() {
455503
public void run() {
@@ -493,7 +541,7 @@ private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
493541
if (batchQueue.size() > 0) {
494542
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
495543
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
496-
&& !lastBatch.getDocuments().contains(documentReference)) {
544+
&& !lastBatch.hasDocument(documentReference)) {
497545
return lastBatch;
498546
}
499547
}
@@ -539,23 +587,20 @@ public boolean apply(BulkCommitBatch batch) {
539587

540588
// Send the batch if it is under the rate limit, or schedule another attempt after the
541589
// appropriate timeout.
542-
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getOperationCount());
590+
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getPendingOperationCount());
543591
Preconditions.checkState(delayMs != -1, "Batch size should be under capacity");
544592
if (delayMs == 0) {
545593
sendBatch(batch);
546594
} else {
547-
firestore
548-
.getClient()
549-
.getExecutor()
550-
.schedule(
551-
new Runnable() {
552-
@Override
553-
public void run() {
554-
sendBatch(batch);
555-
}
556-
},
557-
delayMs,
558-
TimeUnit.MILLISECONDS);
595+
firestoreExecutor.schedule(
596+
new Runnable() {
597+
@Override
598+
public void run() {
599+
sendBatch(batch);
600+
}
601+
},
602+
delayMs,
603+
TimeUnit.MILLISECONDS);
559604
break;
560605
}
561606

@@ -568,27 +613,110 @@ public void run() {
568613
* next group of ready batches.
569614
*/
570615
private void sendBatch(final BulkCommitBatch batch) {
571-
boolean success = rateLimiter.tryMakeRequest(batch.getOperationCount());
616+
Preconditions.checkState(
617+
batch.state == BatchState.READY_TO_SEND,
618+
"The batch should be marked as READY_TO_SEND before committing");
619+
batch.state = BatchState.SENT;
620+
boolean success = rateLimiter.tryMakeRequest(batch.getPendingOperationCount());
572621
Preconditions.checkState(success, "Batch should be under rate limit to be sent.");
573-
try {
574-
final ApiFuture<List<BatchWriteResult>> commitFuture = batch.bulkCommit();
575-
commitFuture.addListener(
576-
new Runnable() {
577-
public void run() {
578-
try {
579-
batch.processResults(commitFuture.get(), null);
580-
} catch (Exception e) {
581-
batch.processResults(new ArrayList<BatchWriteResult>(), e);
582-
}
583-
// Remove the batch from BatchQueue after it has been processed.
584-
boolean removed = batchQueue.remove(batch);
585-
Preconditions.checkState(removed, "The batch should be in the BatchQueue.");
586-
sendReadyBatches();
587-
}
588-
},
622+
623+
ApiFuture<Void> commitFuture = bulkCommit(batch);
624+
commitFuture.addListener(
625+
new Runnable() {
626+
public void run() {
627+
boolean removed = batchQueue.remove(batch);
628+
Preconditions.checkState(
629+
removed, "The batch should be in the BatchQueue." + batchQueue.size());
630+
sendReadyBatches();
631+
}
632+
},
633+
MoreExecutors.directExecutor());
634+
}
635+
636+
private ApiFuture<Void> bulkCommit(BulkCommitBatch batch) {
637+
return bulkCommit(batch, 0);
638+
}
639+
640+
private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) {
641+
final SettableApiFuture<Void> backoffFuture = SettableApiFuture.create();
642+
643+
// Add a backoff delay. At first, this is 0.
644+
firestoreExecutor.schedule(
645+
new Runnable() {
646+
@Override
647+
public void run() {
648+
backoffFuture.set(null);
649+
}
650+
},
651+
nextAttempt.getRandomizedRetryDelay().toMillis(),
652+
TimeUnit.MILLISECONDS);
653+
654+
return ApiFutures.transformAsync(
655+
backoffFuture, new BackoffCallback(batch, attempt), firestoreExecutor);
656+
}
657+
658+
private class BackoffCallback implements ApiAsyncFunction<Void, Void> {
659+
final BulkCommitBatch batch;
660+
final int attempt;
661+
662+
public BackoffCallback(BulkCommitBatch batch, int attempt) {
663+
this.batch = batch;
664+
this.attempt = attempt;
665+
}
666+
667+
@Override
668+
public ApiFuture<Void> apply(Void ignored) {
669+
670+
return ApiFutures.transformAsync(
671+
ApiFutures.catchingAsync(
672+
batch.bulkCommit(),
673+
Exception.class,
674+
new ApiAsyncFunction<Exception, List<BatchWriteResult>>() {
675+
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
676+
List<BatchWriteResult> results = new ArrayList<>();
677+
// If the BatchWrite RPC fails, map the exception to each individual result.
678+
for (DocumentReference documentReference : batch.getPendingDocuments()) {
679+
results.add(new BatchWriteResult(documentReference, null, exception));
680+
}
681+
return ApiFutures.immediateFuture(results);
682+
}
683+
},
684+
MoreExecutors.directExecutor()),
685+
new ProcessBulkCommitCallback(batch, attempt),
589686
MoreExecutors.directExecutor());
590-
} catch (Exception e) {
591-
batch.processResults(new ArrayList<BatchWriteResult>(), e);
687+
}
688+
}
689+
690+
private class ProcessBulkCommitCallback
691+
implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
692+
final BulkCommitBatch batch;
693+
final int attempt;
694+
695+
public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
696+
this.batch = batch;
697+
this.attempt = attempt;
698+
}
699+
700+
@Override
701+
public ApiFuture<Void> apply(List<BatchWriteResult> results) {
702+
batch.processResults(results);
703+
Set<DocumentReference> remainingOps = batch.getPendingDocuments();
704+
if (!remainingOps.isEmpty()) {
705+
logger.log(
706+
Level.WARNING,
707+
String.format(
708+
"Current batch failed at retry #%d. Num failures: %d",
709+
attempt, remainingOps.size()));
710+
711+
if (attempt < MAX_RETRY_ATTEMPTS) {
712+
nextAttempt = backoff.createNextAttempt(nextAttempt);
713+
BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch, remainingOps);
714+
return bulkCommit(newBatch, attempt + 1);
715+
} else {
716+
batch.failRemainingOperations(results);
717+
}
718+
}
719+
return ApiFutures.immediateFuture(null);
592720
}
593721
}
594722

@@ -601,15 +729,15 @@ private boolean isBatchSendable(BulkCommitBatch batch) {
601729
return false;
602730
}
603731

604-
for (final DocumentReference document : batch.getDocuments()) {
732+
for (final DocumentReference documentReference : batch.getPendingDocuments()) {
605733
boolean isRefInFlight =
606734
FluentIterable.from(batchQueue)
607735
.anyMatch(
608736
new Predicate<BulkCommitBatch>() {
609737
@Override
610738
public boolean apply(BulkCommitBatch batch) {
611739
return batch.getState().equals(BatchState.SENT)
612-
&& batch.getDocuments().contains(document);
740+
&& batch.hasDocument(documentReference);
613741
}
614742
});
615743

@@ -620,7 +748,7 @@ public boolean apply(BulkCommitBatch batch) {
620748
"Duplicate write to document %s detected. Writing to the same document multiple"
621749
+ " times will slow down BulkWriter. Write to unique documents in order to "
622750
+ "maximize throughput.",
623-
document));
751+
documentReference.getPath()));
624752
return false;
625753
}
626754
}

0 commit comments

Comments (0)