Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
05f352d
Update protos from preview branch of https://github.com/googleapis/go…
dconeybe Aug 26, 2022
396c22a
update clirr-ignored-differences.xml
dconeybe Aug 26, 2022
70a61ad
fix copyrights in files exported from googleapis-gen
dconeybe Aug 26, 2022
1ae9901
Firestore[Stub]Settings.java: add back newHttpJsonBuilder() and frien…
dconeybe Aug 26, 2022
741c0fa
format code by running mvn com.coveo:fmt-maven-plugin:format
dconeybe Aug 26, 2022
0ded2c0
Merge remote-tracking branch 'origin/main' into count
dconeybe Aug 30, 2022
80189c1
feat: Query.count() implementation, part 1 (#1026)
dconeybe Aug 31, 2022
066718c
Merge remote-tracking branch 'origin/main' into count
dconeybe Sep 7, 2022
23a1557
feat: Transaction.get(AggregateQuery) added (#1029)
dconeybe Sep 8, 2022
f3ce77a
Merge remote-tracking branch 'origin/main' into count
dconeybe Sep 15, 2022
8e22716
Fix integration tests to use the default FirestoreOptions
dconeybe Sep 16, 2022
b7ec1a1
Merge remote-tracking branch 'origin/main' into count
dconeybe Sep 17, 2022
b227ea9
Reduce visibility of COUNT queries until it's ready to be released
dconeybe Sep 17, 2022
08b46b8
ITQueryCountTest.java: fix import of Preconditions from the correct p…
dconeybe Sep 17, 2022
862a8c9
AggregateQuerySnapshot.java: change return value of getCount() to `lo…
dconeybe Sep 17, 2022
ddcaf21
AggregateQuery.java: close the stream after receiving the first result
dconeybe Sep 17, 2022
77936cd
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 17, 2022
4b8d8f6
LocalFirestoreHelper.java: make sure to call onStart() on the Respons…
dconeybe Sep 19, 2022
c963f7b
AggregateQuery.java: simplify the logic to avoid repeated completion …
dconeybe Sep 19, 2022
16af34a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 19, 2022
0d61d13
Convert ITAggregateQueryTest to a unit test
dconeybe Sep 20, 2022
8b8da7a
AggregateQuery.java: improve error messages if bad responses are rece…
dconeybe Sep 20, 2022
e8a2f61
QueryCountTest.java unit test added
dconeybe Sep 20, 2022
ef39a25
Merge remote-tracking branch 'origin/main' into count
dconeybe Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.1.1')
implementation platform('com.google.cloud:libraries-bom:26.1.2')

implementation 'com.google.cloud:google-cloud-firestore'
```
Expand Down
7 changes: 7 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,11 @@
<className>com/google/cloud/firestore/Firestore</className>
<method>com.google.api.core.ApiFuture recursiveDelete(*)</method>
</difference>

<!-- Aggregate Queries -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/firestore/spi/v1/FirestoreRpc</className>
<method>com.google.api.gax.rpc.ServerStreamingCallable runAggregationQueryCallable()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.StructuredAggregationQuery;
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

// TODO(count) Make this class public
@InternalExtensionOnly
class AggregateQuery {

/**
* The "alias" to specify in the {@link RunAggregationQueryRequest} proto when running a count
* query. The actual value is not meaningful, but will be used to get the count out of the {@link
* RunAggregationQueryResponse}.
*/
private static final String ALIAS_COUNT = "count";

@Nonnull private final Query query;

AggregateQuery(@Nonnull Query query) {
this.query = query;
}

@Nonnull
public Query getQuery() {
return query;
}

@Nonnull
public ApiFuture<AggregateQuerySnapshot> get() {
return get(null);
}

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
RunAggregationQueryRequest request = toProto(transactionId);
AggregateQueryResponseObserver responseObserver = new AggregateQueryResponseObserver();
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
query.rpcContext.getClient().runAggregationQueryCallable();

query.rpcContext.streamRequest(request, responseObserver, callable);

return responseObserver.getFuture();
}

private final class AggregateQueryResponseObserver
implements ResponseObserver<RunAggregationQueryResponse> {

private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
private final AtomicBoolean isFutureNotified = new AtomicBoolean(false);
private StreamController streamController;

SettableApiFuture<AggregateQuerySnapshot> getFuture() {
return future;
}

@Override
public void onStart(StreamController streamController) {
this.streamController = streamController;
}

@Override
public void onResponse(RunAggregationQueryResponse response) {
if (!isFutureNotified.compareAndSet(false, true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be good to add a comment that we expect only one response and why. Please do as part of the next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #1041

return;
}

Timestamp readTime = Timestamp.fromProto(response.getReadTime());
Value value = response.getResult().getAggregateFieldsMap().get(ALIAS_COUNT);
if (value == null) {
throw new IllegalArgumentException(
"RunAggregationQueryResponse is missing required alias: " + ALIAS_COUNT);
} else if (value.getValueTypeCase() != Value.ValueTypeCase.INTEGER_VALUE) {
throw new IllegalArgumentException(
"RunAggregationQueryResponse alias "
+ ALIAS_COUNT
+ " has incorrect type: "
+ value.getValueTypeCase());
}
long count = value.getIntegerValue();

future.set(new AggregateQuerySnapshot(AggregateQuery.this, readTime, count));

// Close the stream to avoid it dangling, since we're not expecting any more responses.
streamController.cancel();
}

@Override
public void onError(Throwable throwable) {
if (!isFutureNotified.compareAndSet(false, true)) {
return;
}

future.setException(throwable);
}

@Override
public void onComplete() {}
}

@Nonnull
public RunAggregationQueryRequest toProto() {
return toProto(null);
}

@Nonnull
RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) {
RunQueryRequest runQueryRequest = query.toProto();

RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
request.setParent(runQueryRequest.getParent());
if (transactionId != null) {
request.setTransaction(transactionId);
}

StructuredAggregationQuery.Builder structuredAggregationQuery =
request.getStructuredAggregationQueryBuilder();
structuredAggregationQuery.setStructuredQuery(runQueryRequest.getStructuredQuery());

StructuredAggregationQuery.Aggregation.Builder aggregation =
StructuredAggregationQuery.Aggregation.newBuilder();
aggregation.setCount(StructuredAggregationQuery.Aggregation.Count.getDefaultInstance());
aggregation.setAlias(ALIAS_COUNT);
structuredAggregationQuery.addAggregations(aggregation);

return request.build();
}

@Nonnull
public static AggregateQuery fromProto(Firestore firestore, RunAggregationQueryRequest proto) {
RunQueryRequest runQueryRequest =
RunQueryRequest.newBuilder()
.setParent(proto.getParent())
.setStructuredQuery(proto.getStructuredAggregationQuery().getStructuredQuery())
.build();
Query query = Query.fromProto(firestore, runQueryRequest);
return new AggregateQuery(query);
}

@Override
public int hashCode() {
return query.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof AggregateQuery)) {
return false;
}
AggregateQuery other = (AggregateQuery) obj;
return query.equals(other.query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.Timestamp;
import java.util.Objects;
import javax.annotation.Nonnull;

// TODO(count) Make this class public
@InternalExtensionOnly
class AggregateQuerySnapshot {

@Nonnull private final AggregateQuery query;
@Nonnull private final Timestamp readTime;
private final long count;

AggregateQuerySnapshot(@Nonnull AggregateQuery query, @Nonnull Timestamp readTime, long count) {
this.query = query;
this.readTime = readTime;
this.count = count;
}

@Nonnull
public AggregateQuery getQuery() {
return query;
}

@Nonnull
public Timestamp getReadTime() {
return readTime;
}

public long getCount() {
return count;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof AggregateQuerySnapshot)) {
return false;
}

AggregateQuerySnapshot other = (AggregateQuerySnapshot) obj;
return query.equals(other.query) && readTime.equals(other.readTime) && count == other.count;
}

@Override
public int hashCode() {
return Objects.hash(query, readTime, count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,12 @@ private boolean isRetryableError(Throwable throwable) {
return false;
}

// TODO(count) Make this method public
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need a comment for the method. Please do as part of the next PR.

@Nonnull
AggregateQuery count() {
return new AggregateQuery(this);
}

/**
* Returns true if this Query is equal to the provided object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,18 @@ public ApiFuture<QuerySnapshot> get(@Nonnull Query query) {

return query.get(transactionId);
}

// TODO(count) Make this method public
/**
* Returns the result from the provided aggregate query. Holds a pessimistic lock on all accessed
* documents.
*
* @return The result of the aggregation.
*/
@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nonnull AggregateQuery query) {
Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG);

return query.get(transactionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.google.firestore.v1.ListenResponse;
import com.google.firestore.v1.PartitionQueryRequest;
import com.google.firestore.v1.RollbackRequest;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.protobuf.Empty;
Expand All @@ -60,6 +62,10 @@ public interface FirestoreRpc extends AutoCloseable, ServiceRpc {
/** Runs a query. */
ServerStreamingCallable<RunQueryRequest, RunQueryResponse> runQueryCallable();

/** Runs an aggregation query. */
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse>
runAggregationQueryCallable();

/** Starts a new transaction. */
UnaryCallable<BeginTransactionRequest, BeginTransactionResponse> beginTransactionCallable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import com.google.firestore.v1.ListenResponse;
import com.google.firestore.v1.PartitionQueryRequest;
import com.google.firestore.v1.RollbackRequest;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -208,6 +210,12 @@ public ServerStreamingCallable<RunQueryRequest, RunQueryResponse> runQueryCallab
return firestoreStub.runQueryCallable();
}

@Override
public ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse>
runAggregationQueryCallable() {
return firestoreStub.runAggregationQueryCallable();
}

@Override
public UnaryCallable<BeginTransactionRequest, BeginTransactionResponse>
beginTransactionCallable() {
Expand Down
Loading