Skip to content

Commit 12d17d1

Browse files
fix: return results from getPartitions() in order (#653)
1 parent 41a0078 commit 12d17d1

File tree

3 files changed

+232
-13
lines changed

3 files changed

+232
-13
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/CollectionGroup.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import io.opencensus.common.Scope;
3434
import io.opencensus.trace.Span;
3535
import io.opencensus.trace.Status;
36+
import java.util.ArrayList;
3637
import java.util.Collections;
38+
import java.util.Comparator;
3739
import java.util.List;
3840
import javax.annotation.Nullable;
3941

@@ -63,6 +65,8 @@ public class CollectionGroup extends Query {
6365
* parallel. The returned partition cursors are split points that can be used as starting/end
6466
* points for the query results.
6567
*
68+
* @deprecated Please use {@link #getPartitions(long)} instead. All cursors will be loaded before
69+
* any value will be provided to {@code observer}.
6670
* @param desiredPartitionCount The desired maximum number of partition points. The number must be
6771
* strictly positive. The actual number of partitions returned may be fewer.
6872
* @param observer a stream observer that receives the result of the Partition request.
@@ -159,8 +163,23 @@ private PartitionQueryRequest buildRequest(long desiredPartitionCount) {
159163

160164
private void consumePartitions(
161165
PartitionQueryPagedResponse response, Function<QueryPartition, Void> consumer) {
162-
@Nullable Object[] lastCursor = null;
166+
List<Cursor> cursors = new ArrayList<>();
163167
for (Cursor cursor : response.iterateAll()) {
168+
cursors.add(cursor);
169+
}
170+
171+
// Sort the partitions as they may not be ordered if responses are paged.
172+
Collections.sort(
173+
cursors,
174+
new Comparator<Cursor>() {
175+
@Override
176+
public int compare(Cursor left, Cursor right) {
177+
return Order.INSTANCE.compareArrays(left.getValuesList(), right.getValuesList());
178+
}
179+
});
180+
181+
@Nullable Object[] lastCursor = null;
182+
for (Cursor cursor : cursors) {
164183
Object[] decodedCursorValue = new Object[cursor.getValuesCount()];
165184
for (int i = 0; i < cursor.getValuesCount(); ++i) {
166185
decodedCursorValue[i] = UserDataConverter.decodeValue(rpcContext, cursor.getValues(i));

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Order.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public int compare(@Nonnull Value left, @Nonnull Value right) {
109109
case GEO_POINT:
110110
return compareGeoPoints(left, right);
111111
case ARRAY:
112-
return compareArrays(left, right);
112+
return compareArrays(
113+
left.getArrayValue().getValuesList(), right.getArrayValue().getValuesList());
113114
case OBJECT:
114115
return compareObjects(left, right);
115116
default:
@@ -171,27 +172,22 @@ private int compareResourcePaths(Value left, Value right) {
171172
return leftPath.compareTo(rightPath);
172173
}
173174

174-
private int compareArrays(Value left, Value right) {
175-
List<Value> leftValue = left.getArrayValue().getValuesList();
176-
List<Value> rightValue = right.getArrayValue().getValuesList();
177-
178-
int minLength = Math.min(leftValue.size(), rightValue.size());
175+
public int compareArrays(List<Value> left, List<Value> right) {
176+
int minLength = Math.min(left.size(), right.size());
179177
for (int i = 0; i < minLength; i++) {
180-
int cmp = compare(leftValue.get(i), rightValue.get(i));
178+
int cmp = compare(left.get(i), right.get(i));
181179
if (cmp != 0) {
182180
return cmp;
183181
}
184182
}
185-
return Integer.compare(leftValue.size(), rightValue.size());
183+
return Integer.compare(left.size(), right.size());
186184
}
187185

188186
private int compareObjects(Value left, Value right) {
189187
// This requires iterating over the keys in the object in order and doing a
190188
// deep comparison.
191-
SortedMap<String, Value> leftMap = new TreeMap<>();
192-
leftMap.putAll(left.getMapValue().getFieldsMap());
193-
SortedMap<String, Value> rightMap = new TreeMap<>();
194-
rightMap.putAll(right.getMapValue().getFieldsMap());
189+
SortedMap<String, Value> leftMap = new TreeMap<>(left.getMapValue().getFieldsMap());
190+
SortedMap<String, Value> rightMap = new TreeMap<>(right.getMapValue().getFieldsMap());
195191

196192
Iterator<Entry<String, Value>> leftIterator = leftMap.entrySet().iterator();
197193
Iterator<Entry<String, Value>> rightIterator = rightMap.entrySet().iterator();
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.firestore;
18+
19+
import static com.google.cloud.firestore.LocalFirestoreHelper.queryResponse;
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNull;
23+
import static org.junit.Assert.fail;
24+
import static org.mockito.Mockito.doAnswer;
25+
import static org.mockito.Mockito.doReturn;
26+
import static org.mockito.Mockito.when;
27+
28+
import com.google.api.core.ApiFutures;
29+
import com.google.api.gax.rpc.ApiStreamObserver;
30+
import com.google.api.gax.rpc.ServerStreamingCallable;
31+
import com.google.api.gax.rpc.UnaryCallable;
32+
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
33+
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
34+
import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
35+
import com.google.common.collect.ImmutableList;
36+
import com.google.firestore.v1.Cursor;
37+
import com.google.firestore.v1.PartitionQueryRequest;
38+
import com.google.firestore.v1.PartitionQueryResponse;
39+
import com.google.firestore.v1.RunQueryRequest;
40+
import com.google.firestore.v1.StructuredQuery;
41+
import com.google.firestore.v1.Value;
42+
import java.util.List;
43+
import org.junit.Test;
44+
import org.junit.runner.RunWith;
45+
import org.mockito.ArgumentCaptor;
46+
import org.mockito.Captor;
47+
import org.mockito.Matchers;
48+
import org.mockito.Mock;
49+
import org.mockito.Mockito;
50+
import org.mockito.Spy;
51+
import org.mockito.runners.MockitoJUnitRunner;
52+
53+
@RunWith(MockitoJUnitRunner.class)
54+
public class PartitionQuery {
55+
public static final String DATABASE_NAME = "projects/test-project/databases/(default)/documents";
56+
public static final Cursor CURSOR1 =
57+
Cursor.newBuilder()
58+
.addValues(Value.newBuilder().setReferenceValue(DATABASE_NAME + "/collection/doc1"))
59+
.build();
60+
public static final Cursor PARTITION1 = CURSOR1.toBuilder().setBefore(true).build();
61+
public static final Cursor CURSOR2 =
62+
Cursor.newBuilder()
63+
.addValues(Value.newBuilder().setReferenceValue(DATABASE_NAME + "/collection/doc2"))
64+
.build();
65+
public static final Cursor PARTITION2 = CURSOR2.toBuilder().setBefore(true).build();
66+
67+
@Spy
68+
private final FirestoreImpl firestoreMock =
69+
new FirestoreImpl(
70+
FirestoreOptions.newBuilder().setProjectId("test-project").build(),
71+
Mockito.mock(FirestoreRpc.class));
72+
73+
@Mock private UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse> callable;
74+
@Mock private PartitionQueryPagedResponse pagedResponse;
75+
@Mock private PartitionQueryPage queryPage;
76+
77+
@Captor private ArgumentCaptor<RunQueryRequest> runQuery;
78+
@Captor private ArgumentCaptor<ApiStreamObserver> streamObserverCapture;
79+
@Captor private ArgumentCaptor<PartitionQueryRequest> requestCaptor;
80+
81+
@Test
82+
public void requestsOneLessThanDesired() throws Exception {
83+
int desiredPartitionsCount = 2;
84+
85+
PartitionQueryRequest expectedRequest =
86+
PartitionQueryRequest.newBuilder()
87+
.setParent(DATABASE_NAME)
88+
.setStructuredQuery(
89+
StructuredQuery.newBuilder()
90+
.addFrom(
91+
StructuredQuery.CollectionSelector.newBuilder()
92+
.setAllDescendants(true)
93+
.setCollectionId("collectionId"))
94+
.addOrderBy(
95+
StructuredQuery.Order.newBuilder()
96+
.setField(
97+
StructuredQuery.FieldReference.newBuilder()
98+
.setFieldPath("__name__"))
99+
.setDirection(StructuredQuery.Direction.ASCENDING)))
100+
.setPartitionCount(desiredPartitionsCount - 1)
101+
.build();
102+
103+
PartitionQueryResponse response =
104+
PartitionQueryResponse.newBuilder().addPartitions(CURSOR1).build();
105+
106+
when(pagedResponse.iterateAll()).thenReturn(ImmutableList.of(CURSOR1));
107+
when(queryPage.getResponse()).thenReturn(response);
108+
doReturn(ApiFutures.immediateFuture(pagedResponse))
109+
.when(firestoreMock)
110+
.sendRequest(
111+
requestCaptor.capture(),
112+
Matchers.<UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse>>any());
113+
114+
firestoreMock.collectionGroup("collectionId").getPartitions(desiredPartitionsCount).get();
115+
116+
PartitionQueryRequest actualRequest = requestCaptor.getValue();
117+
assertEquals(actualRequest, expectedRequest);
118+
}
119+
120+
@Test
121+
public void doesNotIssueRpcIfOnlyASinglePartitionIsRequested() throws Exception {
122+
int desiredPartitionsCount = 1;
123+
124+
List<QueryPartition> partitions =
125+
firestoreMock.collectionGroup("collectionId").getPartitions(desiredPartitionsCount).get();
126+
127+
assertEquals(partitions.size(), 1);
128+
assertNull(partitions.get(0).getStartAt());
129+
assertNull(partitions.get(0).getEndBefore());
130+
}
131+
132+
@Test
133+
public void validatesPartitionCount() {
134+
int desiredPartitionsCount = 0;
135+
try {
136+
firestoreMock.collectionGroup("collectionId").getPartitions(desiredPartitionsCount);
137+
fail();
138+
} catch (IllegalArgumentException e) {
139+
assertEquals(e.getMessage(), "Desired partition count must be one or greater");
140+
}
141+
}
142+
143+
@Test
144+
public void convertsPartitionsToQueries() throws Exception {
145+
int desiredPartitionsCount = 3;
146+
147+
PartitionQueryResponse response =
148+
PartitionQueryResponse.newBuilder().addPartitions(CURSOR1).build();
149+
150+
when(pagedResponse.iterateAll()).thenReturn(ImmutableList.of(CURSOR1, CURSOR2));
151+
when(queryPage.getResponse()).thenReturn(response);
152+
doReturn(ApiFutures.immediateFuture(pagedResponse))
153+
.when(firestoreMock)
154+
.sendRequest(
155+
requestCaptor.capture(),
156+
Matchers.<UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse>>any());
157+
158+
doAnswer(queryResponse())
159+
.when(firestoreMock)
160+
.streamRequest(
161+
runQuery.capture(),
162+
streamObserverCapture.capture(),
163+
Matchers.<ServerStreamingCallable>any());
164+
165+
List<QueryPartition> partitions =
166+
firestoreMock.collectionGroup("collectionId").getPartitions(desiredPartitionsCount).get();
167+
168+
assertEquals(partitions.size(), 3);
169+
for (QueryPartition partition : partitions) {
170+
partition.createQuery().get();
171+
}
172+
173+
assertEquals(runQuery.getAllValues().size(), 3);
174+
175+
assertFalse(runQuery.getAllValues().get(0).getStructuredQuery().hasStartAt());
176+
assertEquals(runQuery.getAllValues().get(0).getStructuredQuery().getEndAt(), PARTITION1);
177+
assertEquals(runQuery.getAllValues().get(1).getStructuredQuery().getStartAt(), PARTITION1);
178+
assertEquals(runQuery.getAllValues().get(1).getStructuredQuery().getEndAt(), PARTITION2);
179+
assertEquals(runQuery.getAllValues().get(2).getStructuredQuery().getStartAt(), PARTITION2);
180+
assertFalse(runQuery.getAllValues().get(2).getStructuredQuery().hasEndAt());
181+
}
182+
183+
@Test
184+
public void sortsPartitions() throws Exception {
185+
int desiredPartitionsCount = 3;
186+
187+
PartitionQueryResponse response =
188+
PartitionQueryResponse.newBuilder().addPartitions(CURSOR1).build();
189+
190+
when(pagedResponse.iterateAll()).thenReturn(ImmutableList.of(CURSOR2, CURSOR1));
191+
when(queryPage.getResponse()).thenReturn(response);
192+
doReturn(ApiFutures.immediateFuture(pagedResponse))
193+
.when(firestoreMock)
194+
.sendRequest(
195+
requestCaptor.capture(),
196+
Matchers.<UnaryCallable<PartitionQueryRequest, PartitionQueryPagedResponse>>any());
197+
198+
List<QueryPartition> partitions =
199+
firestoreMock.collectionGroup("collectionId").getPartitions(desiredPartitionsCount).get();
200+
201+
assertEquals(((DocumentReference) partitions.get(0).getEndBefore()[0]).getId(), "doc1");
202+
assertEquals(((DocumentReference) partitions.get(1).getEndBefore()[0]).getId(), "doc2");
203+
}
204+
}

0 commit comments

Comments
 (0)