Skip to content

Commit df185b9

Browse files
authored
chore(pubsub): add subscriber role test for streaming (#9507)
Pulling the messages using a streaming pull should work with accounts having only the pubsub.subscriber role. This commits add a test that covers this aspect.
1 parent cdcc278 commit df185b9

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

pubsub/tests/system.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import datetime
1818
import itertools
1919
import operator as op
20+
import os
2021
import threading
2122
import time
2223

@@ -488,6 +489,45 @@ def test_streaming_pull_max_messages(
488489
finally:
489490
subscription_future.cancel() # trigger clean shutdown
490491

492+
@pytest.mark.skipif(
493+
"KOKORO_GFILE_DIR" not in os.environ,
494+
reason="Requires Kokoro environment with a limited subscriber service account.",
495+
)
496+
def test_streaming_pull_subscriber_permissions_sufficient(
497+
self, publisher, topic_path, subscriber, subscription_path, cleanup
498+
):
499+
500+
# Make sure the topic and subscription get deleted.
501+
cleanup.append((publisher.delete_topic, topic_path))
502+
cleanup.append((subscriber.delete_subscription, subscription_path))
503+
504+
# create a topic and subscribe to it
505+
publisher.create_topic(topic_path)
506+
subscriber.create_subscription(subscription_path, topic_path)
507+
508+
# A service account granting only the pubsub.subscriber role must be used.
509+
filename = os.path.join(
510+
os.environ["KOKORO_GFILE_DIR"], "pubsub-subscriber-service-account.json"
511+
)
512+
streaming_pull_subscriber = type(subscriber).from_service_account_file(filename)
513+
514+
# Subscribe to the topic, publish a message, and verify that subscriber
515+
# successfully pulls and processes it.
516+
callback = StreamingPullCallback(processing_time=0.01, resolve_at_msg_count=1)
517+
future = streaming_pull_subscriber.subscribe(subscription_path, callback)
518+
self._publish_messages(publisher, topic_path, batch_sizes=[1])
519+
520+
try:
521+
callback.done_future.result(timeout=10)
522+
except exceptions.TimeoutError:
523+
pytest.fail(
524+
"Timeout: receiving/processing streamed messages took too long."
525+
)
526+
else:
527+
assert 1 in callback.seen_message_ids
528+
finally:
529+
future.cancel()
530+
491531
def _publish_messages(self, publisher, topic_path, batch_sizes):
492532
"""Publish ``count`` messages in batches and wait until completion."""
493533
publish_futures = []

0 commit comments

Comments
 (0)