diff --git a/.github/workflows/auto-release.yaml b/.github/workflows/auto-release.yaml index 18e23230d..18d92e5a2 100644 --- a/.github/workflows/auto-release.yaml +++ b/.github/workflows/auto-release.yaml @@ -21,7 +21,7 @@ jobs: runs-on: ubuntu-latest if: contains(github.head_ref, 'release-please') steps: - - uses: actions/github-script@v5 + - uses: actions/github-script@v7 with: github-token: ${{secrets.YOSHI_APPROVER_TOKEN}} debug: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dd1bf092..060ade8ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Changelog +## [3.15.5](https://github.com/googleapis/java-firestore/compare/v3.15.4...v3.15.5) (2023-11-14) + + +### Bug Fixes + +* Prevent watch stream from emitting events after close. ([#1471](https://github.com/googleapis/java-firestore/issues/1471)) ([ee3f8c0](https://github.com/googleapis/java-firestore/commit/ee3f8c0c6e2c8aff64f429e8eea649cc6614c74f)) + + +### Dependencies + +* Update actions/github-script action to v7 ([#1473](https://github.com/googleapis/java-firestore/issues/1473)) ([1097a0b](https://github.com/googleapis/java-firestore/commit/1097a0b97c1dff40c07b657417db4ac431822619)) +* Update actions/github-script action to v7 ([#1475](https://github.com/googleapis/java-firestore/issues/1475)) ([d848d87](https://github.com/googleapis/java-firestore/commit/d848d875738260cde620f5cedfc15bf4dae31aac)) + ## [3.15.4](https://github.com/googleapis/java-firestore/compare/v3.15.3...v3.15.4) (2023-11-07) diff --git a/README.md b/README.md index fe2e66039..0df6dc6f0 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file: com.google.cloud libraries-bom - 26.26.0 + 26.27.0 pom import @@ -42,7 +42,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-firestore - 3.15.2 + 3.15.4 ``` @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.26.0') +implementation platform('com.google.cloud:libraries-bom:26.27.0') implementation 'com.google.cloud:google-cloud-firestore' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-firestore:3.15.3' +implementation 'com.google.cloud:google-cloud-firestore:3.15.4' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-firestore" % "3.15.3" +libraryDependencies += "com.google.cloud" % "google-cloud-firestore" % "3.15.4" ``` @@ -222,7 +222,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-firestore/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-firestore.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-firestore/3.15.3 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-firestore/3.15.4 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-firestore-admin/pom.xml b/google-cloud-firestore-admin/pom.xml index bbefd2b1f..4464f6cb2 100644 --- a/google-cloud-firestore-admin/pom.xml +++ b/google-cloud-firestore-admin/pom.xml @@ -4,7 +4,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 google-cloud-firestore-admin - 3.15.4 + 3.15.5 jar Google Cloud Firestore Admin Client https://github.com/googleapis/java-firestore @@ -14,7 +14,7 @@ com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/google-cloud-firestore-bom/pom.xml b/google-cloud-firestore-bom/pom.xml index dd811882b..6179d8e88 100644 --- a/google-cloud-firestore-bom/pom.xml +++ b/google-cloud-firestore-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-firestore-bom - 3.15.4 + 3.15.5 pom com.google.cloud @@ -52,37 +52,37 @@ com.google.cloud google-cloud-firestore - 3.15.4 + 3.15.5 com.google.cloud google-cloud-firestore-admin - 3.15.4 + 3.15.5 com.google.api.grpc grpc-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 com.google.api.grpc grpc-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 com.google.api.grpc proto-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 com.google.api.grpc proto-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 com.google.cloud proto-google-cloud-firestore-bundle-v1 - 3.15.4 + 3.15.5 diff --git a/google-cloud-firestore/pom.xml b/google-cloud-firestore/pom.xml index 7179f7684..3e460f8f5 100644 --- a/google-cloud-firestore/pom.xml +++ b/google-cloud-firestore/pom.xml @@ -2,7 +2,7 @@ 4.0.0 google-cloud-firestore - 3.15.4 + 3.15.5 jar Google Cloud Firestore https://github.com/googleapis/java-firestore @@ -12,7 +12,7 @@ com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 google-cloud-firestore diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index df25e1191..560a2d63a 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) { bulkWriterExecutor); } else { long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize()); - logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000)); + logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000)); bulkWriterExecutor.schedule( () -> { synchronized (lock) { diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java new file mode 100644 index 000000000..fa358ecaa --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java @@ -0,0 +1,125 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.gax.rpc.BidiStreamObserver; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.StreamController; +import java.util.function.Function; +import java.util.logging.Logger; + +/** + * Conditionally pass through callbacks to wrapped `BidiStreamObserver`. + * + *

Due to the asynchronous nature of a stream, there can be a delay between closing a stream and + * the upstream no longer sending responses. Receiving callbacks after closing upstream can have + * undesirable consequences. + * + *

The underlying `ClientStream` can be called through the `SilenceableBidiStream`. Methods such + * as `send()` and `closeSend()` are exposed. + * + *

The `SilenceableBidiStream` wraps a `BidiStreamObserver`. This is helpful for situations where + * the observer should be detached from a stream. Instead of calling the `closeSend()` method, the + * `closeSendAndSilence()` method will silence the stream by preventing further callbacks including + * `onError` and `onComplete`. + * + *

If silenced, the observer could be safely attached to a new stream. This is useful for error + * handling where upstream must be stopped, but a new stream can continue to service the observer. + * In these cases, the old stream cannot be allowed to send more responses, and especially cannot be + * allowed to send `onError` or `onComplete` since that would signal the downstream that the stream + * is finished. + */ +final class SilenceableBidiStream + implements BidiStreamObserver { + + private final ClientStream stream; + private final BidiStreamObserver delegate; + private boolean silence = false; + private static final Logger LOGGER = Logger.getLogger(Watch.class.getName()); + + SilenceableBidiStream( + BidiStreamObserver responseObserverT, + Function, ClientStream> streamSupplier) { + this.delegate = responseObserverT; + stream = streamSupplier.apply(this); + } + + public boolean isSilenced() { + return silence; + } + + public void send(RequestT request) { + LOGGER.info(stream.toString()); + stream.send(request); + } + + public void closeSend() { + LOGGER.info(stream::toString); + stream.closeSend(); + } + + public void closeSendAndSilence() { + LOGGER.info(stream::toString); + silence = true; + stream.closeSend(); + } + + @Override + public void onReady(ClientStream stream) { + if (silence) { + LOGGER.info(() -> String.format("Silenced: %s", stream)); + } else { + delegate.onReady(stream); + } + } + + @Override + public void onStart(StreamController controller) { + if (silence) { + LOGGER.info(() -> String.format("Silenced: %s", stream)); + } else { + delegate.onStart(controller); + } + } + + @Override + public void onResponse(ResponseT response) { + if (silence) { + LOGGER.info(() -> String.format("Silenced: %s", stream)); + } else { + delegate.onResponse(response); + } + } + + @Override + public void onError(Throwable t) { + if (silence) { + LOGGER.info(() -> String.format("Silenced: %s", stream)); + } else { + delegate.onError(t); + } + } + + @Override + public void onComplete() { + if (silence) { + LOGGER.info(() -> String.format("Silenced: %s", stream)); + } else { + delegate.onComplete(); + } + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java index b5029c8d6..14d84cd0e 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java @@ -49,6 +49,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -59,7 +60,7 @@ * It synchronizes on its own instance so it is advisable not to use this class for external * synchronization. */ -class Watch implements BidiStreamObserver { +final class Watch implements BidiStreamObserver { /** * Target ID used by watch. Watch uses a fixed target id since we only support one target per * stream. The actual target ID we use is arbitrary. @@ -73,7 +74,7 @@ class Watch implements BidiStreamObserver { private final ExponentialRetryAlgorithm backoff; private final Target target; private TimedAttemptSettings nextAttempt; - private ClientStream stream; + private SilenceableBidiStream stream; /** The sorted tree of DocumentSnapshots as sent in the last snapshot. */ private DocumentSet documentSet; @@ -115,6 +116,8 @@ static class ChangeSet { List updates = new ArrayList<>(); } + private static final Logger LOGGER = Logger.getLogger(Watch.class.getName()); + /** * @param firestore The Firestore Database client. * @param query The query that is used to order the document snapshots returned by this watch. @@ -246,7 +249,16 @@ && affectsTarget(change.getTargetIdsList(), WATCH_TARGET_ID)) { changeMap.put(ResourcePath.create(listenResponse.getDocumentRemove().getDocument()), null); break; case FILTER: - if (listenResponse.getFilter().getCount() != currentSize()) { + // Keep copy of counts for producing log message. + // The method currentSize() is computationally expensive, and should only be run once. + int filterCount = listenResponse.getFilter().getCount(); + int currentSize = currentSize(); + if (filterCount != currentSize) { + LOGGER.info( + () -> + String.format( + "filter: count mismatch filter count %d != current size %d", + filterCount, currentSize)); // We need to remove all the current results. resetDocs(); // The filter didn't match, so re-issue the query. @@ -318,7 +330,7 @@ private void resetDocs() { resumeToken = null; for (DocumentSnapshot snapshot : documentSet) { - // Mark each document as deleted. If documents are not deleted, they will be send again by + // Mark each document as deleted. If documents are not deleted, they will be sent again by // the server. changeMap.put(snapshot.getReference().getResourcePath(), null); } @@ -329,7 +341,7 @@ private void resetDocs() { /** Closes the stream and calls onError() if the stream is still active. */ private void closeStream(final Throwable throwable) { if (stream != null) { - stream.closeSend(); + stream.closeSendAndSilence(); stream = null; } @@ -371,7 +383,7 @@ private void maybeReopenStream(Throwable throwable) { /** Helper to restart the outgoing stream to the backend. */ private void resetStream() { if (stream != null) { - stream.closeSend(); + stream.closeSendAndSilence(); stream = null; } @@ -398,7 +410,12 @@ private void initStream() { nextAttempt = backoff.createNextAttempt(nextAttempt); Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN); - stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable()); + stream = + new SilenceableBidiStream<>( + Watch.this, + observer -> + firestore.streamRequest( + observer, firestore.getClient().listenCallable())); ListenRequest.Builder request = ListenRequest.newBuilder(); request.setDatabase(firestore.getDatabaseName()); @@ -459,6 +476,7 @@ private void pushSnapshot(final Timestamp readTime, ByteString nextResumeToken) if (!hasPushed || !changes.isEmpty()) { final QuerySnapshot querySnapshot = QuerySnapshot.withChanges(query, readTime, documentSet, changes); + LOGGER.info(querySnapshot.toString()); userCallbackExecutor.execute(() -> listener.onEvent(querySnapshot, null)); hasPushed = true; } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java new file mode 100644 index 000000000..bf02bc7d5 --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.gax.rpc.BidiStreamObserver; +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.api.gax.rpc.ClientStream; + +public final class FirestoreSpy { + + public final FirestoreImpl spy; + public BidiStreamObserver streamRequestBidiStreamObserver; + + public FirestoreSpy(FirestoreOptions firestoreOptions) { + spy = + new FirestoreImpl(firestoreOptions) { + @Override + public ClientStream streamRequest( + BidiStreamObserver responseObserverT, + BidiStreamingCallable callable) { + streamRequestBidiStreamObserver = responseObserverT; + return super.streamRequest(responseObserverT, callable); + } + }; + } +} diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java new file mode 100644 index 000000000..948d8035f --- /dev/null +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java @@ -0,0 +1,178 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.api.gax.rpc.BidiStreamObserver; +import com.google.api.gax.rpc.ClientStream; +import com.google.api.gax.rpc.StreamController; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SilenceableBidiStreamTest { + + @Mock BidiStreamObserver mockObserver; + + @Mock ClientStream mockClientStream; + + SilenceableBidiStream sut; + + @Before + public void before() { + Consumer captureCall = Mockito.mock(Consumer.class); + sut = + new SilenceableBidiStream<>( + mockObserver, + o -> { + captureCall.accept(o); + return mockClientStream; + }); + // The lambda should pass itself as parameter + Mockito.verify(captureCall).accept(sut); + } + + @Test + public void byDefault_theStreamWillPassThroughData_onComplete() { + sut.onComplete(); + Mockito.verify(mockObserver, Mockito.times(1)).onComplete(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void byDefault_theStreamWillPassThroughData_onError() { + Exception e = new Exception("Something bad"); + sut.onError(e); + Mockito.verify(mockObserver, Mockito.times(1)).onError(e); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void byDefault_theStreamWillPassThroughData_onResponse() { + sut.onResponse("ABC"); + Mockito.verify(mockObserver, Mockito.times(1)).onResponse("ABC"); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void byDefault_theStreamWillPassThroughData_onStart() { + StreamController controller = Mockito.mock(StreamController.class); + sut.onStart(controller); + Mockito.verify(mockObserver, Mockito.times(1)).onStart(controller); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver, controller); + } + + @Test + public void byDefault_theStreamWillPassThroughData_onReady() { + ClientStream client = Mockito.mock(ClientStream.class); + sut.onReady(client); + Mockito.verify(mockObserver, Mockito.times(1)).onReady(client); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver, client); + } + + @Test + public void byDefault_theStreamWillPassThroughData_send() { + sut.send(7); + Mockito.verify(mockClientStream, Mockito.times(1)).send(7); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void closeMethod() { + assertWithMessage("Expect isSilenced() to be false by default") + .that(sut.isSilenced()) + .isFalse(); + sut.closeSend(); + assertWithMessage("Expect isSilenced() to be unaffected after close()") + .that(sut.isSilenced()) + .isFalse(); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void closeAndSilenceMethod() { + assertWithMessage("Expect isSilenced() to be false by default") + .that(sut.isSilenced()) + .isFalse(); + sut.closeSendAndSilence(); + assertWithMessage("Expect isSilenced() to be true after closeAndSilence()") + .that(sut.isSilenced()) + .isTrue(); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterClose_theStreamWillPassThroughData_onComplete() { + sut.closeSend(); + sut.onComplete(); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verify(mockObserver, Mockito.times(1)).onComplete(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterClose_theStreamWillPassThroughData_onError() { + Exception e = new Exception("Something bad"); + sut.closeSend(); + sut.onError(e); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verify(mockObserver, Mockito.times(1)).onError(e); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterClose_theStreamWillPassThroughData_onResponse() { + sut.closeSend(); + sut.onResponse("ABC"); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verify(mockObserver, Mockito.times(1)).onResponse("ABC"); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterCloseAndSilence_theStreamWillPassThroughData_onComplete() { + sut.closeSendAndSilence(); + sut.onComplete(); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterCloseAndSilence_theStreamWillPassThroughData_onError() { + Exception e = new Exception("Something bad"); + sut.closeSendAndSilence(); + sut.onError(e); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } + + @Test + public void afterCloseAndSilence_theStreamWillPassThroughData_onResponse() { + sut.closeSendAndSilence(); + sut.onResponse("ABC"); + Mockito.verify(mockClientStream, Mockito.times(1)).closeSend(); + Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver); + } +} diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java index 1627796c9..19295b901 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java @@ -72,7 +72,9 @@ import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; @@ -84,6 +86,9 @@ @RunWith(MockitoJUnitRunner.class) public class WatchTest { + + @Rule public Timeout timeout = new Timeout(1, TimeUnit.SECONDS); + /** The Target ID used by the Java Firestore SDK. */ private static final int TARGET_ID = 0x1; diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java index 97c4c75f7..2a238bba2 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java @@ -18,6 +18,7 @@ import com.google.cloud.firestore.Firestore; import com.google.cloud.firestore.FirestoreOptions; +import com.google.cloud.firestore.FirestoreSpy; import com.google.common.base.Preconditions; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +31,8 @@ public abstract class ITBaseTest { private static final Logger logger = Logger.getLogger(ITBaseTest.class.getName()); protected Firestore firestore; + protected FirestoreSpy firestoreSpy; + private FirestoreOptions firestoreOptions; @Before public void before() { @@ -43,7 +46,8 @@ public void before() { logger.log(Level.INFO, "Integration test using default database."); } - firestore = optionsBuilder.build().getService(); + firestoreOptions = optionsBuilder.build(); + firestore = firestoreOptions.getService(); } @After @@ -53,5 +57,15 @@ public void after() throws Exception { "Error instantiating Firestore. Check that the service account credentials were properly set."); firestore.close(); firestore = null; + firestoreOptions = null; + firestoreSpy = null; + } + + public FirestoreSpy useFirestoreSpy() { + if (firestoreSpy == null) { + firestoreSpy = new FirestoreSpy(firestoreOptions); + firestore = firestoreSpy.spy; + } + return firestoreSpy; } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java index 6f80f916c..bded062ff 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java @@ -39,8 +39,11 @@ import com.google.common.base.Joiner; import com.google.common.base.Joiner.MapJoiner; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.truth.Truth; +import com.google.firestore.v1.ExistenceFilter; +import com.google.firestore.v1.ListenResponse; import java.util.ArrayList; import java.util.Collections; import java.util.EnumMap; @@ -70,6 +73,7 @@ public final class ITQueryWatchTest extends ITBaseTest { @Before public void before() { super.before(); + useFirestoreSpy(); String autoId = LocalFirestoreHelper.autoId(); String collPath = String.format("java-%s-%s", testName.getMethodName(), autoId); randomColl = firestore.collection(collPath); @@ -100,9 +104,9 @@ public void emptyResults() throws InterruptedException { ListenerAssertions listenerAssertions = listener.assertions(); listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1)); - listenerAssertions.addedIdsIsAnyOf(emptyList()); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); - listenerAssertions.removedIdsIsAnyOf(emptyList()); + listenerAssertions.addedIdsIsEmpty(); + listenerAssertions.modifiedIdsIsEmpty(); + listenerAssertions.removedIdsIsEmpty(); } /** @@ -132,9 +136,9 @@ public void nonEmptyResults() throws Exception { ListenerAssertions listenerAssertions = listener.assertions(); listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1)); - listenerAssertions.addedIdsIsAnyOf(singletonList("doc")); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); - listenerAssertions.removedIdsIsAnyOf(emptyList()); + listenerAssertions.addedIdsIsAnyOf("doc"); + listenerAssertions.modifiedIdsIsEmpty(); + listenerAssertions.removedIdsIsEmpty(); } /** @@ -167,8 +171,8 @@ public void emptyResults_newDocument_ADDED() listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc")); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); - listenerAssertions.removedIdsIsAnyOf(emptyList()); + listenerAssertions.modifiedIdsIsEmpty(); + listenerAssertions.removedIdsIsEmpty(); } /** @@ -188,7 +192,6 @@ public void emptyResults_modifiedDocument_ADDED() throws Exception { final Query query = randomColl.whereEqualTo("foo", "bar"); QuerySnapshotEventListener listener = QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build(); - List receivedEvents = listener.receivedEvents; ListenerRegistration registration = query.addSnapshotListener(listener); try { @@ -203,10 +206,10 @@ public void emptyResults_modifiedDocument_ADDED() throws Exception { listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc")); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); - listenerAssertions.removedIdsIsAnyOf(emptyList()); + listenerAssertions.modifiedIdsIsEmpty(); + listenerAssertions.removedIdsIsEmpty(); - ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + ListenerEvent event = listener.lastListenerEvent(); //noinspection ConstantConditions guarded by "assertNoError" above QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); assertThat(doc.get("foo")).isEqualTo("bar"); @@ -233,7 +236,6 @@ public void nonEmptyResults_modifiedDocument_MODIFIED() throws Exception { .setInitialEventCount(1) .setModifiedEventCount(1) .build(); - List receivedEvents = listener.receivedEvents; ListenerRegistration registration = query.addSnapshotListener(listener); try { @@ -249,9 +251,9 @@ public void nonEmptyResults_modifiedDocument_MODIFIED() throws Exception { listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc")); listenerAssertions.modifiedIdsIsAnyOf(emptyList(), singletonList("doc")); - listenerAssertions.removedIdsIsAnyOf(emptyList()); + listenerAssertions.removedIdsIsEmpty(); - ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + ListenerEvent event = listener.lastListenerEvent(); //noinspection ConstantConditions guarded by "assertNoError" above QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); assertThat(doc.get("foo")).isEqualTo("bar"); @@ -278,7 +280,6 @@ public void nonEmptyResults_deletedDocument_REMOVED() throws Exception { .setInitialEventCount(1) .setRemovedEventCount(1) .build(); - List receivedEvents = listener.receivedEvents; ListenerRegistration registration = query.addSnapshotListener(listener); try { @@ -293,10 +294,10 @@ public void nonEmptyResults_deletedDocument_REMOVED() throws Exception { listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc")); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); + listenerAssertions.modifiedIdsIsEmpty(); listenerAssertions.removedIdsIsAnyOf(emptyList(), singletonList("doc")); - ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + ListenerEvent event = listener.lastListenerEvent(); //noinspection ConstantConditions guarded by "assertNoError" above QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); assertThat(doc.get("foo")).isEqualTo("bar"); @@ -322,7 +323,6 @@ public void nonEmptyResults_modifiedDocument_REMOVED() throws Exception { .setInitialEventCount(1) .setRemovedEventCount(1) .build(); - List receivedEvents = listener.receivedEvents; ListenerRegistration registration = query.addSnapshotListener(listener); try { @@ -337,15 +337,88 @@ public void nonEmptyResults_modifiedDocument_REMOVED() throws Exception { listenerAssertions.noError(); listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2)); listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc")); - listenerAssertions.modifiedIdsIsAnyOf(emptyList()); + listenerAssertions.modifiedIdsIsEmpty(); listenerAssertions.removedIdsIsAnyOf(emptyList(), singletonList("doc")); - ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1); + ListenerEvent event = listener.lastListenerEvent(); //noinspection ConstantConditions guarded by "assertNoError" above QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument(); assertThat(doc.get("foo")).isEqualTo("bar"); } + /** + * + * + *

    + *
  1. Attach a listener to a query with non-empty results. + *
  2. Trigger existence filter mismatch, thereby invoking retry behavior. + *
  3. Add and remove documents. + *
  4. Verify expected snapshots are raised. + *
+ */ + @Test + public void restartAfterFailedFilter() throws Exception { + // create a document in our collection that will match the query + DocumentReference testDoc1 = setDocument("doc1", map("foo", "bar")); + DocumentReference testDoc2 = setDocument("doc2", map("foo", "bar")); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + QuerySnapshotEventListener listener = + QuerySnapshotEventListener.builder() + .setInitialEventCount(1) + .setAddedEventCount(3) + .setRemovedEventCount(1) + .build(); + ListenerRegistration registration = query.addSnapshotListener(listener); + + try { + listener.eventsCountDownLatch.awaitInitialEvents(); + listener + .assertionsForLastEvent() + .noError() + .addedIdsIsAnyOf("doc1", "doc2") + .modifiedIdsIsEmpty() + .removedIdsIsEmpty(); + listener.lastDocumentIdsIsAnyOf("doc1", "doc2"); + + // Trigger existence filter mismatch, thereby invoking retry behavior. + // Prompting Firestore to send filter mismatch is difficult, so we hack + // in the response. All we are concerned about is invoking retry. + firestoreSpy.streamRequestBidiStreamObserver.onResponse(filter(0)); + + // A race condition will sometimes throw an error if the SuppressibleBidiStream does not + // silence the old stream. This can be caused by `Preconditions.checkState(stream == null)` + // in Watch class. + + setDocument("doc3", map("foo", "bar")); + listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED); + listener + .assertionsForLastEvent() + .noError() + .addedIdsIsAnyOf("doc3") + .modifiedIdsIsEmpty() + .removedIdsIsEmpty(); + listener.lastDocumentIdsIsAnyOf("doc1", "doc2", "doc3"); + + testDoc1.set(map("bar", "foo")).get(5, TimeUnit.SECONDS); + listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED); + listener + .assertionsForLastEvent() + .noError() + .addedIdsIsEmpty() + .modifiedIdsIsEmpty() + .removedIdsIsAnyOf("doc1"); + listener.lastDocumentIdsIsAnyOf("doc2", "doc3"); + + } finally { + registration.remove(); + } + + ListenerAssertions listenerAssertions = listener.assertions(); + listenerAssertions.noError(); + listenerAssertions.eventCountIsAnyOf(Range.singleton(3)); + } + /** Verifies that QuerySnapshot for limitToLast() queries are ordered correctly. */ @Test public void limitToLast() throws Exception { @@ -609,6 +682,20 @@ ListenerAssertions assertions() { return new ListenerAssertions(receivedEvents); } + ListenerAssertions assertionsForLastEvent() { + return new ListenerAssertions(singletonList(lastListenerEvent())); + } + + ListenerEvent lastListenerEvent() { + return receivedEvents.get(receivedEvents.size() - 1); + } + + void lastDocumentIdsIsAnyOf(String... s) { + List ids = + Lists.transform(lastListenerEvent().value.getDocuments(), DocumentSnapshot::getId); + Truth.assertThat(ids).containsExactlyElementsIn(s); + } + static Builder builder() { return new Builder(); } @@ -669,20 +756,22 @@ static final class ListenerAssertions { removedIds = getIds(querySnapshots, DocumentChange.Type.REMOVED); } - private void noError() { + private ListenerAssertions noError() { final Optional anyError = receivedEvents.stream().filter(input -> input.error != null).findFirst(); - assertWithMessage("snapshotListener received an error") - .that(anyError.isPresent()) - .isFalse(); + if (anyError.isPresent()) { + throw new Error("snapshotListener received an error", anyError.get().error); + } + return this; } - private void hasError() { + private ListenerAssertions hasError() { final Optional anyError = receivedEvents.stream().filter(input -> input.error != null).findFirst(); assertWithMessage("snapshotListener did not receive an expected error") .that(anyError.isPresent()) .isTrue(); + return this; } private static List getQuerySnapshots(List events) { @@ -706,32 +795,54 @@ private static List getIds( return documentIds; } - void addedIdsIsAnyOf(List s) { - Truth.assertWithMessage(debugMessage()).that(addedIds).isEqualTo(s); + ListenerAssertions addedIdsIsEmpty() { + Truth.assertWithMessage(debugMessage()).that(addedIds).isEmpty(); + return this; + } + + ListenerAssertions addedIdsIsAnyOf(String... s) { + Truth.assertWithMessage(debugMessage()).that(addedIds).containsExactlyElementsIn(s); + return this; } - void addedIdsIsAnyOf(List s1, List s2) { + ListenerAssertions addedIdsIsAnyOf(List s1, List s2) { Truth.assertWithMessage(debugMessage()).that(addedIds).isAnyOf(s1, s2); + return this; } - void modifiedIdsIsAnyOf(List s) { - Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEqualTo(s); + ListenerAssertions modifiedIdsIsEmpty() { + Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEmpty(); + return this; } - void modifiedIdsIsAnyOf(List s1, List s2) { + ListenerAssertions modifiedIdsIsAnyOf(String... s) { + Truth.assertWithMessage(debugMessage()).that(modifiedIds).containsExactlyElementsIn(s); + return this; + } + + ListenerAssertions modifiedIdsIsAnyOf(List s1, List s2) { Truth.assertWithMessage(debugMessage()).that(modifiedIds).isAnyOf(s1, s2); + return this; + } + + ListenerAssertions removedIdsIsEmpty() { + Truth.assertWithMessage(debugMessage()).that(removedIds).isEmpty(); + return this; } - void removedIdsIsAnyOf(List s) { - Truth.assertWithMessage(debugMessage()).that(removedIds).isEqualTo(s); + ListenerAssertions removedIdsIsAnyOf(String... s) { + Truth.assertWithMessage(debugMessage()).that(removedIds).containsExactlyElementsIn(s); + return this; } - void removedIdsIsAnyOf(List s1, List s2) { + ListenerAssertions removedIdsIsAnyOf(List s1, List s2) { Truth.assertWithMessage(debugMessage()).that(removedIds).isAnyOf(s1, s2); + return this; } - void eventCountIsAnyOf(Range range) { + ListenerAssertions eventCountIsAnyOf(Range range) { Truth.assertWithMessage(debugMessage()).that((int) receivedEvents.size()).isIn(range); + return this; } private String debugMessage() { @@ -797,4 +908,10 @@ private DocumentReference setDocument(String documentId, Map fields) documentReference.set(fields).get(); return documentReference; } + + private ListenResponse filter(int documentCount) { + ListenResponse.Builder response = ListenResponse.newBuilder(); + response.setFilter(ExistenceFilter.newBuilder().setCount(documentCount).build()); + return response.build(); + } } diff --git a/grpc-google-cloud-firestore-admin-v1/pom.xml b/grpc-google-cloud-firestore-admin-v1/pom.xml index 2bfb5e0af..4ad35c5eb 100644 --- a/grpc-google-cloud-firestore-admin-v1/pom.xml +++ b/grpc-google-cloud-firestore-admin-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 grpc-google-cloud-firestore-admin-v1 GRPC library for grpc-google-cloud-firestore-admin-v1 com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/grpc-google-cloud-firestore-v1/pom.xml b/grpc-google-cloud-firestore-v1/pom.xml index 62610fbf6..2f92bd06d 100644 --- a/grpc-google-cloud-firestore-v1/pom.xml +++ b/grpc-google-cloud-firestore-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 grpc-google-cloud-firestore-v1 GRPC library for grpc-google-cloud-firestore-v1 com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/pom.xml b/pom.xml index ba234957d..4aaca0181 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-firestore-parent pom - 3.15.4 + 3.15.5 Google Cloud Firestore Parent https://github.com/googleapis/java-firestore @@ -150,32 +150,32 @@ com.google.api.grpc proto-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 com.google.cloud google-cloud-firestore - 3.15.4 + 3.15.5 com.google.cloud proto-google-cloud-firestore-bundle-v1 - 3.15.4 + 3.15.5 com.google.api.grpc proto-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 com.google.api.grpc grpc-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 com.google.api.grpc grpc-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 diff --git a/proto-google-cloud-firestore-admin-v1/pom.xml b/proto-google-cloud-firestore-admin-v1/pom.xml index 4e58481ec..b0cc893bb 100644 --- a/proto-google-cloud-firestore-admin-v1/pom.xml +++ b/proto-google-cloud-firestore-admin-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-firestore-admin-v1 - 3.15.4 + 3.15.5 proto-google-cloud-firestore-admin-v1 PROTO library for proto-google-cloud-firestore-admin-v1 com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/proto-google-cloud-firestore-bundle-v1/pom.xml b/proto-google-cloud-firestore-bundle-v1/pom.xml index cbbd28056..6f115104c 100644 --- a/proto-google-cloud-firestore-bundle-v1/pom.xml +++ b/proto-google-cloud-firestore-bundle-v1/pom.xml @@ -5,14 +5,14 @@ 4.0.0 proto-google-cloud-firestore-bundle-v1 - 3.15.4 + 3.15.5 proto-google-cloud-firestore-bundle-v1 PROTO library for proto-google-cloud-firestore-bundle-v1 com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/proto-google-cloud-firestore-v1/pom.xml b/proto-google-cloud-firestore-v1/pom.xml index 6d8bf6a95..4033b1b63 100644 --- a/proto-google-cloud-firestore-v1/pom.xml +++ b/proto-google-cloud-firestore-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-firestore-v1 - 3.15.4 + 3.15.5 proto-google-cloud-firestore-v1 PROTO library for proto-google-cloud-firestore-v1 com.google.cloud google-cloud-firestore-parent - 3.15.4 + 3.15.5 diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index cfa99dd2d..6501ce326 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -30,7 +30,7 @@ com.google.cloud google-cloud-firestore - 3.15.2 + 3.15.4 diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml index cfd3e2e4b..3cee46c63 100644 --- a/samples/native-image-sample/pom.xml +++ b/samples/native-image-sample/pom.xml @@ -32,7 +32,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> com.google.cloud libraries-bom - 26.26.0 + 26.27.0 pom import diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 99bfbac62..5104bb4e5 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -29,7 +29,7 @@ com.google.cloud google-cloud-firestore - 3.15.2 + 3.15.4 diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 7e295263c..7bbf9b9f5 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -34,7 +34,7 @@ com.google.cloud libraries-bom - 26.26.0 + 26.27.0 pom import diff --git a/versions.txt b/versions.txt index 76c76c65c..91b94d737 100644 --- a/versions.txt +++ b/versions.txt @@ -1,11 +1,11 @@ # Format: # module:released-version:current-version -google-cloud-firestore:3.15.4:3.15.4 -google-cloud-firestore-admin:3.15.4:3.15.4 -google-cloud-firestore-bom:3.15.4:3.15.4 -grpc-google-cloud-firestore-admin-v1:3.15.4:3.15.4 -grpc-google-cloud-firestore-v1:3.15.4:3.15.4 -proto-google-cloud-firestore-admin-v1:3.15.4:3.15.4 -proto-google-cloud-firestore-v1:3.15.4:3.15.4 -proto-google-cloud-firestore-bundle-v1:3.15.4:3.15.4 +google-cloud-firestore:3.15.5:3.15.5 +google-cloud-firestore-admin:3.15.5:3.15.5 +google-cloud-firestore-bom:3.15.5:3.15.5 +grpc-google-cloud-firestore-admin-v1:3.15.5:3.15.5 +grpc-google-cloud-firestore-v1:3.15.5:3.15.5 +proto-google-cloud-firestore-admin-v1:3.15.5:3.15.5 +proto-google-cloud-firestore-v1:3.15.5:3.15.5 +proto-google-cloud-firestore-bundle-v1:3.15.5:3.15.5