3838import com .google .api .gax .core .RetryParams ;
3939import com .google .api .gax .grpc .ApiCallSettings ;
4040import com .google .api .gax .grpc .ApiCallable .ApiCallableBuilder ;
41+ import com .google .api .gax .grpc .ApiCallable .BundlableApiCallableBuilder ;
4142import com .google .api .gax .grpc .ApiCallable .PageStreamingApiCallableBuilder ;
42- import com .google .api .gax .grpc .PageDescriptor ;
43+ import com .google .api .gax .grpc .BundlingDescriptor ;
44+ import com .google .api .gax .grpc .PageStreamingDescriptor ;
45+ import com .google .api .gax .grpc .RequestIssuer ;
4346import com .google .api .gax .grpc .ServiceApiSettings ;
4447import com .google .common .collect .ImmutableList ;
4548import com .google .common .collect .ImmutableMap ;
5659import com .google .pubsub .v1 .PublishRequest ;
5760import com .google .pubsub .v1 .PublishResponse ;
5861import com .google .pubsub .v1 .PublisherGrpc ;
62+ import com .google .pubsub .v1 .PubsubMessage ;
5963import com .google .pubsub .v1 .Topic ;
6064import io .grpc .Status ;
65+ import java .util .ArrayList ;
66+ import java .util .Collection ;
67+ import java .util .List ;
6168
6269// Manually-added imports: add custom (non-generated) imports after this point.
6370
@@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings {
132139 RETRY_PARAM_DEFINITIONS = definitions .build ();
133140 }
134141
142+ private final MethodBuilders methods ;
143+
135144 private static class MethodBuilders {
136145 private final ApiCallableBuilder <Topic , Topic > createTopicMethod ;
137- private final ApiCallableBuilder <PublishRequest , PublishResponse > publishMethod ;
146+ private final BundlableApiCallableBuilder <PublishRequest , PublishResponse > publishMethod ;
138147 private final ApiCallableBuilder <GetTopicRequest , Topic > getTopicMethod ;
139148 private final PageStreamingApiCallableBuilder <ListTopicsRequest , ListTopicsResponse , Topic >
140149 listTopicsMethod ;
@@ -149,7 +158,8 @@ public MethodBuilders() {
149158 createTopicMethod .setRetryableCodes (RETRYABLE_CODE_DEFINITIONS .get ("idempotent" ));
150159 createTopicMethod .setRetryParams (RETRY_PARAM_DEFINITIONS .get ("default" ));
151160
152- publishMethod = new ApiCallableBuilder <>(PublisherGrpc .METHOD_PUBLISH );
161+ publishMethod =
162+ new BundlableApiCallableBuilder <>(PublisherGrpc .METHOD_PUBLISH , PUBLISH_BUNDLING_DESC );
153163 publishMethod .setRetryableCodes (RETRYABLE_CODE_DEFINITIONS .get ("non_idempotent" ));
154164 publishMethod .setRetryParams (RETRY_PARAM_DEFINITIONS .get ("default" ));
155165
@@ -187,8 +197,6 @@ public MethodBuilders() {
187197 }
188198 }
189199
190- private final MethodBuilders methods ;
191-
192200 // ===============
193201 // Factory Methods
194202 // ===============
@@ -211,8 +219,9 @@ public static PublisherSettings create() {
211219 }
212220
213221 /**
214- * Constructs an instance of PublisherSettings with default settings. This is protected so that it
215- * easy to make a subclass, but otherwise, the static factory methods should be preferred.
222+ * Constructs an instance of PublisherSettings with default settings. This is protected
223+ * so that it easy to make a subclass, but otherwise, the static factory methods should be
224+ * preferred.
216225 *
217226 * <!-- manual edit -->
218227 * <!-- end manual edit -->
@@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) {
223232 }
224233
225234 /**
226- * Returns the ApiCallableBuilder for the API method createTopic.
235+ * Returns the builder for the API method createTopic.
227236 *
228237 * <!-- manual edit -->
229238 * <!-- end manual edit -->
@@ -233,17 +242,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
233242 }
234243
235244 /**
236- * Returns the ApiCallableBuilder for the API method publish.
245+ * Returns the builder for the API method publish.
237246 *
238247 * <!-- manual edit -->
239248 * <!-- end manual edit -->
240249 */
241- public ApiCallableBuilder <PublishRequest , PublishResponse > publishMethod () {
250+ public BundlableApiCallableBuilder <PublishRequest , PublishResponse > publishMethod () {
242251 return methods .publishMethod ;
243252 }
244253
245254 /**
246- * Returns the ApiCallableBuilder for the API method getTopic.
255+ * Returns the builder for the API method getTopic.
247256 *
248257 * <!-- manual edit -->
249258 * <!-- end manual edit -->
@@ -253,7 +262,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
253262 }
254263
255264 /**
256- * Returns the PageStreamingApiCallableBuilder for the API method listTopics.
265+ * Returns the builder for the API method listTopics.
257266 *
258267 * <!-- manual edit -->
259268 * <!-- end manual edit -->
@@ -264,7 +273,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
264273 }
265274
266275 /**
267- * Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
276+ * Returns the builder for the API method listTopicSubscriptions.
268277 *
269278 * <!-- manual edit -->
270279 * <!-- end manual edit -->
@@ -276,7 +285,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
276285 }
277286
278287 /**
279- * Returns the ApiCallableBuilder for the API method deleteTopic.
288+ * Returns the builder for the API method deleteTopic.
280289 *
281290 * <!-- manual edit -->
282291 * <!-- end manual edit -->
@@ -285,9 +294,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
285294 return methods .deleteTopicMethod ;
286295 }
287296
288- private static PageDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >
297+ private static PageStreamingDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >
289298 LIST_TOPICS_PAGE_STR_DESC =
290- new PageDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >() {
299+ new PageStreamingDescriptor <ListTopicsRequest , ListTopicsResponse , Topic >() {
291300 @ Override
292301 public Object emptyToken () {
293302 return "" ;
@@ -309,10 +318,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
309318 }
310319 };
311320
312- private static PageDescriptor <
321+ private static PageStreamingDescriptor <
313322 ListTopicSubscriptionsRequest , ListTopicSubscriptionsResponse , String >
314323 LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
315- new PageDescriptor <
324+ new PageStreamingDescriptor <
316325 ListTopicSubscriptionsRequest , ListTopicSubscriptionsResponse , String >() {
317326 @ Override
318327 public Object emptyToken () {
@@ -337,4 +346,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
337346 return payload .getSubscriptionsList ();
338347 }
339348 };
349+
350+ private static BundlingDescriptor <PublishRequest , PublishResponse > PUBLISH_BUNDLING_DESC =
351+ new BundlingDescriptor <PublishRequest , PublishResponse >() {
352+ @ Override
353+ public String getBundlePartitionKey (PublishRequest request ) {
354+ return request .getTopic ();
355+ }
356+
357+ @ Override
358+ public PublishRequest mergeRequests (Collection <PublishRequest > requests ) {
359+ PublishRequest firstRequest = requests .iterator ().next ();
360+
361+ List <PubsubMessage > elements = new ArrayList <>();
362+ for (PublishRequest request : requests ) {
363+ elements .addAll (request .getMessagesList ());
364+ }
365+
366+ PublishRequest bundleRequest =
367+ PublishRequest .newBuilder ()
368+ .setTopic (firstRequest .getTopic ())
369+ .addAllMessages (elements )
370+ .build ();
371+ return bundleRequest ;
372+ }
373+
374+ @ Override
375+ public void splitResponse (
376+ PublishResponse bundleResponse ,
377+ Collection <? extends RequestIssuer <PublishRequest , PublishResponse >> bundle ) {
378+ int bundleMessageIndex = 0 ;
379+ for (RequestIssuer <PublishRequest , PublishResponse > responder : bundle ) {
380+ List <String > subresponseElements = new ArrayList <>();
381+ int subresponseCount = responder .getRequest ().getMessagesCount ();
382+ for (int i = 0 ; i < subresponseCount ; i ++) {
383+ subresponseElements .add (bundleResponse .getMessageIds (bundleMessageIndex ));
384+ bundleMessageIndex += 1 ;
385+ }
386+ PublishResponse response =
387+ PublishResponse .newBuilder ().addAllMessageIds (subresponseElements ).build ();
388+ responder .setResponse (response );
389+ }
390+ }
391+
392+ @ Override
393+ public void splitException (
394+ Throwable throwable ,
395+ Collection <? extends RequestIssuer <PublishRequest , PublishResponse >> bundle ) {
396+ for (RequestIssuer <PublishRequest , PublishResponse > responder : bundle ) {
397+ responder .setException (throwable );
398+ }
399+ }
400+
401+ @ Override
402+ public long countElements (PublishRequest request ) {
403+ return request .getMessagesCount ();
404+ }
405+
406+ @ Override
407+ public long countBytes (PublishRequest request ) {
408+ return request .getSerializedSize ();
409+ }
410+ };
340411}
0 commit comments