|
17 | 17 | import datetime
|
18 | 18 | import itertools
|
19 | 19 | import operator as op
|
| 20 | +import os |
20 | 21 | import threading
|
21 | 22 | import time
|
22 | 23 |
|
@@ -488,6 +489,45 @@ def test_streaming_pull_max_messages(
|
488 | 489 | finally:
|
489 | 490 | subscription_future.cancel() # trigger clean shutdown
|
490 | 491 |
|
| 492 | + @pytest.mark.skipif( |
| 493 | + "KOKORO_GFILE_DIR" not in os.environ, |
| 494 | + reason="Requires Kokoro environment with a limited subscriber service account.", |
| 495 | + ) |
| 496 | + def test_streaming_pull_subscriber_permissions_sufficient( |
| 497 | + self, publisher, topic_path, subscriber, subscription_path, cleanup |
| 498 | + ): |
| 499 | + |
| 500 | + # Make sure the topic and subscription get deleted. |
| 501 | + cleanup.append((publisher.delete_topic, topic_path)) |
| 502 | + cleanup.append((subscriber.delete_subscription, subscription_path)) |
| 503 | + |
| 504 | + # create a topic and subscribe to it |
| 505 | + publisher.create_topic(topic_path) |
| 506 | + subscriber.create_subscription(subscription_path, topic_path) |
| 507 | + |
| 508 | + # A service account granting only the pubsub.subscriber role must be used. |
| 509 | + filename = os.path.join( |
| 510 | + os.environ["KOKORO_GFILE_DIR"], "pubsub-subscriber-service-account.json" |
| 511 | + ) |
| 512 | + streaming_pull_subscriber = type(subscriber).from_service_account_file(filename) |
| 513 | + |
| 514 | + # Subscribe to the topic, publish a message, and verify that subscriber |
| 515 | + # successfully pulls and processes it. |
| 516 | + callback = StreamingPullCallback(processing_time=0.01, resolve_at_msg_count=1) |
| 517 | + future = streaming_pull_subscriber.subscribe(subscription_path, callback) |
| 518 | + self._publish_messages(publisher, topic_path, batch_sizes=[1]) |
| 519 | + |
| 520 | + try: |
| 521 | + callback.done_future.result(timeout=10) |
| 522 | + except exceptions.TimeoutError: |
| 523 | + pytest.fail( |
| 524 | + "Timeout: receiving/processing streamed messages took too long." |
| 525 | + ) |
| 526 | + else: |
| 527 | + assert 1 in callback.seen_message_ids |
| 528 | + finally: |
| 529 | + future.cancel() |
| 530 | + |
491 | 531 | def _publish_messages(self, publisher, topic_path, batch_sizes):
|
492 | 532 | """Publish ``count`` messages in batches and wait until completion."""
|
493 | 533 | publish_futures = []
|
|
0 commit comments