1616
1717package com .google .cloud .firestore ;
1818
19+ import com .google .api .core .ApiAsyncFunction ;
1920import com .google .api .core .ApiFuture ;
2021import com .google .api .core .ApiFutures ;
22+ import com .google .api .core .CurrentMillisClock ;
2123import com .google .api .core .SettableApiFuture ;
24+ import com .google .api .gax .retrying .ExponentialRetryAlgorithm ;
25+ import com .google .api .gax .retrying .TimedAttemptSettings ;
2226import com .google .cloud .firestore .UpdateBuilder .BatchState ;
2327import com .google .common .annotations .VisibleForTesting ;
2428import com .google .common .base .Preconditions ;
2832import java .util .ArrayList ;
2933import java .util .List ;
3034import java .util .Map ;
35+ import java .util .Set ;
36+ import java .util .concurrent .CopyOnWriteArrayList ;
37+ import java .util .concurrent .ScheduledExecutorService ;
3138import java .util .concurrent .TimeUnit ;
3239import java .util .logging .Level ;
3340import java .util .logging .Logger ;
@@ -41,6 +48,29 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
4148 super (firestore , maxBatchSize );
4249 }
4350
51+ BulkCommitBatch (
52+ FirestoreImpl firestore ,
53+ BulkCommitBatch retryBatch ,
54+ final Set <DocumentReference > docsToRetry ) {
55+ super (firestore );
56+ this .writes .addAll (
57+ FluentIterable .from (retryBatch .writes )
58+ .filter (
59+ new Predicate <WriteOperation >() {
60+ @ Override
61+ public boolean apply (WriteOperation writeOperation ) {
62+ return docsToRetry .contains (writeOperation .documentReference );
63+ }
64+ })
65+ .toList ());
66+
67+ Preconditions .checkState (
68+ retryBatch .state == BatchState .SENT ,
69+ "Batch should be SENT when creating a new BulkCommitBatch for retry" );
70+ this .state = retryBatch .state ;
71+ this .pendingOperations = retryBatch .pendingOperations ;
72+ }
73+
  /**
   * Returns the commit result unchanged; BulkCommitBatch exposes the per-write {@code
   * WriteResult} future directly without additional wrapping.
   */
  ApiFuture<WriteResult> wrapResult(ApiFuture<WriteResult> result) {
    return result;
  }
@@ -55,6 +85,8 @@ public class BulkWriter {
5585 /** The maximum number of writes that can be in a single batch. */
5686 public static final int MAX_BATCH_SIZE = 500 ;
5787
88+ public static final int MAX_RETRY_ATTEMPTS = 10 ;
89+
5890 /**
5991 * The starting maximum number of operations per second as allowed by the 500/50/5 rule.
6092 *
@@ -85,8 +117,12 @@ public class BulkWriter {
85117 /** The maximum number of writes that can be in a single batch. */
86118 private int maxBatchSize = MAX_BATCH_SIZE ;
87119
88- /** A queue of batches to be written. */
89- private final List <BulkCommitBatch > batchQueue = new ArrayList <>();
120+ /**
121+ * A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent
122+ * modification errors (as this list is modified from both the user thread and the network
123+ * thread).
124+ */
125+ private final List <BulkCommitBatch > batchQueue = new CopyOnWriteArrayList <>();
90126
91127 /** Whether this BulkWriter instance is closed. Once closed, it cannot be opened again. */
92128 private boolean closed = false ;
@@ -96,8 +132,19 @@ public class BulkWriter {
96132
97133 private final FirestoreImpl firestore ;
98134
135+ private final ScheduledExecutorService firestoreExecutor ;
136+
137+ private final ExponentialRetryAlgorithm backoff ;
138+ private TimedAttemptSettings nextAttempt ;
139+
99140 BulkWriter (FirestoreImpl firestore , boolean enableThrottling ) {
100141 this .firestore = firestore ;
142+ this .backoff =
143+ new ExponentialRetryAlgorithm (
144+ firestore .getOptions ().getRetrySettings (), CurrentMillisClock .getDefaultClock ());
145+ this .nextAttempt = backoff .createFirstAttempt ();
146+ this .firestoreExecutor = firestore .getClient ().getExecutor ();
147+
101148 if (enableThrottling ) {
102149 rateLimiter =
103150 new RateLimiter (
@@ -444,12 +491,13 @@ public ApiFuture<WriteResult> update(
444491 public ApiFuture <Void > flush () {
445492 verifyNotClosed ();
446493 final SettableApiFuture <Void > flushComplete = SettableApiFuture .create ();
447- List <ApiFuture < Void >> writeFutures = new ArrayList <>();
494+ List <SettableApiFuture < WriteResult >> writeFutures = new ArrayList <>();
448495 for (BulkCommitBatch batch : batchQueue ) {
449- writeFutures .add (batch .awaitBulkCommit ());
496+ batch .markReadyToSend ();
497+ writeFutures .addAll (batch .getPendingFutures ());
450498 }
451499 sendReadyBatches ();
452- ApiFutures .allAsList (writeFutures )
500+ ApiFutures .successfulAsList (writeFutures )
453501 .addListener (
454502 new Runnable () {
455503 public void run () {
@@ -493,7 +541,7 @@ private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
493541 if (batchQueue .size () > 0 ) {
494542 BulkCommitBatch lastBatch = batchQueue .get (batchQueue .size () - 1 );
495543 if (lastBatch .getState () == UpdateBuilder .BatchState .OPEN
496- && !lastBatch .getDocuments (). contains (documentReference )) {
544+ && !lastBatch .hasDocument (documentReference )) {
497545 return lastBatch ;
498546 }
499547 }
@@ -539,23 +587,20 @@ public boolean apply(BulkCommitBatch batch) {
539587
540588 // Send the batch if it is under the rate limit, or schedule another attempt after the
541589 // appropriate timeout.
542- long delayMs = rateLimiter .getNextRequestDelayMs (batch .getOperationCount ());
590+ long delayMs = rateLimiter .getNextRequestDelayMs (batch .getPendingOperationCount ());
543591 Preconditions .checkState (delayMs != -1 , "Batch size should be under capacity" );
544592 if (delayMs == 0 ) {
545593 sendBatch (batch );
546594 } else {
547- firestore
548- .getClient ()
549- .getExecutor ()
550- .schedule (
551- new Runnable () {
552- @ Override
553- public void run () {
554- sendBatch (batch );
555- }
556- },
557- delayMs ,
558- TimeUnit .MILLISECONDS );
595+ firestoreExecutor .schedule (
596+ new Runnable () {
597+ @ Override
598+ public void run () {
599+ sendBatch (batch );
600+ }
601+ },
602+ delayMs ,
603+ TimeUnit .MILLISECONDS );
559604 break ;
560605 }
561606
@@ -568,27 +613,110 @@ public void run() {
568613 * next group of ready batches.
569614 */
570615 private void sendBatch (final BulkCommitBatch batch ) {
571- boolean success = rateLimiter .tryMakeRequest (batch .getOperationCount ());
616+ Preconditions .checkState (
617+ batch .state == BatchState .READY_TO_SEND ,
618+ "The batch should be marked as READY_TO_SEND before committing" );
619+ batch .state = BatchState .SENT ;
620+ boolean success = rateLimiter .tryMakeRequest (batch .getPendingOperationCount ());
572621 Preconditions .checkState (success , "Batch should be under rate limit to be sent." );
573- try {
574- final ApiFuture <List <BatchWriteResult >> commitFuture = batch .bulkCommit ();
575- commitFuture .addListener (
576- new Runnable () {
577- public void run () {
578- try {
579- batch .processResults (commitFuture .get (), null );
580- } catch (Exception e ) {
581- batch .processResults (new ArrayList <BatchWriteResult >(), e );
582- }
583- // Remove the batch from BatchQueue after it has been processed.
584- boolean removed = batchQueue .remove (batch );
585- Preconditions .checkState (removed , "The batch should be in the BatchQueue." );
586- sendReadyBatches ();
587- }
588- },
622+
623+ ApiFuture <Void > commitFuture = bulkCommit (batch );
624+ commitFuture .addListener (
625+ new Runnable () {
626+ public void run () {
627+ boolean removed = batchQueue .remove (batch );
628+ Preconditions .checkState (
629+ removed , "The batch should be in the BatchQueue." + batchQueue .size ());
630+ sendReadyBatches ();
631+ }
632+ },
633+ MoreExecutors .directExecutor ());
634+ }
635+
  /** Commits the batch starting at attempt 0 (the initial, non-retry attempt). */
  private ApiFuture<Void> bulkCommit(BulkCommitBatch batch) {
    return bulkCommit(batch, 0);
  }
639+
  /**
   * Schedules the batch commit after the current backoff delay and chains the commit and retry
   * logic via {@code BackoffCallback}.
   *
   * <p>NOTE(review): {@code nextAttempt} is a shared field, so retries on one batch lengthen the
   * delay applied to subsequent commits of other batches — confirm this cross-batch backoff is
   * intended.
   *
   * @param batch the batch to commit
   * @param attempt the zero-based attempt number; 0 is the initial send
   */
  private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) {
    final SettableApiFuture<Void> backoffFuture = SettableApiFuture.create();

    // Add a backoff delay. At first, this is 0.
    firestoreExecutor.schedule(
        new Runnable() {
          @Override
          public void run() {
            backoffFuture.set(null);
          }
        },
        nextAttempt.getRandomizedRetryDelay().toMillis(),
        TimeUnit.MILLISECONDS);

    // Run the actual commit on the Firestore executor once the delay elapses.
    return ApiFutures.transformAsync(
        backoffFuture, new BackoffCallback(batch, attempt), firestoreExecutor);
  }
657+
658+ private class BackoffCallback implements ApiAsyncFunction <Void , Void > {
659+ final BulkCommitBatch batch ;
660+ final int attempt ;
661+
662+ public BackoffCallback (BulkCommitBatch batch , int attempt ) {
663+ this .batch = batch ;
664+ this .attempt = attempt ;
665+ }
666+
667+ @ Override
668+ public ApiFuture <Void > apply (Void ignored ) {
669+
670+ return ApiFutures .transformAsync (
671+ ApiFutures .catchingAsync (
672+ batch .bulkCommit (),
673+ Exception .class ,
674+ new ApiAsyncFunction <Exception , List <BatchWriteResult >>() {
675+ public ApiFuture <List <BatchWriteResult >> apply (Exception exception ) {
676+ List <BatchWriteResult > results = new ArrayList <>();
677+ // If the BatchWrite RPC fails, map the exception to each individual result.
678+ for (DocumentReference documentReference : batch .getPendingDocuments ()) {
679+ results .add (new BatchWriteResult (documentReference , null , exception ));
680+ }
681+ return ApiFutures .immediateFuture (results );
682+ }
683+ },
684+ MoreExecutors .directExecutor ()),
685+ new ProcessBulkCommitCallback (batch , attempt ),
589686 MoreExecutors .directExecutor ());
590- } catch (Exception e ) {
591- batch .processResults (new ArrayList <BatchWriteResult >(), e );
687+ }
688+ }
689+
  /**
   * Applies commit results to the batch and, if any writes are still pending (failed with a
   * retryable error), schedules a retry batch with increased backoff until {@code
   * MAX_RETRY_ATTEMPTS} is exhausted, at which point the remaining operations are failed.
   */
  private class ProcessBulkCommitCallback
      implements ApiAsyncFunction<List<BatchWriteResult>, Void> {
    final BulkCommitBatch batch;
    final int attempt;

    public ProcessBulkCommitCallback(BulkCommitBatch batch, int attempt) {
      this.batch = batch;
      this.attempt = attempt;
    }

    @Override
    public ApiFuture<Void> apply(List<BatchWriteResult> results) {
      batch.processResults(results);
      // Documents still pending after processing are the ones that failed this attempt.
      Set<DocumentReference> remainingOps = batch.getPendingDocuments();
      if (!remainingOps.isEmpty()) {
        // NOTE(review): this warning also fires for attempt 0 (the initial send), so
        // "retry #0" actually means the first attempt failed, not that a retry occurred.
        logger.log(
            Level.WARNING,
            String.format(
                "Current batch failed at retry #%d. Num failures: %d",
                attempt, remainingOps.size()));

        if (attempt < MAX_RETRY_ATTEMPTS) {
          // Grow the shared backoff before scheduling the retry batch with only the failed docs.
          nextAttempt = backoff.createNextAttempt(nextAttempt);
          BulkCommitBatch newBatch = new BulkCommitBatch(firestore, batch, remainingOps);
          return bulkCommit(newBatch, attempt + 1);
        } else {
          // Out of retries: surface the last results' errors to the remaining operations.
          batch.failRemainingOperations(results);
        }
      }
      return ApiFutures.immediateFuture(null);
    }
  }
594722
@@ -601,15 +729,15 @@ private boolean isBatchSendable(BulkCommitBatch batch) {
601729 return false ;
602730 }
603731
604- for (final DocumentReference document : batch .getDocuments ()) {
732+ for (final DocumentReference documentReference : batch .getPendingDocuments ()) {
605733 boolean isRefInFlight =
606734 FluentIterable .from (batchQueue )
607735 .anyMatch (
608736 new Predicate <BulkCommitBatch >() {
609737 @ Override
610738 public boolean apply (BulkCommitBatch batch ) {
611739 return batch .getState ().equals (BatchState .SENT )
612- && batch .getDocuments (). contains ( document );
740+ && batch .hasDocument ( documentReference );
613741 }
614742 });
615743
@@ -620,7 +748,7 @@ public boolean apply(BulkCommitBatch batch) {
620748 "Duplicate write to document %s detected. Writing to the same document multiple"
621749 + " times will slow down BulkWriter. Write to unique documents in order to "
622750 + "maximize throughput." ,
623- document ));
751+ documentReference . getPath () ));
624752 return false ;
625753 }
626754 }
0 commit comments