1717package com .google .cloud .firestore ;
1818
1919import com .google .api .core .ApiFuture ;
20- import com .google .api .core .ApiFutureCallback ;
21- import com .google .api .core .ApiFutures ;
2220import com .google .api .core .SettableApiFuture ;
23- import com .google .api .gax .rpc .ApiException ;
2421import com .google .api .gax .rpc .ApiStreamObserver ;
2522import com .google .api .gax .rpc .BidiStreamingCallable ;
2623import com .google .api .gax .rpc .ServerStreamingCallable ;
2926import com .google .cloud .firestore .spi .v1 .FirestoreRpc ;
3027import com .google .common .base .Preconditions ;
3128import com .google .common .collect .ImmutableMap ;
32- import com .google .common .util .concurrent .MoreExecutors ;
3329import com .google .firestore .v1 .BatchGetDocumentsRequest ;
3430import com .google .firestore .v1 .BatchGetDocumentsResponse ;
3531import com .google .firestore .v1 .DatabaseRootName ;
3632import com .google .protobuf .ByteString ;
3733import io .grpc .Context ;
38- import io .grpc .Status ;
39- import io .opencensus .common .Scope ;
4034import io .opencensus .trace .AttributeValue ;
41- import io .opencensus .trace .Span ;
4235import io .opencensus .trace .Tracer ;
4336import io .opencensus .trace .Tracing ;
4437import java .util .ArrayList ;
4740import java .util .Map ;
4841import java .util .Random ;
4942import java .util .concurrent .Executor ;
50- import java .util .logging .Logger ;
5143import javax .annotation .Nonnull ;
5244import javax .annotation .Nullable ;
5345
@@ -62,10 +54,7 @@ class FirestoreImpl implements Firestore {
6254 private static final String AUTO_ID_ALPHABET =
6355 "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" ;
6456
65- private static final Logger LOGGER = Logger .getLogger ("Firestore" );
6657 private static final Tracer tracer = Tracing .getTracer ();
67- private static final io .opencensus .trace .Status TOO_MANY_RETRIES_STATUS =
68- io .opencensus .trace .Status .ABORTED .withDescription ("too many retries" );
6958
7059 private final FirestoreRpc firestoreClient ;
7160 private final FirestoreOptions firestoreOptions ;
@@ -298,146 +287,17 @@ public <T> ApiFuture<T> runTransaction(@Nonnull final Transaction.Function<T> up
298287 public <T > ApiFuture <T > runTransaction (
299288 @ Nonnull final Transaction .Function <T > updateFunction ,
300289 @ Nonnull TransactionOptions transactionOptions ) {
301- SettableApiFuture <T > resultFuture = SettableApiFuture .create ();
302- runTransaction (updateFunction , resultFuture , transactionOptions );
303- return resultFuture ;
304- }
305-
306- /** Transaction functions that returns its result in the provided SettableFuture. */
307- private <T > void runTransaction (
308- final Transaction .Function <T > transactionCallback ,
309- final SettableApiFuture <T > resultFuture ,
310- final TransactionOptions options ) {
311- // span is intentionally not ended here. It will be ended by runTransactionAttempt on success
312- // or error.
313- Span span = tracer .spanBuilder ("CloudFirestore.Transaction" ).startSpan ();
314- try (Scope s = tracer .withSpan (span )) {
315- runTransactionAttempt (transactionCallback , resultFuture , options , span );
316- }
317- }
318-
319- private <T > void runTransactionAttempt (
320- final Transaction .Function <T > transactionCallback ,
321- final SettableApiFuture <T > resultFuture ,
322- final TransactionOptions options ,
323- final Span span ) {
324- final Transaction transaction = new Transaction (this , options .getPreviousTransactionId ());
325290 final Executor userCallbackExecutor =
326291 Context .currentContextExecutor (
327- options .getExecutor () != null ? options .getExecutor () : firestoreClient .getExecutor ());
292+ transactionOptions .getExecutor () != null
293+ ? transactionOptions .getExecutor ()
294+ : firestoreClient .getExecutor ());
328295
329- final int attemptsRemaining = options .getNumberOfAttempts () - 1 ;
330- span .addAnnotation (
331- "Start runTransaction" ,
332- ImmutableMap .of ("attemptsRemaining" , AttributeValue .longAttributeValue (attemptsRemaining )));
296+ TransactionRunner <T > transactionRunner =
297+ new TransactionRunner <>(
298+ this , updateFunction , userCallbackExecutor , transactionOptions .getNumberOfAttempts ());
333299
334- ApiFutures .addCallback (
335- transaction .begin (),
336- new ApiFutureCallback <Void >() {
337- @ Override
338- public void onFailure (Throwable throwable ) {
339- // Don't retry failed BeginTransaction requests.
340- rejectTransaction (throwable );
341- }
342-
343- @ Override
344- public void onSuccess (Void ignored ) {
345- ApiFutures .addCallback (
346- invokeUserCallback (),
347- new ApiFutureCallback <T >() {
348- @ Override
349- public void onFailure (Throwable throwable ) {
350- // This was a error in the user callback, forward the throwable.
351- rejectTransaction (throwable );
352- }
353-
354- @ Override
355- public void onSuccess (final T userResult ) {
356- // Commit the transaction
357- ApiFutures .addCallback (
358- transaction .commit (),
359- new ApiFutureCallback <List <WriteResult >>() {
360- @ Override
361- public void onFailure (Throwable throwable ) {
362- // Retry failed commits.
363- maybeRetry (throwable );
364- }
365-
366- @ Override
367- public void onSuccess (List <WriteResult > writeResults ) {
368- span .setStatus (io .opencensus .trace .Status .OK );
369- span .end ();
370- resultFuture .set (userResult );
371- }
372- },
373- MoreExecutors .directExecutor ());
374- }
375- },
376- MoreExecutors .directExecutor ());
377- }
378-
379- private SettableApiFuture <T > invokeUserCallback () {
380- // Execute the user callback on the provided executor.
381- final SettableApiFuture <T > callbackResult = SettableApiFuture .create ();
382- userCallbackExecutor .execute (
383- new Runnable () {
384- @ Override
385- public void run () {
386- try {
387- callbackResult .set (transactionCallback .updateCallback (transaction ));
388- } catch (Throwable t ) {
389- callbackResult .setException (t );
390- }
391- }
392- });
393- return callbackResult ;
394- }
395-
396- private void maybeRetry (Throwable throwable ) {
397- if (attemptsRemaining > 0 ) {
398- span .addAnnotation ("retrying" );
399- runTransactionAttempt (
400- transactionCallback ,
401- resultFuture ,
402- new TransactionOptions (
403- attemptsRemaining , options .getExecutor (), transaction .getTransactionId ()),
404- span );
405- } else {
406- span .setStatus (TOO_MANY_RETRIES_STATUS );
407- rejectTransaction (
408- FirestoreException .serverRejected (
409- Status .ABORTED ,
410- throwable ,
411- "Transaction was cancelled because of too many retries." ));
412- }
413- }
414-
415- private void rejectTransaction (final Throwable throwable ) {
416- if (throwable instanceof ApiException ) {
417- span .setStatus (TraceUtil .statusFromApiException ((ApiException ) throwable ));
418- }
419- span .end ();
420- if (transaction .isPending ()) {
421- ApiFutures .addCallback (
422- transaction .rollback (),
423- new ApiFutureCallback <Void >() {
424- @ Override
425- public void onFailure (Throwable throwable ) {
426- resultFuture .setException (throwable );
427- }
428-
429- @ Override
430- public void onSuccess (Void ignored ) {
431- resultFuture .setException (throwable );
432- }
433- },
434- MoreExecutors .directExecutor ());
435- } else {
436- resultFuture .setException (throwable );
437- }
438- }
439- },
440- MoreExecutors .directExecutor ());
300+ return transactionRunner .run ();
441301 }
442302
443303 /** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */
0 commit comments