Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@

<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Shutdown/Shutdown Now -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/Firestore</className>
<method>void shutdownNow()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdown()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>void shutdownNow()</method>
</difference>

<!-- v2.1.1 -->
<difference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,13 @@ void getAll(
*/
@Override
void close() throws Exception;

/**
* Initiates an orderly shutdown in which previously submitted work is finished, but no new work
* will be accepted.
*/
void shutdown();

/** Attempts to stop all actively executing work and halts the processing of waiting work. */
void shutdownNow();
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ public void close() throws Exception {
closed = true;
}

@Override
public void shutdown() {
firestoreClient.shutdown();
closed = true;
}

@Override
public void shutdownNow() {
firestoreClient.shutdownNow();
closed = true;
}

private static class TransactionAsyncAdapter<T> implements Transaction.AsyncFunction<T> {
private final Transaction.Function<T> syncFunction;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,18 @@ private void closeStream(final Throwable throwable) {
new Runnable() {
@Override
public void run() {
listener.onEvent(
null,
throwable instanceof FirestoreException
? (FirestoreException) throwable
: FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(getStatus(throwable).getCode()),
false)));
if (throwable instanceof FirestoreException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are necessary becuse of grpc interrupting the channel while this could still be listenting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and the exceptions thrown then do no have status codes. This is all based on local debugging.

listener.onEvent(null, (FirestoreException) throwable);
} else {
Status status = getStatus(throwable);
FirestoreException firestoreException =
FirestoreException.forApiException(
new ApiException(
throwable,
GrpcStatusCode.of(status != null ? status.getCode() : Code.UNKNOWN),
false));
listener.onEvent(null, firestoreException);
}
}
});
}
Expand Down Expand Up @@ -383,31 +386,36 @@ private void initStream() {
new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
return;
}

synchronized (Watch.this) {
try {
if (!isActive.get()) {
return;
}

Preconditions.checkState(stream == null);
synchronized (Watch.this) {
if (!isActive.get()) {
return;
}

current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);
Preconditions.checkState(stream == null);

Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
current = false;
nextAttempt = backoff.createNextAttempt(nextAttempt);

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}
Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_LISTEN);
stream =
firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());

ListenRequest.Builder request = ListenRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
request.setAddTarget(target);
if (resumeToken != null) {
request.getAddTargetBuilder().setResumeToken(resumeToken);
}

stream.onNext(request.build());
stream.onNext(request.build());
}
} catch (Throwable throwable) {
onError(throwable);
}
}
},
Expand Down Expand Up @@ -549,6 +557,10 @@ private List<DocumentChange> computeSnapshot(Timestamp readTime) {
private static boolean isPermanentError(Throwable throwable) {
Status status = getStatus(throwable);

if (status == null) {
return true;
}

switch (status.getCode()) {
case CANCELLED:
case UNKNOWN:
Expand All @@ -563,20 +575,20 @@ private static boolean isPermanentError(Throwable throwable) {
}
}

/** Extracts the GRPC status code if available. Returns UNKNOWN for non-GRPC exceptions. */
/** Extracts the GRPC status code if available. Returns `null` for non-GRPC exceptions. */
@Nullable
private static Status getStatus(Throwable throwable) {
Status status = Status.UNKNOWN;

if (throwable instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) throwable).getStatus();
return ((StatusRuntimeException) throwable).getStatus();
} else if (throwable instanceof StatusException) {
status = ((StatusException) throwable).getStatus();
return ((StatusException) throwable).getStatus();
}
return status;
return null;
}

/** Determines whether we need to initiate a longer backoff due to system overload. */
private static boolean isResourceExhaustedError(Throwable throwable) {
return getStatus(throwable).getCode().equals(Code.RESOURCE_EXHAUSTED);
Status status = getStatus(throwable);
return status != null && status.getCode().equals(Code.RESOURCE_EXHAUSTED);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {

/** Returns a bi-directional watch stream. */
BidiStreamingCallable<ListenRequest, ListenResponse> listenCallable();

void shutdownNow();

void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings.Builder;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceOptions;
Expand Down Expand Up @@ -127,7 +126,7 @@ public GrpcFirestoreRpc(final FirestoreOptions options) throws IOException {
clientContext = ClientContext.create(settingsBuilder.build());
}
ApiFunction<UnaryCallSettings.Builder<?, ?>, Void> retrySettingsSetter =
new ApiFunction<Builder<?, ?>, Void>() {
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetrySettings(options.getRetrySettings());
Expand All @@ -145,18 +144,43 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {

@Override
public void close() throws Exception {
if (!closed) {
firestoreStub.close();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
}
executorFactory.release(executor);
closed = true;
}
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
}
}

@Override
public void shutdown() {
if (closed) {
return;
}
closed = true;
firestoreStub.close();
firestoreStub.shutdown();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.close();
resource.shutdown();
}
executorFactory.release(executor);
closed = true;
}

@Override
public void shutdownNow() {
if (closed) {
return;
}
firestoreStub.shutdownNow();
for (BackgroundResource resource : clientContext.getBackgroundResources()) {
resource.awaitTermination(1, TimeUnit.SECONDS);
resource.shutdownNow();
}
executorFactory.release(executor);
closed = true;
}

@Override
Expand Down
Loading