1616
1717package com .google .cloud .firestore ;
1818
19+ import com .google .api .core .ApiAsyncFunction ;
1920import com .google .api .core .ApiFuture ;
21+ import com .google .api .core .ApiFutureCallback ;
2022import com .google .api .core .ApiFutures ;
2123import com .google .api .core .CurrentMillisClock ;
2224import com .google .api .core .SettableApiFuture ;
3133import java .util .ArrayList ;
3234import java .util .List ;
3335import java .util .Map ;
34- import java .util .concurrent .Callable ;
35- import java .util .concurrent .ScheduledFuture ;
36+ import java .util .concurrent .ScheduledExecutorService ;
3637import java .util .concurrent .TimeUnit ;
3738import java .util .logging .Level ;
3839import java .util .logging .Logger ;
@@ -103,6 +104,8 @@ public class BulkWriter {
103104
104105 private final FirestoreImpl firestore ;
105106
107+ // private final ScheduledExecutorService firestoreExecutor;
108+
106109 private final ExponentialRetryAlgorithm backoff ;
107110 private TimedAttemptSettings nextAttempt ;
108111
@@ -112,6 +115,7 @@ public class BulkWriter {
112115 new ExponentialRetryAlgorithm (
113116 firestore .getOptions ().getRetrySettings (), CurrentMillisClock .getDefaultClock ());
114117 this .nextAttempt = backoff .createFirstAttempt ();
118+ // this.firestoreExecutor = firestore.getClient().getExecutor();
115119
116120 if (enableThrottling ) {
117121 rateLimiter =
@@ -508,7 +512,7 @@ private BulkCommitBatch getEligibleBatch(DocumentReference documentReference) {
508512 if (batchQueue .size () > 0 ) {
509513 BulkCommitBatch lastBatch = batchQueue .get (batchQueue .size () - 1 );
510514 if (lastBatch .getState () == UpdateBuilder .BatchState .OPEN
511- && !lastBatch .hasPath (documentReference . getPath () )) {
515+ && !lastBatch .hasDocument (documentReference )) {
512516 return lastBatch ;
513517 }
514518 }
@@ -559,18 +563,15 @@ public boolean apply(BulkCommitBatch batch) {
559563 if (delayMs == 0 ) {
560564 sendBatch (batch );
561565 } else {
562- firestore
563- .getClient ()
564- .getExecutor ()
565- .schedule (
566- new Runnable () {
567- @ Override
568- public void run () {
569- sendBatch (batch );
570- }
571- },
572- delayMs ,
573- TimeUnit .MILLISECONDS );
566+ firestore .getClient ().getExecutor ().schedule (
567+ new Runnable () {
568+ @ Override
569+ public void run () {
570+ sendBatch (batch );
571+ }
572+ },
573+ delayMs ,
574+ TimeUnit .MILLISECONDS );
574575 break ;
575576 }
576577
@@ -585,82 +586,97 @@ public void run() {
585586 private void sendBatch (final BulkCommitBatch batch ) {
586587 boolean success = rateLimiter .tryMakeRequest (batch .getPendingOperationCount ());
587588 Preconditions .checkState (success , "Batch should be under rate limit to be sent." );
588- MoreExecutors .directExecutor ()
589- .execute (
590- new Runnable () {
591- public void run () {
592- bulkCommit (batch );
593- boolean removed = batchQueue .remove (batch );
594- Preconditions .checkState (removed , "The batch should be in the BatchQueue." );
595- sendReadyBatches ();
596- }
597- });
589+
590+ // Schedule the actual RPC call on Firestore's executor so that it does not block the main
591+ // thread.
592+ ApiFuture <Void > commitFuture = bulkCommit (batch );
593+ commitFuture .addListener (
594+ new Runnable () {
595+ public void run () {
596+ boolean removed = batchQueue .remove (batch );
597+ Preconditions .checkState (removed , "The batch should be in the BatchQueue." );
598+ sendReadyBatches ();
599+ }
600+ },
601+ MoreExecutors .directExecutor ());
598602 }
599603
600- private void bulkCommit (BulkCommitBatch batch ) {
601- List <BatchWriteResult > results = new ArrayList <>();
602- for (int attempt = 0 ; attempt < MAX_RETRY_ATTEMPTS ; ++attempt ) {
603- final BulkCommitBatch finalBatch = batch ;
604- ScheduledFuture <List <BatchWriteResult >> attemptBulkCommit =
605- firestore
606- .getClient ()
607- .getExecutor ()
608- .schedule (
609- new Callable <List <BatchWriteResult >>() {
610- public List <BatchWriteResult > call () {
611- List <BatchWriteResult > results = new ArrayList <>();
612- try {
613- return finalBatch .bulkCommit ().get ();
614- } catch (Exception e ) {
615- // Map the failure to each individual write's result.
616- for (String path : finalBatch .getPendingDocs ()) {
617- if (e instanceof FirestoreException ) {
618- results .add (
619- new BatchWriteResult (
620- path ,
621- null ,
622- ((FirestoreException ) e ).getStatus (),
623- e .getMessage ()));
624- } else {
625- results .add (new BatchWriteResult (path , null , null , e .getMessage (), e ));
626- }
627- }
628- return results ;
629- }
630- }
631- },
632- nextAttempt .getRandomizedRetryDelay ().toMillis (),
633- TimeUnit .MILLISECONDS );
634-
635- try {
636- results = attemptBulkCommit .get ();
637- batch .processResults (results , null );
638- } catch (Exception e ) {
639- for (String path : batch .getPendingDocs ()) {
640- results .add (new BatchWriteResult (path , null , null , e .getMessage (), e ));
641- }
642- batch .processResults (new ArrayList <BatchWriteResult >(), e );
643- }
604+ private ApiFuture <Void > bulkCommit (BulkCommitBatch batch ) {
605+ return bulkCommit (batch , 0 );
606+ }
644607
645- if (batch .getPendingOperationCount () > 0 ) {
646- logger .log (
647- Level .WARNING ,
648- String .format (
649- "Current batch failed at retry #%d. Num failures: %d" ,
650- attempt , batch .getPendingOperationCount ()));
651- batch .sliceBatchForRetry (batch .getPendingDocs ());
652- batch .markReadyToSend ();
608+ ApiFuture <List <BatchWriteResult >> invokeBulkCommit (final BulkCommitBatch batch ) {
609+ return batch .bulkCommit ();
610+ }
653611
654- } else {
655- batch .markComplete ();
656- return ;
612+ private ApiFuture <Void > bulkCommit (final BulkCommitBatch batch , final int attempt ) {
613+ final BulkCommitBatch finalBatch = batch ;
614+ final SettableApiFuture <Void > backoffFuture = SettableApiFuture .create ();
615+
616+ class ProcessBulkCommitCallback implements ApiAsyncFunction <List <BatchWriteResult >, Void > {
617+ @ Override
618+ public ApiFuture <Void > apply (List <BatchWriteResult > results ) {
619+ finalBatch .processResults (results );
620+ if (finalBatch .getPendingOperationCount () > 0 ) {
621+ logger .log (
622+ Level .WARNING ,
623+ String .format (
624+ "Current batch failed at retry #%d. Num failures: %d" ,
625+ attempt , finalBatch .getPendingOperationCount ()));
626+ finalBatch .sliceBatchForRetry (finalBatch .getPendingDocuments ());
627+ finalBatch .markReadyToSend ();
628+
629+ if (attempt < MAX_RETRY_ATTEMPTS ) {
630+ nextAttempt = backoff .createNextAttempt (nextAttempt );
631+ return bulkCommit (finalBatch , attempt + 1 );
632+ } else {
633+ finalBatch .failRemainingOperations (results );
634+ finalBatch .markComplete ();
635+ }
636+ } else {
637+ finalBatch .markComplete ();
638+ }
639+ return ApiFutures .immediateFuture (null );
657640 }
641+ }
658642
659- nextAttempt = backoff .createNextAttempt (nextAttempt );
643+ class BackoffCallback implements ApiAsyncFunction <Void , Void > {
644+ @ Override
645+ public ApiFuture <Void > apply (Void ignored ) {
646+
647+ // If the BatchWrite RPC fails, map the exception to each individual result.
648+ return ApiFutures .transformAsync (
649+ ApiFutures .catchingAsync (
650+ invokeBulkCommit (finalBatch ),
651+ Exception .class ,
652+ new ApiAsyncFunction <Exception , List <BatchWriteResult >>() {
653+ public ApiFuture <List <BatchWriteResult >> apply (Exception exception ) {
654+ List <BatchWriteResult > results = new ArrayList <>();
655+ for (DocumentReference documentReference : finalBatch .getPendingDocuments ()) {
656+ results .add (new BatchWriteResult (documentReference , null , exception ));
657+ }
658+ return ApiFutures .immediateFuture (results );
659+ }
660+ },
661+ MoreExecutors .directExecutor ()),
662+ new ProcessBulkCommitCallback (),
663+ MoreExecutors .directExecutor ());
664+ }
660665 }
661666
662- batch .failRemainingOperations (results );
663- batch .markComplete ();
667+ // Add a backoff delay. At first, this is 0.
668+ firestore .getClient ().getExecutor ().schedule (
669+ new Runnable () {
670+ @ Override
671+ public void run () {
672+ backoffFuture .set (null );
673+ }
674+ },
675+ nextAttempt .getRandomizedRetryDelay ().toMillis (),
676+ TimeUnit .MILLISECONDS );
677+
678+ return ApiFutures .transformAsync (
679+ backoffFuture , new BackoffCallback (), MoreExecutors .directExecutor ());
664680 }
665681
666682 /**
@@ -672,14 +688,15 @@ private boolean isBatchSendable(BulkCommitBatch batch) {
672688 return false ;
673689 }
674690
675- for (final String path : batch .getPendingDocs ()) {
691+ for (final DocumentReference documentReference : batch .getPendingDocuments ()) {
676692 boolean isRefInFlight =
677693 FluentIterable .from (batchQueue )
678694 .anyMatch (
679695 new Predicate <BulkCommitBatch >() {
680696 @ Override
681697 public boolean apply (BulkCommitBatch batch ) {
682- return batch .getState ().equals (BatchState .SENT ) && batch .hasPath (path );
698+ return batch .getState ().equals (BatchState .SENT )
699+ && batch .hasDocument (documentReference );
683700 }
684701 });
685702
@@ -690,7 +707,7 @@ public boolean apply(BulkCommitBatch batch) {
690707 "Duplicate write to document %s detected. Writing to the same document multiple"
691708 + " times will slow down BulkWriter. Write to unique documents in order to "
692709 + "maximize throughput." ,
693- path ));
710+ documentReference . getPath () ));
694711 return false ;
695712 }
696713 }
0 commit comments