google-cloud-firestore
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
index df25e1191..560a2d63a 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
@@ -916,7 +916,7 @@ private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
bulkWriterExecutor);
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
- logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
+ logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
bulkWriterExecutor.schedule(
() -> {
synchronized (lock) {
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java
new file mode 100644
index 000000000..fa358ecaa
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/SilenceableBidiStream.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.gax.rpc.BidiStreamObserver;
+import com.google.api.gax.rpc.ClientStream;
+import com.google.api.gax.rpc.StreamController;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+/**
+ * Conditionally pass through callbacks to wrapped `BidiStreamObserver`.
+ *
+ * Due to the asynchronous nature of a stream, there can be a delay between closing a stream and
+ * the upstream no longer sending responses. Receiving callbacks after closing upstream can have
+ * undesirable consequences.
+ *
+ *
The underlying `ClientStream` can be called through the `SilenceableBidiStream`. Methods such
+ * as `send()` and `closeSend()` are exposed.
+ *
+ *
The `SilenceableBidiStream` wraps a `BidiStreamObserver`. This is helpful for situations where
+ * the observer should be detached from a stream. Instead of calling the `closeSend()` method, the
+ * `closeSendAndSilence()` method will silence the stream by preventing further callbacks including
+ * `onError` and `onComplete`.
+ *
+ *
If silenced, the observer could be safely attached to a new stream. This is useful for error
+ * handling where upstream must be stopped, but a new stream can continue to service the observer.
+ * In these cases, the old stream cannot be allowed to send more responses, and especially cannot be
+ * allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
+ * is finished.
+ */
+final class SilenceableBidiStream
+ implements BidiStreamObserver {
+
+ private final ClientStream stream;
+ private final BidiStreamObserver delegate;
+ private boolean silence = false;
+ private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());
+
+ SilenceableBidiStream(
+ BidiStreamObserver responseObserverT,
+ Function, ClientStream> streamSupplier) {
+ this.delegate = responseObserverT;
+ stream = streamSupplier.apply(this);
+ }
+
+ public boolean isSilenced() {
+ return silence;
+ }
+
+ public void send(RequestT request) {
+ LOGGER.info(stream.toString());
+ stream.send(request);
+ }
+
+ public void closeSend() {
+ LOGGER.info(stream::toString);
+ stream.closeSend();
+ }
+
+ public void closeSendAndSilence() {
+ LOGGER.info(stream::toString);
+ silence = true;
+ stream.closeSend();
+ }
+
+ @Override
+ public void onReady(ClientStream stream) {
+ if (silence) {
+ LOGGER.info(() -> String.format("Silenced: %s", stream));
+ } else {
+ delegate.onReady(stream);
+ }
+ }
+
+ @Override
+ public void onStart(StreamController controller) {
+ if (silence) {
+ LOGGER.info(() -> String.format("Silenced: %s", stream));
+ } else {
+ delegate.onStart(controller);
+ }
+ }
+
+ @Override
+ public void onResponse(ResponseT response) {
+ if (silence) {
+ LOGGER.info(() -> String.format("Silenced: %s", stream));
+ } else {
+ delegate.onResponse(response);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (silence) {
+ LOGGER.info(() -> String.format("Silenced: %s", stream));
+ } else {
+ delegate.onError(t);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (silence) {
+ LOGGER.info(() -> String.format("Silenced: %s", stream));
+ } else {
+ delegate.onComplete();
+ }
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java
index b5029c8d6..14d84cd0e 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java
@@ -49,6 +49,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
@@ -59,7 +60,7 @@
* It synchronizes on its own instance so it is advisable not to use this class for external
* synchronization.
*/
-class Watch implements BidiStreamObserver {
+final class Watch implements BidiStreamObserver {
/**
* Target ID used by watch. Watch uses a fixed target id since we only support one target per
* stream. The actual target ID we use is arbitrary.
@@ -73,7 +74,7 @@ class Watch implements BidiStreamObserver {
private final ExponentialRetryAlgorithm backoff;
private final Target target;
private TimedAttemptSettings nextAttempt;
- private ClientStream stream;
+ private SilenceableBidiStream stream;
/** The sorted tree of DocumentSnapshots as sent in the last snapshot. */
private DocumentSet documentSet;
@@ -115,6 +116,8 @@ static class ChangeSet {
List updates = new ArrayList<>();
}
+ private static final Logger LOGGER = Logger.getLogger(Watch.class.getName());
+
/**
* @param firestore The Firestore Database client.
* @param query The query that is used to order the document snapshots returned by this watch.
@@ -246,7 +249,16 @@ && affectsTarget(change.getTargetIdsList(), WATCH_TARGET_ID)) {
changeMap.put(ResourcePath.create(listenResponse.getDocumentRemove().getDocument()), null);
break;
case FILTER:
- if (listenResponse.getFilter().getCount() != currentSize()) {
+ // Keep copy of counts for producing log message.
+ // The method currentSize() is computationally expensive, and should only be run once.
+ int filterCount = listenResponse.getFilter().getCount();
+ int currentSize = currentSize();
+ if (filterCount != currentSize) {
+ LOGGER.info(
+ () ->
+ String.format(
+ "filter: count mismatch filter count %d != current size %d",
+ filterCount, currentSize));
// We need to remove all the current results.
resetDocs();
// The filter didn't match, so re-issue the query.
@@ -318,7 +330,7 @@ private void resetDocs() {
resumeToken = null;
for (DocumentSnapshot snapshot : documentSet) {
- // Mark each document as deleted. If documents are not deleted, they will be send again by
+ // Mark each document as deleted. If documents are not deleted, they will be sent again by
// the server.
changeMap.put(snapshot.getReference().getResourcePath(), null);
}
@@ -329,7 +341,7 @@ private void resetDocs() {
/** Closes the stream and calls onError() if the stream is still active. */
private void closeStream(final Throwable throwable) {
if (stream != null) {
- stream.closeSend();
+ stream.closeSendAndSilence();
stream = null;
}
@@ -371,7 +383,7 @@ private void maybeReopenStream(Throwable throwable) {
/** Helper to restart the outgoing stream to the backend. */
private void resetStream() {
if (stream != null) {
- stream.closeSend();
+ stream.closeSendAndSilence();
stream = null;
}
@@ -398,7 +410,12 @@ private void initStream() {
nextAttempt = backoff.createNextAttempt(nextAttempt);
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
- stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
+ stream =
+ new SilenceableBidiStream<>(
+ Watch.this,
+ observer ->
+ firestore.streamRequest(
+ observer, firestore.getClient().listenCallable()));
ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
@@ -459,6 +476,7 @@ private void pushSnapshot(final Timestamp readTime, ByteString nextResumeToken)
if (!hasPushed || !changes.isEmpty()) {
final QuerySnapshot querySnapshot =
QuerySnapshot.withChanges(query, readTime, documentSet, changes);
+ LOGGER.info(querySnapshot.toString());
userCallbackExecutor.execute(() -> listener.onEvent(querySnapshot, null));
hasPushed = true;
}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java
new file mode 100644
index 000000000..bf02bc7d5
--- /dev/null
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/FirestoreSpy.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.gax.rpc.BidiStreamObserver;
+import com.google.api.gax.rpc.BidiStreamingCallable;
+import com.google.api.gax.rpc.ClientStream;
+
+public final class FirestoreSpy {
+
+ public final FirestoreImpl spy;
+ public BidiStreamObserver streamRequestBidiStreamObserver;
+
+ public FirestoreSpy(FirestoreOptions firestoreOptions) {
+ spy =
+ new FirestoreImpl(firestoreOptions) {
+ @Override
+ public ClientStream streamRequest(
+ BidiStreamObserver responseObserverT,
+ BidiStreamingCallable callable) {
+ streamRequestBidiStreamObserver = responseObserverT;
+ return super.streamRequest(responseObserverT, callable);
+ }
+ };
+ }
+}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java
new file mode 100644
index 000000000..948d8035f
--- /dev/null
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/SilenceableBidiStreamTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import static com.google.common.truth.Truth.assertWithMessage;
+
+import com.google.api.gax.rpc.BidiStreamObserver;
+import com.google.api.gax.rpc.ClientStream;
+import com.google.api.gax.rpc.StreamController;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SilenceableBidiStreamTest {
+
+ @Mock BidiStreamObserver mockObserver;
+
+ @Mock ClientStream mockClientStream;
+
+ SilenceableBidiStream sut;
+
+ @Before
+ public void before() {
+ Consumer captureCall = Mockito.mock(Consumer.class);
+ sut =
+ new SilenceableBidiStream<>(
+ mockObserver,
+ o -> {
+ captureCall.accept(o);
+ return mockClientStream;
+ });
+ // The lambda should pass itself as parameter
+ Mockito.verify(captureCall).accept(sut);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_onComplete() {
+ sut.onComplete();
+ Mockito.verify(mockObserver, Mockito.times(1)).onComplete();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_onError() {
+ Exception e = new Exception("Something bad");
+ sut.onError(e);
+ Mockito.verify(mockObserver, Mockito.times(1)).onError(e);
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_onResponse() {
+ sut.onResponse("ABC");
+ Mockito.verify(mockObserver, Mockito.times(1)).onResponse("ABC");
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_onStart() {
+ StreamController controller = Mockito.mock(StreamController.class);
+ sut.onStart(controller);
+ Mockito.verify(mockObserver, Mockito.times(1)).onStart(controller);
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver, controller);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_onReady() {
+ ClientStream client = Mockito.mock(ClientStream.class);
+ sut.onReady(client);
+ Mockito.verify(mockObserver, Mockito.times(1)).onReady(client);
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver, client);
+ }
+
+ @Test
+ public void byDefault_theStreamWillPassThroughData_send() {
+ sut.send(7);
+ Mockito.verify(mockClientStream, Mockito.times(1)).send(7);
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void closeMethod() {
+ assertWithMessage("Expect isSilenced() to be false by default")
+ .that(sut.isSilenced())
+ .isFalse();
+ sut.closeSend();
+ assertWithMessage("Expect isSilenced() to be unaffected after close()")
+ .that(sut.isSilenced())
+ .isFalse();
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void closeAndSilenceMethod() {
+ assertWithMessage("Expect isSilenced() to be false by default")
+ .that(sut.isSilenced())
+ .isFalse();
+ sut.closeSendAndSilence();
+ assertWithMessage("Expect isSilenced() to be true after closeAndSilence()")
+ .that(sut.isSilenced())
+ .isTrue();
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterClose_theStreamWillPassThroughData_onComplete() {
+ sut.closeSend();
+ sut.onComplete();
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verify(mockObserver, Mockito.times(1)).onComplete();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterClose_theStreamWillPassThroughData_onError() {
+ Exception e = new Exception("Something bad");
+ sut.closeSend();
+ sut.onError(e);
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verify(mockObserver, Mockito.times(1)).onError(e);
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterClose_theStreamWillPassThroughData_onResponse() {
+ sut.closeSend();
+ sut.onResponse("ABC");
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verify(mockObserver, Mockito.times(1)).onResponse("ABC");
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterCloseAndSilence_theStreamWillPassThroughData_onComplete() {
+ sut.closeSendAndSilence();
+ sut.onComplete();
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterCloseAndSilence_theStreamWillPassThroughData_onError() {
+ Exception e = new Exception("Something bad");
+ sut.closeSendAndSilence();
+ sut.onError(e);
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+
+ @Test
+ public void afterCloseAndSilence_theStreamWillPassThroughData_onResponse() {
+ sut.closeSendAndSilence();
+ sut.onResponse("ABC");
+ Mockito.verify(mockClientStream, Mockito.times(1)).closeSend();
+ Mockito.verifyNoMoreInteractions(mockClientStream, mockObserver);
+ }
+}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java
index 1627796c9..19295b901 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java
@@ -72,7 +72,9 @@
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
@@ -84,6 +86,9 @@
@RunWith(MockitoJUnitRunner.class)
public class WatchTest {
+
+ @Rule public Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
+
/** The Target ID used by the Java Firestore SDK. */
private static final int TARGET_ID = 0x1;
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java
index 97c4c75f7..2a238bba2 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITBaseTest.java
@@ -18,6 +18,7 @@
import com.google.cloud.firestore.Firestore;
import com.google.cloud.firestore.FirestoreOptions;
+import com.google.cloud.firestore.FirestoreSpy;
import com.google.common.base.Preconditions;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -30,6 +31,8 @@
public abstract class ITBaseTest {
private static final Logger logger = Logger.getLogger(ITBaseTest.class.getName());
protected Firestore firestore;
+ protected FirestoreSpy firestoreSpy;
+ private FirestoreOptions firestoreOptions;
@Before
public void before() {
@@ -43,7 +46,8 @@ public void before() {
logger.log(Level.INFO, "Integration test using default database.");
}
- firestore = optionsBuilder.build().getService();
+ firestoreOptions = optionsBuilder.build();
+ firestore = firestoreOptions.getService();
}
@After
@@ -53,5 +57,15 @@ public void after() throws Exception {
"Error instantiating Firestore. Check that the service account credentials were properly set.");
firestore.close();
firestore = null;
+ firestoreOptions = null;
+ firestoreSpy = null;
+ }
+
+ public FirestoreSpy useFirestoreSpy() {
+ if (firestoreSpy == null) {
+ firestoreSpy = new FirestoreSpy(firestoreOptions);
+ firestore = firestoreSpy.spy;
+ }
+ return firestoreSpy;
}
}
diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java
index 6f80f916c..bded062ff 100644
--- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java
+++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITQueryWatchTest.java
@@ -39,8 +39,11 @@
import com.google.common.base.Joiner;
import com.google.common.base.Joiner.MapJoiner;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.truth.Truth;
+import com.google.firestore.v1.ExistenceFilter;
+import com.google.firestore.v1.ListenResponse;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
@@ -70,6 +73,7 @@ public final class ITQueryWatchTest extends ITBaseTest {
@Before
public void before() {
super.before();
+ useFirestoreSpy();
String autoId = LocalFirestoreHelper.autoId();
String collPath = String.format("java-%s-%s", testName.getMethodName(), autoId);
randomColl = firestore.collection(collPath);
@@ -100,9 +104,9 @@ public void emptyResults() throws InterruptedException {
ListenerAssertions listenerAssertions = listener.assertions();
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1));
- listenerAssertions.addedIdsIsAnyOf(emptyList());
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
- listenerAssertions.removedIdsIsAnyOf(emptyList());
+ listenerAssertions.addedIdsIsEmpty();
+ listenerAssertions.modifiedIdsIsEmpty();
+ listenerAssertions.removedIdsIsEmpty();
}
/**
@@ -132,9 +136,9 @@ public void nonEmptyResults() throws Exception {
ListenerAssertions listenerAssertions = listener.assertions();
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(1, 1));
- listenerAssertions.addedIdsIsAnyOf(singletonList("doc"));
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
- listenerAssertions.removedIdsIsAnyOf(emptyList());
+ listenerAssertions.addedIdsIsAnyOf("doc");
+ listenerAssertions.modifiedIdsIsEmpty();
+ listenerAssertions.removedIdsIsEmpty();
}
/**
@@ -167,8 +171,8 @@ public void emptyResults_newDocument_ADDED()
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc"));
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
- listenerAssertions.removedIdsIsAnyOf(emptyList());
+ listenerAssertions.modifiedIdsIsEmpty();
+ listenerAssertions.removedIdsIsEmpty();
}
/**
@@ -188,7 +192,6 @@ public void emptyResults_modifiedDocument_ADDED() throws Exception {
final Query query = randomColl.whereEqualTo("foo", "bar");
QuerySnapshotEventListener listener =
QuerySnapshotEventListener.builder().setInitialEventCount(1).setAddedEventCount(1).build();
- List receivedEvents = listener.receivedEvents;
ListenerRegistration registration = query.addSnapshotListener(listener);
try {
@@ -203,10 +206,10 @@ public void emptyResults_modifiedDocument_ADDED() throws Exception {
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc"));
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
- listenerAssertions.removedIdsIsAnyOf(emptyList());
+ listenerAssertions.modifiedIdsIsEmpty();
+ listenerAssertions.removedIdsIsEmpty();
- ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ ListenerEvent event = listener.lastListenerEvent();
//noinspection ConstantConditions guarded by "assertNoError" above
QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
assertThat(doc.get("foo")).isEqualTo("bar");
@@ -233,7 +236,6 @@ public void nonEmptyResults_modifiedDocument_MODIFIED() throws Exception {
.setInitialEventCount(1)
.setModifiedEventCount(1)
.build();
- List receivedEvents = listener.receivedEvents;
ListenerRegistration registration = query.addSnapshotListener(listener);
try {
@@ -249,9 +251,9 @@ public void nonEmptyResults_modifiedDocument_MODIFIED() throws Exception {
listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc"));
listenerAssertions.modifiedIdsIsAnyOf(emptyList(), singletonList("doc"));
- listenerAssertions.removedIdsIsAnyOf(emptyList());
+ listenerAssertions.removedIdsIsEmpty();
- ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ ListenerEvent event = listener.lastListenerEvent();
//noinspection ConstantConditions guarded by "assertNoError" above
QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
assertThat(doc.get("foo")).isEqualTo("bar");
@@ -278,7 +280,6 @@ public void nonEmptyResults_deletedDocument_REMOVED() throws Exception {
.setInitialEventCount(1)
.setRemovedEventCount(1)
.build();
- List receivedEvents = listener.receivedEvents;
ListenerRegistration registration = query.addSnapshotListener(listener);
try {
@@ -293,10 +294,10 @@ public void nonEmptyResults_deletedDocument_REMOVED() throws Exception {
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc"));
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
+ listenerAssertions.modifiedIdsIsEmpty();
listenerAssertions.removedIdsIsAnyOf(emptyList(), singletonList("doc"));
- ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ ListenerEvent event = listener.lastListenerEvent();
//noinspection ConstantConditions guarded by "assertNoError" above
QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
assertThat(doc.get("foo")).isEqualTo("bar");
@@ -322,7 +323,6 @@ public void nonEmptyResults_modifiedDocument_REMOVED() throws Exception {
.setInitialEventCount(1)
.setRemovedEventCount(1)
.build();
- List receivedEvents = listener.receivedEvents;
ListenerRegistration registration = query.addSnapshotListener(listener);
try {
@@ -337,15 +337,88 @@ public void nonEmptyResults_modifiedDocument_REMOVED() throws Exception {
listenerAssertions.noError();
listenerAssertions.eventCountIsAnyOf(Range.closed(2, 2));
listenerAssertions.addedIdsIsAnyOf(emptyList(), singletonList("doc"));
- listenerAssertions.modifiedIdsIsAnyOf(emptyList());
+ listenerAssertions.modifiedIdsIsEmpty();
listenerAssertions.removedIdsIsAnyOf(emptyList(), singletonList("doc"));
- ListenerEvent event = receivedEvents.get(receivedEvents.size() - 1);
+ ListenerEvent event = listener.lastListenerEvent();
//noinspection ConstantConditions guarded by "assertNoError" above
QueryDocumentSnapshot doc = event.value.getDocumentChanges().get(0).getDocument();
assertThat(doc.get("foo")).isEqualTo("bar");
}
+ /**
+ *
+ *
+ *
+ * - Attach a listener to a query with non-empty results.
+ *
- Trigger existence filter mismatch, thereby invoking retry behavior.
+ *
- Add and remove documents.
+ *
- Verify expected snapshots are raised.
+ *
+ */
+ @Test
+ public void restartAfterFailedFilter() throws Exception {
+ // create a document in our collection that will match the query
+ DocumentReference testDoc1 = setDocument("doc1", map("foo", "bar"));
+ DocumentReference testDoc2 = setDocument("doc2", map("foo", "bar"));
+
+ final Query query = randomColl.whereEqualTo("foo", "bar");
+ QuerySnapshotEventListener listener =
+ QuerySnapshotEventListener.builder()
+ .setInitialEventCount(1)
+ .setAddedEventCount(3)
+ .setRemovedEventCount(1)
+ .build();
+ ListenerRegistration registration = query.addSnapshotListener(listener);
+
+ try {
+ listener.eventsCountDownLatch.awaitInitialEvents();
+ listener
+ .assertionsForLastEvent()
+ .noError()
+ .addedIdsIsAnyOf("doc1", "doc2")
+ .modifiedIdsIsEmpty()
+ .removedIdsIsEmpty();
+ listener.lastDocumentIdsIsAnyOf("doc1", "doc2");
+
+ // Trigger existence filter mismatch, thereby invoking retry behavior.
+ // Prompting Firestore to send filter mismatch is difficult, so we hack
+ // in the response. All we are concerned about is invoking retry.
+ firestoreSpy.streamRequestBidiStreamObserver.onResponse(filter(0));
+
+ // A race condition will sometimes throw an error if the SuppressibleBidiStream does not
+ // silence the old stream. This can be caused by `Preconditions.checkState(stream == null)`
+ // in Watch class.
+
+ setDocument("doc3", map("foo", "bar"));
+ listener.eventsCountDownLatch.await(DocumentChange.Type.ADDED);
+ listener
+ .assertionsForLastEvent()
+ .noError()
+ .addedIdsIsAnyOf("doc3")
+ .modifiedIdsIsEmpty()
+ .removedIdsIsEmpty();
+ listener.lastDocumentIdsIsAnyOf("doc1", "doc2", "doc3");
+
+ testDoc1.set(map("bar", "foo")).get(5, TimeUnit.SECONDS);
+ listener.eventsCountDownLatch.await(DocumentChange.Type.REMOVED);
+ listener
+ .assertionsForLastEvent()
+ .noError()
+ .addedIdsIsEmpty()
+ .modifiedIdsIsEmpty()
+ .removedIdsIsAnyOf("doc1");
+ listener.lastDocumentIdsIsAnyOf("doc2", "doc3");
+
+ } finally {
+ registration.remove();
+ }
+
+ ListenerAssertions listenerAssertions = listener.assertions();
+ listenerAssertions.noError();
+ listenerAssertions.eventCountIsAnyOf(Range.singleton(3));
+ }
+
/** Verifies that QuerySnapshot for limitToLast() queries are ordered correctly. */
@Test
public void limitToLast() throws Exception {
@@ -609,6 +682,20 @@ ListenerAssertions assertions() {
return new ListenerAssertions(receivedEvents);
}
+ ListenerAssertions assertionsForLastEvent() {
+ return new ListenerAssertions(singletonList(lastListenerEvent()));
+ }
+
+ ListenerEvent lastListenerEvent() {
+ return receivedEvents.get(receivedEvents.size() - 1);
+ }
+
+ void lastDocumentIdsIsAnyOf(String... s) {
+ List ids =
+ Lists.transform(lastListenerEvent().value.getDocuments(), DocumentSnapshot::getId);
+ Truth.assertThat(ids).containsExactlyElementsIn(s);
+ }
+
static Builder builder() {
return new Builder();
}
@@ -669,20 +756,22 @@ static final class ListenerAssertions {
removedIds = getIds(querySnapshots, DocumentChange.Type.REMOVED);
}
- private void noError() {
+ private ListenerAssertions noError() {
final Optional anyError =
receivedEvents.stream().filter(input -> input.error != null).findFirst();
- assertWithMessage("snapshotListener received an error")
- .that(anyError.isPresent())
- .isFalse();
+ if (anyError.isPresent()) {
+ throw new Error("snapshotListener received an error", anyError.get().error);
+ }
+ return this;
}
- private void hasError() {
+ private ListenerAssertions hasError() {
final Optional anyError =
receivedEvents.stream().filter(input -> input.error != null).findFirst();
assertWithMessage("snapshotListener did not receive an expected error")
.that(anyError.isPresent())
.isTrue();
+ return this;
}
private static List getQuerySnapshots(List events) {
@@ -706,32 +795,54 @@ private static List getIds(
return documentIds;
}
- void addedIdsIsAnyOf(List> s) {
- Truth.assertWithMessage(debugMessage()).that(addedIds).isEqualTo(s);
+ ListenerAssertions addedIdsIsEmpty() {
+ Truth.assertWithMessage(debugMessage()).that(addedIds).isEmpty();
+ return this;
+ }
+
+ ListenerAssertions addedIdsIsAnyOf(String... s) {
+ Truth.assertWithMessage(debugMessage()).that(addedIds).containsExactlyElementsIn(s);
+ return this;
}
- void addedIdsIsAnyOf(List> s1, List> s2) {
+ ListenerAssertions addedIdsIsAnyOf(List> s1, List> s2) {
Truth.assertWithMessage(debugMessage()).that(addedIds).isAnyOf(s1, s2);
+ return this;
}
- void modifiedIdsIsAnyOf(List> s) {
- Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEqualTo(s);
+ ListenerAssertions modifiedIdsIsEmpty() {
+ Truth.assertWithMessage(debugMessage()).that(modifiedIds).isEmpty();
+ return this;
}
- void modifiedIdsIsAnyOf(List> s1, List> s2) {
+ ListenerAssertions modifiedIdsIsAnyOf(String... s) {
+ Truth.assertWithMessage(debugMessage()).that(modifiedIds).containsExactlyElementsIn(s);
+ return this;
+ }
+
+ ListenerAssertions modifiedIdsIsAnyOf(List> s1, List> s2) {
Truth.assertWithMessage(debugMessage()).that(modifiedIds).isAnyOf(s1, s2);
+ return this;
+ }
+
+ ListenerAssertions removedIdsIsEmpty() {
+ Truth.assertWithMessage(debugMessage()).that(removedIds).isEmpty();
+ return this;
}
- void removedIdsIsAnyOf(List> s) {
- Truth.assertWithMessage(debugMessage()).that(removedIds).isEqualTo(s);
+ ListenerAssertions removedIdsIsAnyOf(String... s) {
+ Truth.assertWithMessage(debugMessage()).that(removedIds).containsExactlyElementsIn(s);
+ return this;
}
- void removedIdsIsAnyOf(List> s1, List> s2) {
+ ListenerAssertions removedIdsIsAnyOf(List> s1, List> s2) {
Truth.assertWithMessage(debugMessage()).that(removedIds).isAnyOf(s1, s2);
+ return this;
}
- void eventCountIsAnyOf(Range range) {
+ ListenerAssertions eventCountIsAnyOf(Range range) {
Truth.assertWithMessage(debugMessage()).that((int) receivedEvents.size()).isIn(range);
+ return this;
}
private String debugMessage() {
@@ -797,4 +908,10 @@ private DocumentReference setDocument(String documentId, Map fields)
documentReference.set(fields).get();
return documentReference;
}
+
+ private ListenResponse filter(int documentCount) {
+ ListenResponse.Builder response = ListenResponse.newBuilder();
+ response.setFilter(ExistenceFilter.newBuilder().setCount(documentCount).build());
+ return response.build();
+ }
}
diff --git a/grpc-google-cloud-firestore-admin-v1/pom.xml b/grpc-google-cloud-firestore-admin-v1/pom.xml
index 2bfb5e0af..4ad35c5eb 100644
--- a/grpc-google-cloud-firestore-admin-v1/pom.xml
+++ b/grpc-google-cloud-firestore-admin-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-firestore-admin-v1
- 3.15.4
+ 3.15.5
grpc-google-cloud-firestore-admin-v1
GRPC library for grpc-google-cloud-firestore-admin-v1
com.google.cloud
google-cloud-firestore-parent
- 3.15.4
+ 3.15.5
diff --git a/grpc-google-cloud-firestore-v1/pom.xml b/grpc-google-cloud-firestore-v1/pom.xml
index 62610fbf6..2f92bd06d 100644
--- a/grpc-google-cloud-firestore-v1/pom.xml
+++ b/grpc-google-cloud-firestore-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-firestore-v1
- 3.15.4
+ 3.15.5
grpc-google-cloud-firestore-v1
GRPC library for grpc-google-cloud-firestore-v1
com.google.cloud
google-cloud-firestore-parent
- 3.15.4
+ 3.15.5
diff --git a/pom.xml b/pom.xml
index ba234957d..4aaca0181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-firestore-parent
pom
- 3.15.4
+ 3.15.5
Google Cloud Firestore Parent
https://github.com/googleapis/java-firestore
@@ -150,32 +150,32 @@
com.google.api.grpc
proto-google-cloud-firestore-admin-v1
- 3.15.4
+ 3.15.5
com.google.cloud
google-cloud-firestore
- 3.15.4
+ 3.15.5
com.google.cloud
proto-google-cloud-firestore-bundle-v1
- 3.15.4
+ 3.15.5
com.google.api.grpc
proto-google-cloud-firestore-v1
- 3.15.4
+ 3.15.5
com.google.api.grpc
grpc-google-cloud-firestore-admin-v1
- 3.15.4
+ 3.15.5
com.google.api.grpc
grpc-google-cloud-firestore-v1
- 3.15.4
+ 3.15.5
diff --git a/proto-google-cloud-firestore-admin-v1/pom.xml b/proto-google-cloud-firestore-admin-v1/pom.xml
index 4e58481ec..b0cc893bb 100644
--- a/proto-google-cloud-firestore-admin-v1/pom.xml
+++ b/proto-google-cloud-firestore-admin-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-firestore-admin-v1
- 3.15.4
+ 3.15.5
proto-google-cloud-firestore-admin-v1
PROTO library for proto-google-cloud-firestore-admin-v1
com.google.cloud
google-cloud-firestore-parent
- 3.15.4
+ 3.15.5
diff --git a/proto-google-cloud-firestore-bundle-v1/pom.xml b/proto-google-cloud-firestore-bundle-v1/pom.xml
index cbbd28056..6f115104c 100644
--- a/proto-google-cloud-firestore-bundle-v1/pom.xml
+++ b/proto-google-cloud-firestore-bundle-v1/pom.xml
@@ -5,14 +5,14 @@
4.0.0
proto-google-cloud-firestore-bundle-v1
- 3.15.4
+ 3.15.5
proto-google-cloud-firestore-bundle-v1
PROTO library for proto-google-cloud-firestore-bundle-v1
com.google.cloud
google-cloud-firestore-parent
- 3.15.4
+ 3.15.5
diff --git a/proto-google-cloud-firestore-v1/pom.xml b/proto-google-cloud-firestore-v1/pom.xml
index 6d8bf6a95..4033b1b63 100644
--- a/proto-google-cloud-firestore-v1/pom.xml
+++ b/proto-google-cloud-firestore-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-firestore-v1
- 3.15.4
+ 3.15.5
proto-google-cloud-firestore-v1
PROTO library for proto-google-cloud-firestore-v1
com.google.cloud
google-cloud-firestore-parent
- 3.15.4
+ 3.15.5
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index cfa99dd2d..6501ce326 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -30,7 +30,7 @@
com.google.cloud
google-cloud-firestore
- 3.15.2
+ 3.15.4
diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml
index cfd3e2e4b..3cee46c63 100644
--- a/samples/native-image-sample/pom.xml
+++ b/samples/native-image-sample/pom.xml
@@ -32,7 +32,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
com.google.cloud
libraries-bom
- 26.26.0
+ 26.27.0
pom
import
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 99bfbac62..5104bb4e5 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -29,7 +29,7 @@
com.google.cloud
google-cloud-firestore
- 3.15.2
+ 3.15.4
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 7e295263c..7bbf9b9f5 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -34,7 +34,7 @@
com.google.cloud
libraries-bom
- 26.26.0
+ 26.27.0
pom
import
diff --git a/versions.txt b/versions.txt
index 76c76c65c..91b94d737 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,11 +1,11 @@
# Format:
# module:released-version:current-version
-google-cloud-firestore:3.15.4:3.15.4
-google-cloud-firestore-admin:3.15.4:3.15.4
-google-cloud-firestore-bom:3.15.4:3.15.4
-grpc-google-cloud-firestore-admin-v1:3.15.4:3.15.4
-grpc-google-cloud-firestore-v1:3.15.4:3.15.4
-proto-google-cloud-firestore-admin-v1:3.15.4:3.15.4
-proto-google-cloud-firestore-v1:3.15.4:3.15.4
-proto-google-cloud-firestore-bundle-v1:3.15.4:3.15.4
+google-cloud-firestore:3.15.5:3.15.5
+google-cloud-firestore-admin:3.15.5:3.15.5
+google-cloud-firestore-bom:3.15.5:3.15.5
+grpc-google-cloud-firestore-admin-v1:3.15.5:3.15.5
+grpc-google-cloud-firestore-v1:3.15.5:3.15.5
+proto-google-cloud-firestore-admin-v1:3.15.5:3.15.5
+proto-google-cloud-firestore-v1:3.15.5:3.15.5
+proto-google-cloud-firestore-bundle-v1:3.15.5:3.15.5