Skip to content

Commit 1a0e970

Browse files
Merge pull request #776 from garrettjonesgoogle/pubsub-alpha
Adding bundling support for PublisherApi.publish
2 parents e32afa6 + e369b3c commit 1a0e970

File tree

10 files changed

+378
-168
lines changed

10 files changed

+378
-168
lines changed

gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherApi.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333

3434
package com.google.gcloud.pubsub.spi.v1;
3535

36-
import com.google.api.gax.grpc.ApiCallSettings;
3736
import com.google.api.gax.grpc.ApiCallable;
37+
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
38+
import com.google.api.gax.grpc.BundlerFactory;
3839
import com.google.api.gax.protobuf.PathTemplate;
3940
import com.google.protobuf.Empty;
4041
import com.google.pubsub.v1.DeleteTopicRequest;
@@ -65,9 +66,25 @@
6566
*/
6667
@javax.annotation.Generated("by GAPIC")
6768
public class PublisherApi implements AutoCloseable {
69+
// ========
70+
// Members
71+
// ========
72+
73+
private final ManagedChannel channel;
74+
private final List<AutoCloseable> closeables = new ArrayList<>();
75+
76+
private final ApiCallable<Topic, Topic> createTopicCallable;
77+
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
78+
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
79+
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
80+
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
81+
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
82+
listTopicSubscriptionsCallable;
83+
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
84+
listTopicSubscriptionsIterableCallable;
85+
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;
6886

6987
public static class ResourceNames {
70-
private ResourceNames() {}
7188

7289
// =======================
7390
// ResourceNames Constants
@@ -93,6 +110,8 @@ private ResourceNames() {}
93110
private static final PathTemplate TOPIC_PATH_TEMPLATE =
94111
PathTemplate.create("projects/{project}/topics/{topic}");
95112

113+
private ResourceNames() {}
114+
96115
// ==============================
97116
// Resource Name Helper Functions
98117
// ==============================
@@ -153,24 +172,6 @@ public static final String parseTopicFromTopicPath(String topicPath) {
153172
}
154173
}
155174

156-
// ========
157-
// Members
158-
// ========
159-
160-
private final ManagedChannel channel;
161-
private final List<AutoCloseable> closeables = new ArrayList<>();
162-
163-
private final ApiCallable<Topic, Topic> createTopicCallable;
164-
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
165-
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
166-
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
167-
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
168-
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
169-
listTopicSubscriptionsCallable;
170-
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
171-
listTopicSubscriptionsIterableCallable;
172-
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;
173-
174175
// ===============
175176
// Factory Methods
176177
// ===============
@@ -186,8 +187,9 @@ public static PublisherApi create() throws IOException {
186187
}
187188

188189
/**
189-
* Constructs an instance of PublisherApi, using the given settings. The channels are created based
190-
* on the settings passed in, or defaults for any settings that are not set.
190+
* Constructs an instance of PublisherApi, using the given settings.
191+
* The channels are created based on the settings passed in, or defaults for any
192+
* settings that are not set.
191193
*
192194
* <!-- manual edit -->
193195
* <!-- end manual edit -->
@@ -197,8 +199,9 @@ public static PublisherApi create(PublisherSettings settings) throws IOException
197199
}
198200

199201
/**
200-
* Constructs an instance of PublisherApi, using the given settings. This is protected so that it
201-
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
202+
* Constructs an instance of PublisherApi, using the given settings.
203+
* This is protected so that it easy to make a subclass, but otherwise, the static
204+
* factory methods should be preferred.
202205
*
203206
* <!-- manual edit -->
204207
* <!-- end manual edit -->
@@ -207,7 +210,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
207210
this.channel = settings.getChannel();
208211

209212
this.createTopicCallable = settings.createTopicMethod().build(settings);
210-
this.publishCallable = settings.publishMethod().build(settings);
213+
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
214+
settings.publishMethod().buildBundlable(settings);
215+
this.publishCallable = bundlablePublish.getApiCallable();
216+
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
217+
bundlablePublish.getBundlerFactory();
218+
if (publishBundlerFactory != null) {
219+
this.closeables.add(publishBundlerFactory);
220+
}
211221
this.getTopicCallable = settings.getTopicMethod().build(settings);
212222
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
213223
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);

gcloud-java-pubsub/baseline/src/main/java/com/google/gcloud/pubsub/spi/v1/PublisherSettings.java

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@
3838
import com.google.api.gax.core.RetryParams;
3939
import com.google.api.gax.grpc.ApiCallSettings;
4040
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
41+
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder;
4142
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
42-
import com.google.api.gax.grpc.PageDescriptor;
43+
import com.google.api.gax.grpc.BundlingDescriptor;
44+
import com.google.api.gax.grpc.PageStreamingDescriptor;
45+
import com.google.api.gax.grpc.RequestIssuer;
4346
import com.google.api.gax.grpc.ServiceApiSettings;
4447
import com.google.common.collect.ImmutableList;
4548
import com.google.common.collect.ImmutableMap;
@@ -56,8 +59,12 @@
5659
import com.google.pubsub.v1.PublishRequest;
5760
import com.google.pubsub.v1.PublishResponse;
5861
import com.google.pubsub.v1.PublisherGrpc;
62+
import com.google.pubsub.v1.PubsubMessage;
5963
import com.google.pubsub.v1.Topic;
6064
import io.grpc.Status;
65+
import java.util.ArrayList;
66+
import java.util.Collection;
67+
import java.util.List;
6168

6269
// Manually-added imports: add custom (non-generated) imports after this point.
6370

@@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings {
132139
RETRY_PARAM_DEFINITIONS = definitions.build();
133140
}
134141

142+
private final MethodBuilders methods;
143+
135144
private static class MethodBuilders {
136145
private final ApiCallableBuilder<Topic, Topic> createTopicMethod;
137-
private final ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
146+
private final BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
138147
private final ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod;
139148
private final PageStreamingApiCallableBuilder<ListTopicsRequest, ListTopicsResponse, Topic>
140149
listTopicsMethod;
@@ -149,7 +158,8 @@ public MethodBuilders() {
149158
createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"));
150159
createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));
151160

152-
publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH);
161+
publishMethod =
162+
new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC);
153163
publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"));
154164
publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));
155165

@@ -187,8 +197,6 @@ public MethodBuilders() {
187197
}
188198
}
189199

190-
private final MethodBuilders methods;
191-
192200
// ===============
193201
// Factory Methods
194202
// ===============
@@ -211,8 +219,9 @@ public static PublisherSettings create() {
211219
}
212220

213221
/**
214-
* Constructs an instance of PublisherSettings with default settings. This is protected so that it
215-
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
222+
* Constructs an instance of PublisherSettings with default settings. This is protected
223+
* so that it easy to make a subclass, but otherwise, the static factory methods should be
224+
* preferred.
216225
*
217226
* <!-- manual edit -->
218227
* <!-- end manual edit -->
@@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) {
223232
}
224233

225234
/**
226-
* Returns the ApiCallableBuilder for the API method createTopic.
235+
* Returns the builder for the API method createTopic.
227236
*
228237
* <!-- manual edit -->
229238
* <!-- end manual edit -->
@@ -233,17 +242,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
233242
}
234243

235244
/**
236-
* Returns the ApiCallableBuilder for the API method publish.
245+
* Returns the builder for the API method publish.
237246
*
238247
* <!-- manual edit -->
239248
* <!-- end manual edit -->
240249
*/
241-
public ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
250+
public BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
242251
return methods.publishMethod;
243252
}
244253

245254
/**
246-
* Returns the ApiCallableBuilder for the API method getTopic.
255+
* Returns the builder for the API method getTopic.
247256
*
248257
* <!-- manual edit -->
249258
* <!-- end manual edit -->
@@ -253,7 +262,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
253262
}
254263

255264
/**
256-
* Returns the PageStreamingApiCallableBuilder for the API method listTopics.
265+
* Returns the builder for the API method listTopics.
257266
*
258267
* <!-- manual edit -->
259268
* <!-- end manual edit -->
@@ -264,7 +273,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
264273
}
265274

266275
/**
267-
* Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
276+
* Returns the builder for the API method listTopicSubscriptions.
268277
*
269278
* <!-- manual edit -->
270279
* <!-- end manual edit -->
@@ -276,7 +285,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
276285
}
277286

278287
/**
279-
* Returns the ApiCallableBuilder for the API method deleteTopic.
288+
* Returns the builder for the API method deleteTopic.
280289
*
281290
* <!-- manual edit -->
282291
* <!-- end manual edit -->
@@ -285,9 +294,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
285294
return methods.deleteTopicMethod;
286295
}
287296

288-
private static PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
297+
private static PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
289298
LIST_TOPICS_PAGE_STR_DESC =
290-
new PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
299+
new PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
291300
@Override
292301
public Object emptyToken() {
293302
return "";
@@ -309,10 +318,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
309318
}
310319
};
311320

312-
private static PageDescriptor<
321+
private static PageStreamingDescriptor<
313322
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>
314323
LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
315-
new PageDescriptor<
324+
new PageStreamingDescriptor<
316325
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() {
317326
@Override
318327
public Object emptyToken() {
@@ -337,4 +346,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
337346
return payload.getSubscriptionsList();
338347
}
339348
};
349+
350+
private static BundlingDescriptor<PublishRequest, PublishResponse> PUBLISH_BUNDLING_DESC =
351+
new BundlingDescriptor<PublishRequest, PublishResponse>() {
352+
@Override
353+
public String getBundlePartitionKey(PublishRequest request) {
354+
return request.getTopic();
355+
}
356+
357+
@Override
358+
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
359+
PublishRequest firstRequest = requests.iterator().next();
360+
361+
List<PubsubMessage> elements = new ArrayList<>();
362+
for (PublishRequest request : requests) {
363+
elements.addAll(request.getMessagesList());
364+
}
365+
366+
PublishRequest bundleRequest =
367+
PublishRequest.newBuilder()
368+
.setTopic(firstRequest.getTopic())
369+
.addAllMessages(elements)
370+
.build();
371+
return bundleRequest;
372+
}
373+
374+
@Override
375+
public void splitResponse(
376+
PublishResponse bundleResponse,
377+
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
378+
int bundleMessageIndex = 0;
379+
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
380+
List<String> subresponseElements = new ArrayList<>();
381+
int subresponseCount = responder.getRequest().getMessagesCount();
382+
for (int i = 0; i < subresponseCount; i++) {
383+
subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
384+
bundleMessageIndex += 1;
385+
}
386+
PublishResponse response =
387+
PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build();
388+
responder.setResponse(response);
389+
}
390+
}
391+
392+
@Override
393+
public void splitException(
394+
Throwable throwable,
395+
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
396+
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
397+
responder.setException(throwable);
398+
}
399+
}
400+
401+
@Override
402+
public long countElements(PublishRequest request) {
403+
return request.getMessagesCount();
404+
}
405+
406+
@Override
407+
public long countBytes(PublishRequest request) {
408+
return request.getSerializedSize();
409+
}
410+
};
340411
}

0 commit comments

Comments
 (0)