-
Notifications
You must be signed in to change notification settings - Fork 92
feat: support for async bidi streaming apis #836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: support for async bidi streaming apis #836
Conversation
python versions <= 3.10 dont support `async with asyncio.timeout`
provide support for the same.
…into feat/834-bidi-async-support
these files will be removed once googleapis/python-api-core#836 gets submitted
* Add async bidiRpc files in python-storage these files will be removed once googleapis/python-api-core#836 gets submitted * fix import path for bidi_base
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I have some small comments, and a bunch of nits. I'll TAL at the test next week.
google/api_core/bidi_async.py
Outdated
otherwise open-ended set of requests to send through a request-streaming | ||
(or bidirectional) RPC. | ||
The reason this is necessary is because it lets the user have control on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping (and clarifying?) the "The reason this is necessary..." paragraph from _RequestQueueGenerator
, especially since the next pargraph below also talks about technical details. Or we could put both technical details in comments rather than in the docstring.
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, I've moved this paragraph and next two to class level comments, PTAL and resolve the comments if it looks good.
|
||
request_generator.call = call | ||
|
||
if hasattr(call, "_wrapped"): # pragma: NO COVER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was the TODO in the original file addressed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure, is there any issue number ? or internal bugId ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't have an issue. You could create one, but at least copy the text: # TODO: api_core should expose the future interface for wrapped callables as well
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issue number (you could create one), but let's at least include the text: # TODO: api_core should expose the future interface for wrapped callables as well.
Adding @daniel-sanche for visibility. @ohmayr , do you have any remaining concerns with the code? |
…into feat/834-bidi-async-support
@vchudnov-g Addressed your comments. PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. We're almost there. Just some minor comments, and I want to ping @daniel-sanche again in case he has any comments.
_BIDIRECTIONAL_CONSUMER_NAME = "Thread-ConsumeBidirectionalStream" | ||
|
||
|
||
# The reason this is necessary is because gRPC takes an iterator as the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Consider moving this comment block inside the method (after the docstring)
|
||
request_generator.call = call | ||
|
||
if hasattr(call, "_wrapped"): # pragma: NO COVER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't have an issue. You could create one, but at least copy the text: # TODO: api_core should expose the future interface for wrapped callables as well
.
|
||
request_generator.call = call | ||
|
||
if hasattr(call, "_wrapped"): # pragma: NO COVER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issue number (you could create one), but let's at least include the text: # TODO: api_core should expose the future interface for wrapped callables as well.
|
||
assert await anext(aiter(generator)) == mock.sentinel.A | ||
|
||
async def test_yield_initial_callable_and_exit(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference between this test function and the previous test_yield_initial_and_exit
? The bodies look the same. What am I missing?
assert await anext(aiter(generator)) | ||
assert ( | ||
exc_info.value.args[0] | ||
== "Inactive call, replacing item on queue and exiting request generator." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
semi-nit: It would be more robust if you could compare a single unique identifier in the exception (failing that, a shorter substring), because we could conceivably want to change the string in the future.
generator.call = call | ||
|
||
with pytest.raises(asyncio.TimeoutError): | ||
await asyncio.wait_for(anext(aiter(generator)), timeout=1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are we handling the timeouts in the test? I'd like to minimize the amount of real-time we spend testing. To that end:
- could the
timeout
here be even shorter, like a hundredth of a second? - in other test functions that don't have a timeout, is anything delaying the async test data from being streamed back? Where is that set? Could we minimize any such delays as well?
If I'm understanding this correctly, the timeout will happen here simply because there is nothing in the queue, right? But comparing to the previous test case, would this create raise a StopAsyncIteration
exception? Is that then caught by asyncio.wait_for
? In that case, does the value of the timeout matter here? Adding some comments here might help readers.
@pytest.mark.asyncio | ||
@pytest.mark.skipif( | ||
sys.version_info < (3, 8), # type: ignore[operator] | ||
reason="Python 3.8 below doesnt provide support for assert_awaited_once", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reason="Python 3.8 below doesnt provide support for assert_awaited_once", | |
reason="Versions of Python below 3.8 don't provide support for assert_awaited_once", |
feat: support for async bidi streaming apis
Further details can be found here
Fixes #834