Skip to content

Commit 95c099f

Browse files
author
Brian Chen
committed
WIP: test pass individually, concurrency error
1 parent eb099d5 commit 95c099f

File tree

5 files changed

+170
-168
lines changed

5 files changed

+170
-168
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,33 @@
1717
package com.google.cloud.firestore;
1818

1919
import com.google.cloud.Timestamp;
20-
import io.grpc.Status;
2120
import javax.annotation.Nullable;
2221

2322
/**
2423
* A BatchWriteResult wraps the write time and status returned by Firestore when making
2524
* BatchWriteRequests.
2625
*/
2726
public final class BatchWriteResult {
28-
private final String key;
27+
private final DocumentReference documentReference;
2928
@Nullable private final Timestamp writeTime;
30-
private final Status status;
31-
private final String message;
3229
@Nullable private final Exception exception;
3330

34-
BatchWriteResult(String key, @Nullable Timestamp timestamp, Status status, String message) {
35-
this.key = key;
36-
this.writeTime = timestamp;
37-
this.status = status;
38-
this.message = message;
39-
this.exception = null;
40-
}
41-
4231
BatchWriteResult(
43-
String key,
44-
@Nullable Timestamp timestamp,
45-
Status status,
46-
String message,
47-
Exception exception) {
48-
this.key = key;
32+
DocumentReference documentReference, @Nullable Timestamp timestamp, Exception exception) {
33+
this.documentReference = documentReference;
4934
this.writeTime = timestamp;
50-
this.status = status;
51-
this.message = message;
5235
this.exception = exception;
5336
}
5437

55-
public String getKey() {
56-
return key;
38+
public DocumentReference getDocumentReference() {
39+
return documentReference;
5740
}
5841

5942
@Nullable
6043
public Timestamp getWriteTime() {
6144
return writeTime;
6245
}
6346

64-
@Nullable
65-
public Status getStatus() {
66-
return status;
67-
}
68-
69-
public String getMessage() {
70-
return message;
71-
}
72-
7347
@Nullable
7448
public Exception getException() {
7549
return exception;

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 103 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.ApiAsyncFunction;
1920
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.ApiFutureCallback;
2022
import com.google.api.core.ApiFutures;
2123
import com.google.api.core.CurrentMillisClock;
2224
import com.google.api.core.SettableApiFuture;
@@ -31,8 +33,7 @@
3133
import java.util.ArrayList;
3234
import java.util.List;
3335
import java.util.Map;
34-
import java.util.concurrent.Callable;
35-
import java.util.concurrent.ScheduledFuture;
36+
import java.util.concurrent.ScheduledExecutorService;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.logging.Level;
3839
import java.util.logging.Logger;
@@ -103,6 +104,8 @@ public class BulkWriter {
103104

104105
private final FirestoreImpl firestore;
105106

107+
// private final ScheduledExecutorService firestoreExecutor;
108+
106109
private final ExponentialRetryAlgorithm backoff;
107110
private TimedAttemptSettings nextAttempt;
108111

@@ -112,6 +115,7 @@ public class BulkWriter {
112115
new ExponentialRetryAlgorithm(
113116
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
114117
this.nextAttempt = backoff.createFirstAttempt();
118+
// this.firestoreExecutor = firestore.getClient().getExecutor();
115119

116120
if (enableThrottling) {
117121
rateLimiter =
@@ -508,7 +512,7 @@ private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
508512
if (batchQueue.size() > 0) {
509513
BulkCommitBatch lastBatch = batchQueue.get(batchQueue.size() - 1);
510514
if (lastBatch.getState() == UpdateBuilder.BatchState.OPEN
511-
&& !lastBatch.hasPath(documentReference.getPath())) {
515+
&& !lastBatch.hasDocument(documentReference)) {
512516
return lastBatch;
513517
}
514518
}
@@ -559,18 +563,15 @@ public boolean apply(BulkCommitBatch batch) {
559563
if (delayMs == 0) {
560564
sendBatch(batch);
561565
} else {
562-
firestore
563-
.getClient()
564-
.getExecutor()
565-
.schedule(
566-
new Runnable() {
567-
@Override
568-
public void run() {
569-
sendBatch(batch);
570-
}
571-
},
572-
delayMs,
573-
TimeUnit.MILLISECONDS);
566+
firestore.getClient().getExecutor().schedule(
567+
new Runnable() {
568+
@Override
569+
public void run() {
570+
sendBatch(batch);
571+
}
572+
},
573+
delayMs,
574+
TimeUnit.MILLISECONDS);
574575
break;
575576
}
576577

@@ -585,82 +586,97 @@ public void run() {
585586
private void sendBatch(final BulkCommitBatch batch) {
586587
boolean success = rateLimiter.tryMakeRequest(batch.getPendingOperationCount());
587588
Preconditions.checkState(success, "Batch should be under rate limit to be sent.");
588-
MoreExecutors.directExecutor()
589-
.execute(
590-
new Runnable() {
591-
public void run() {
592-
bulkCommit(batch);
593-
boolean removed = batchQueue.remove(batch);
594-
Preconditions.checkState(removed, "The batch should be in the BatchQueue.");
595-
sendReadyBatches();
596-
}
597-
});
589+
590+
// Schedule the actual RPC call on Firestore's executor so that it does not block the main
591+
// thread.
592+
ApiFuture<Void> commitFuture = bulkCommit(batch);
593+
commitFuture.addListener(
594+
new Runnable() {
595+
public void run() {
596+
boolean removed = batchQueue.remove(batch);
597+
Preconditions.checkState(removed, "The batch should be in the BatchQueue.");
598+
sendReadyBatches();
599+
}
600+
},
601+
MoreExecutors.directExecutor());
598602
}
599603

600-
private void bulkCommit(BulkCommitBatch batch) {
601-
List<BatchWriteResult> results = new ArrayList<>();
602-
for (int attempt = 0; attempt < MAX_RETRY_ATTEMPTS; ++attempt) {
603-
final BulkCommitBatch finalBatch = batch;
604-
ScheduledFuture<List<BatchWriteResult>> attemptBulkCommit =
605-
firestore
606-
.getClient()
607-
.getExecutor()
608-
.schedule(
609-
new Callable<List<BatchWriteResult>>() {
610-
public List<BatchWriteResult> call() {
611-
List<BatchWriteResult> results = new ArrayList<>();
612-
try {
613-
return finalBatch.bulkCommit().get();
614-
} catch (Exception e) {
615-
// Map the failure to each individual write's result.
616-
for (String path : finalBatch.getPendingDocs()) {
617-
if (e instanceof FirestoreException) {
618-
results.add(
619-
new BatchWriteResult(
620-
path,
621-
null,
622-
((FirestoreException) e).getStatus(),
623-
e.getMessage()));
624-
} else {
625-
results.add(new BatchWriteResult(path, null, null, e.getMessage(), e));
626-
}
627-
}
628-
return results;
629-
}
630-
}
631-
},
632-
nextAttempt.getRandomizedRetryDelay().toMillis(),
633-
TimeUnit.MILLISECONDS);
634-
635-
try {
636-
results = attemptBulkCommit.get();
637-
batch.processResults(results, null);
638-
} catch (Exception e) {
639-
for (String path : batch.getPendingDocs()) {
640-
results.add(new BatchWriteResult(path, null, null, e.getMessage(), e));
641-
}
642-
batch.processResults(new ArrayList<BatchWriteResult>(), e);
643-
}
604+
private ApiFuture<Void> bulkCommit(BulkCommitBatch batch) {
605+
return bulkCommit(batch, 0);
606+
}
644607

645-
if (batch.getPendingOperationCount() > 0) {
646-
logger.log(
647-
Level.WARNING,
648-
String.format(
649-
"Current batch failed at retry #%d. Num failures: %d",
650-
attempt, batch.getPendingOperationCount()));
651-
batch.sliceBatchForRetry(batch.getPendingDocs());
652-
batch.markReadyToSend();
608+
ApiFuture<List<BatchWriteResult>> invokeBulkCommit(final BulkCommitBatch batch) {
609+
return batch.bulkCommit();
610+
}
653611

654-
} else {
655-
batch.markComplete();
656-
return;
612+
private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) {
613+
final BulkCommitBatch finalBatch = batch;
614+
final SettableApiFuture<Void> backoffFuture = SettableApiFuture.create();
615+
616+
class ProcessBulkCommitCallback implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
617+
@Override
618+
public ApiFuture<Void> apply(List<BatchWriteResult> results) {
619+
finalBatch.processResults(results);
620+
if (finalBatch.getPendingOperationCount() > 0) {
621+
logger.log(
622+
Level.WARNING,
623+
String.format(
624+
"Current batch failed at retry #%d. Num failures: %d",
625+
attempt, finalBatch.getPendingOperationCount()));
626+
finalBatch.sliceBatchForRetry(finalBatch.getPendingDocuments());
627+
finalBatch.markReadyToSend();
628+
629+
if (attempt < MAX_RETRY_ATTEMPTS) {
630+
nextAttempt = backoff.createNextAttempt(nextAttempt);
631+
return bulkCommit(finalBatch, attempt + 1);
632+
} else {
633+
finalBatch.failRemainingOperations(results);
634+
finalBatch.markComplete();
635+
}
636+
} else {
637+
finalBatch.markComplete();
638+
}
639+
return ApiFutures.immediateFuture(null);
657640
}
641+
}
658642

659-
nextAttempt = backoff.createNextAttempt(nextAttempt);
643+
class BackoffCallback implements ApiAsyncFunction<Void, Void> {
644+
@Override
645+
public ApiFuture<Void> apply(Void ignored) {
646+
647+
// If the BatchWrite RPC fails, map the exception to each individual result.
648+
return ApiFutures.transformAsync(
649+
ApiFutures.catchingAsync(
650+
invokeBulkCommit(finalBatch),
651+
Exception.class,
652+
new ApiAsyncFunction<Exception, List<BatchWriteResult>>() {
653+
public ApiFuture<List<BatchWriteResult>> apply(Exception exception) {
654+
List<BatchWriteResult> results = new ArrayList<>();
655+
for (DocumentReference documentReference : finalBatch.getPendingDocuments()) {
656+
results.add(new BatchWriteResult(documentReference, null, exception));
657+
}
658+
return ApiFutures.immediateFuture(results);
659+
}
660+
},
661+
MoreExecutors.directExecutor()),
662+
new ProcessBulkCommitCallback(),
663+
MoreExecutors.directExecutor());
664+
}
660665
}
661666

662-
batch.failRemainingOperations(results);
663-
batch.markComplete();
667+
// Add a backoff delay. At first, this is 0.
668+
firestore.getClient().getExecutor().schedule(
669+
new Runnable() {
670+
@Override
671+
public void run() {
672+
backoffFuture.set(null);
673+
}
674+
},
675+
nextAttempt.getRandomizedRetryDelay().toMillis(),
676+
TimeUnit.MILLISECONDS);
677+
678+
return ApiFutures.transformAsync(
679+
backoffFuture, new BackoffCallback(), MoreExecutors.directExecutor());
664680
}
665681

666682
/**
@@ -672,14 +688,15 @@ private boolean isBatchSendable(BulkCommitBatch batch) {
672688
return false;
673689
}
674690

675-
for (final String path : batch.getPendingDocs()) {
691+
for (final DocumentReference documentReference : batch.getPendingDocuments()) {
676692
boolean isRefInFlight =
677693
FluentIterable.from(batchQueue)
678694
.anyMatch(
679695
new Predicate<BulkCommitBatch>() {
680696
@Override
681697
public boolean apply(BulkCommitBatch batch) {
682-
return batch.getState().equals(BatchState.SENT) && batch.hasPath(path);
698+
return batch.getState().equals(BatchState.SENT)
699+
&& batch.hasDocument(documentReference);
683700
}
684701
});
685702

@@ -690,7 +707,7 @@ public boolean apply(BulkCommitBatch batch) {
690707
"Duplicate write to document %s detected. Writing to the same document multiple"
691708
+ " times will slow down BulkWriter. Write to unique documents in order to "
692709
+ "maximize throughput.",
693-
path));
710+
documentReference.getPath()));
694711
return false;
695712
}
696713
}

0 commit comments

Comments
 (0)