Skip to content

Conversation

chandra-siri
Copy link

@chandra-siri chandra-siri commented Aug 14, 2025

feat: support for async bidi streaming apis

Further details can be found here

Fixes #834

@product-auto-label product-auto-label bot added the size: l Pull request size is large. label Aug 14, 2025
@chandra-siri chandra-siri marked this pull request as ready for review August 30, 2025 17:13
@chandra-siri chandra-siri requested review from a team as code owners August 30, 2025 17:13
@chandra-siri chandra-siri requested a review from ohmayr September 8, 2025 19:25
chandra-siri added a commit to chandra-siri/python-storage that referenced this pull request Sep 17, 2025
these files will be removed once googleapis/python-api-core#836
gets submitted
chandra-siri added a commit to googleapis/python-storage that referenced this pull request Sep 18, 2025
* Add async bidiRpc files in python-storage

these files will be removed once googleapis/python-api-core#836
gets submitted

* fix import path for bidi_base
Copy link
Contributor

@vchudnov-g vchudnov-g left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I have some small comments, and a bunch of nits. I'll TAL at the test next week.

otherwise open-ended set of requests to send through a request-streaming
(or bidirectional) RPC.
The reason this is necessary is because it lets the user have control on
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider keeping (and clarifying?) the "The reason this is necessary..." paragraph from _RequestQueueGenerator, especially since the next pargraph below also talks about technical details. Or we could put both technical details in comments rather than in the docstring.

WDYT?

Copy link
Author

@chandra-siri chandra-siri Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I've moved this paragraph and next two to class level comments, PTAL and resolve the comments if it looks good.


request_generator.call = call

if hasattr(call, "_wrapped"): # pragma: NO COVER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the TODO in the original file addressed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, is there any issue number ? or internal bugId ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have an issue. You could create one, but at least copy the text: # TODO: api_core should expose the future interface for wrapped callables as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issue number (you could create one), but let's at least include the text: # TODO: api_core should expose the future interface for wrapped callables as well.

@vchudnov-g
Copy link
Contributor

Adding @daniel-sanche for visibility.

@ohmayr , do you have any remaining concerns with the code?

@chandra-siri
Copy link
Author

Looks good. I have some small comments, and a bunch of nits. I'll TAL at the test next week.

@vchudnov-g Addressed your comments. PTAL

Copy link
Contributor

@vchudnov-g vchudnov-g left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. We're almost there. Just some minor comments, and I want to ping @daniel-sanche again in case he has any comments.

_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream"


# The reason this is necessary is because gRPC takes an iterator as the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Consider moving this comment block inside the method (after the docstring)


request_generator.call = call

if hasattr(call, "_wrapped"): # pragma: NO COVER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't have an issue. You could create one, but at least copy the text: # TODO: api_core should expose the future interface for wrapped callables as well.


request_generator.call = call

if hasattr(call, "_wrapped"): # pragma: NO COVER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issue number (you could create one), but let's at least include the text: # TODO: api_core should expose the future interface for wrapped callables as well.


assert await anext(aiter(generator)) == mock.sentinel.A

async def test_yield_initial_callable_and_exit(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this test function and the previous test_yield_initial_and_exit? The bodies look the same. What am I missing?

assert await anext(aiter(generator))
assert (
exc_info.value.args[0]
== "Inactive call, replacing item on queue and exiting request generator."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semi-nit: It would be more robust if you could compare a single unique identifier in the exception (failing that, a shorter substring), because we could conceivably want to change the string in the future.

generator.call = call

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(anext(aiter(generator)), timeout=1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we handling the timeouts in the test? I'd like to minimize the amount of real-time we spend testing. To that end:

  • could the timeout here be even shorter, like a hundredth of a second?
  • in other test functions that don't have a timeout, is anything delaying the async test data from being streamed back? Where is that set? Could we minimize any such delays as well?

If I'm understanding this correctly, the timeout will happen here simply because there is nothing in the queue, right? But comparing to the previous test case, would this create raise a StopAsyncIteration exception? Is that then caught by asyncio.wait_for? In that case, does the value of the timeout matter here? Adding some comments here might help readers.

@pytest.mark.asyncio
@pytest.mark.skipif(
sys.version_info < (3, 8), # type: ignore[operator]
reason="Python 3.8 below doesnt provide support for assert_awaited_once",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
reason="Python 3.8 below doesnt provide support for assert_awaited_once",
reason="Versions of Python below 3.8 don't provide support for assert_awaited_once",

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size: l Pull request size is large.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Need an async version of Bidi.py
3 participants