fix: Prevent watch stream from emitting events after close. #1471
tom-andersen merged 12 commits into main
  } else {
    long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
-   logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
+   logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
This isn't directly related to the PR, but since I was adding logging, I noticed that this log call built its message string even when the log level is coarser than FINE. A lambda allows lazy evaluation, so production environments don't pay for string formatting when logging is set to a coarser level.
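The difference between the two `logger.log` calls above can be illustrated with a small sketch. It assumes `java.util.logging` and uses hypothetical names (`LazyLogDemo`, `describe`, `formatCalls`) that are not part of this PR; the counter only exists to make the formatting work observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LazyLogDemo {
  // Counts how many times the log message was actually formatted.
  static final AtomicInteger formatCalls = new AtomicInteger();

  // Hypothetical helper standing in for the String.format call in the diff.
  static String describe(long delayMs) {
    formatCalls.incrementAndGet();
    return String.format("Backing off for %d seconds", delayMs / 1000);
  }

  private static Logger quietLogger(Level level) {
    Logger logger = Logger.getAnonymousLogger();
    logger.setUseParentHandlers(false); // keep the demo from printing anything
    logger.setLevel(level);
    return logger;
  }

  /** Eager variant: the message is built before log() is even entered. */
  static int eagerCalls(Level loggerLevel) {
    Logger logger = quietLogger(loggerLevel);
    formatCalls.set(0);
    long delayMs = 3000;
    logger.log(Level.FINE, describe(delayMs));
    return formatCalls.get();
  }

  /** Lazy variant: the supplier runs only if FINE is loggable. */
  static int lazyCalls(Level loggerLevel) {
    Logger logger = quietLogger(loggerLevel);
    formatCalls.set(0);
    long delayMs = 3000;
    logger.log(Level.FINE, () -> describe(delayMs));
    return formatCalls.get();
  }

  public static void main(String[] args) {
    System.out.println("eager at INFO: " + eagerCalls(Level.INFO));
    System.out.println("lazy at INFO:  " + lazyCalls(Level.INFO));
    System.out.println("lazy at FINE:  " + lazyCalls(Level.FINE));
  }
}
```

With the logger set to INFO, the eager variant still formats the message once while the lazy variant does not format it at all. Note that the lambda captures `delayMs`, so the variable has to be effectively final.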
public class WatchTest {

  @Rule
  public Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
While working through the problem, I introduced a bug that made the unit tests hang. To make sure the tests complete, I added a timeout that fails any test that doesn't finish within 1 second.
wu-hui left a comment
Please also look into whether we can add unit tests for the new stream class.
import java.util.function.Function;
import java.util.logging.Logger;

final class SuppressibleBidiStream<RequestT, ResponseT>
Please add a class-level comment to explain what this class does and why it is necessary.
  private final ClientStream<ListenRequest> stream;
  private final BidiStreamObserver<RequestT, ResponseT> delegate;
  private boolean silence = false;
Since the class is now called Suppressible..., should this be isSuppressed?
    stream.closeSend();
  }

  public void closeAndSilence() {
Same here: use closeAndSuppress instead.
  @Override
  public void onReady(ClientStream<RequestT> stream) {
    if (silence) {
      LOGGER.info(() -> String.format("Silenced: %s", stream));
- if (listenResponse.getFilter().getCount() != currentSize()) {
+ int filterCount = listenResponse.getFilter().getCount();
+ int currentSize = currentSize();
+ if (filterCount != currentSize) {
Inlining currentSize() seems better IMO?
currentSize() hides a lot of computation:
private int currentSize() {
  ChangeSet changeSet = extractChanges(Timestamp.now());
  return documentSet.size() + changeSet.adds.size() - changeSet.deletes.size();
}

private ChangeSet extractChanges(Timestamp readTime) {
  ChangeSet changeSet = new ChangeSet();
  for (Entry<ResourcePath, Document> change : changeMap.entrySet()) {
    if (change.getValue() == null) {
      if (documentSet.contains(change.getKey())) {
        changeSet.deletes.add(documentSet.getDocument(change.getKey()));
      }
      continue;
    }
    QueryDocumentSnapshot snapshot =
        QueryDocumentSnapshot.fromDocument(firestore, readTime, change.getValue());
    if (documentSet.contains(change.getKey())) {
      changeSet.updates.add(snapshot);
    } else {
      changeSet.adds.add(snapshot);
    }
  }
  return changeSet;
}
Sorry, I meant if (filterCount != currentSize())
listener.eventsCountDownLatch.awaitInitialEvents();
listener
    .assertionsForLastEvent()
    .noError()
Please add more comments explaining why we have this test and what would happen if SuppressibleBidiStream were not used.
// A race condition will sometimes throw an error if the SuppressibleBidiStream does not
// silence the old stream. This can be caused by `Preconditions.checkState(stream == null)`
// in Watch class.
Explanation added here.
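The idea discussed in this thread, a wrapper that stops forwarding events once the stream has been closed, can be sketched as follows. This is a simplified illustration, not the `SuppressibleBidiStream` from the PR: the `Observer` interface, `SuppressibleObserver`, and `Recorder` are hypothetical stand-ins, and the real class wraps gax stream types rather than this toy interface.

```java
import java.util.ArrayList;
import java.util.List;

final class SuppressDemo {
  /** Hypothetical stand-in for a stream observer; not the gax API. */
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable t);

    void onComplete();
  }

  /** Forwards events to the delegate until closeAndSuppress() is called. */
  static final class SuppressibleObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    // volatile so that a close on one thread is visible to callback threads.
    private volatile boolean suppressed = false;

    SuppressibleObserver(Observer<T> delegate) {
      this.delegate = delegate;
    }

    void closeAndSuppress() {
      suppressed = true;
    }

    @Override
    public void onNext(T value) {
      if (!suppressed) delegate.onNext(value);
    }

    @Override
    public void onError(Throwable t) {
      if (!suppressed) delegate.onError(t);
    }

    @Override
    public void onComplete() {
      if (!suppressed) delegate.onComplete();
    }
  }

  /** Records every event it receives so the demo can inspect them. */
  static final class Recorder implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onNext(String value) {
      events.add("next:" + value);
    }

    @Override
    public void onError(Throwable t) {
      events.add("error:" + t.getMessage());
    }

    @Override
    public void onComplete() {
      events.add("complete");
    }
  }

  /** Sends one event before suppression and three after; returns what got through. */
  static List<String> run() {
    Recorder recorder = new Recorder();
    SuppressibleObserver<String> stream = new SuppressibleObserver<>(recorder);
    stream.onNext("a");
    stream.closeAndSuppress();
    stream.onNext("b");
    stream.onError(new IllegalStateException("late"));
    stream.onComplete();
    return recorder.events;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Only the event sent before `closeAndSuppress()` reaches the recorder. In this sketch a late `onError` or `onComplete` from the old stream is dropped as well, so the downstream listener never sees a stale termination signal after a replacement stream has been opened.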
wu-hui left a comment
Some minor nits, but LGTM in general, thanks!
 * allowed to send `onError` or `onComplete` since that would signal the downstream that the stream
 * is finished.
 *
 * @param <RequestT>
I don't think these @param tags are useful here.
@Mock ClientStream<Integer> mockClientStream;

SilenceableBidiStream<Integer, String> sut;
What does sut stand for here?
System Under Test
b/288270811