Skip to content

Commit 6003b7a

Browse files
committed
Adds a ready event to BackgroundConsumer to allow waiting for the background thread to start
1 parent aa8e99a commit 6003b7a

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

api_core/google/api_core/bidi.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ def __init__(self, bidi_rpc, on_response):
499499
self._wake = threading.Condition()
500500
self._thread = None
501501
self._operational_lock = threading.Lock()
502+
self._ready = threading.Event()
502503

503504
def _on_call_done(self, future):
504505
# Resume the thread if it's paused, this prevents blocking forever
@@ -507,6 +508,7 @@ def _on_call_done(self, future):
507508

508509
def _thread_main(self):
509510
try:
511+
self._ready.set()
510512
self._bidi_rpc.add_done_callback(self._on_call_done)
511513
self._bidi_rpc.open()
512514

@@ -560,6 +562,10 @@ def start(self):
560562
)
561563
thread.daemon = True
562564
thread.start()
565+
# In other places in the code, we rely on `thread.is_alive` which
566+
# isn't sufficient to know if a thread is going to run. To protect
567+
# against races, we use a ready event and wait on it to be set.
568+
self.is_ready.wait()
563569
self._thread = thread
564570
_LOGGER.debug("Started helper thread %s", thread.name)
565571

@@ -598,3 +604,11 @@ def resume(self):
598604
def is_paused(self):
599605
"""bool: True if the response stream is paused."""
600606
return self._paused
607+
608+
@property
609+
def is_ready(self):
610+
"""
611+
threading.Event: Returns an event indicating if this
612+
consumer has started
613+
"""
614+
return self._ready

0 commit comments

Comments
 (0)