🌐 AI搜索 & 代理 主页
Skip to content

Commit b975732

Browse files
authored
feat(experimental): Read handle refresh (#1559)
* feat(experimental): refresh read handles * fix unit tests
1 parent cdbb26c commit b975732

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,11 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
145145
"""
146146
if not self._is_stream_open:
147147
raise ValueError("Stream is not open")
148-
return await self.socket_like_rpc.recv()
148+
response = await self.socket_like_rpc.recv()
149+
# Update read_handle if present in response
150+
if response and response.read_handle:
151+
self.read_handle = response.read_handle
152+
return response
149153

150154
@property
151155
def is_stream_open(self) -> bool:

tests/unit/asyncio/test_async_read_object_stream.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from unittest.mock import AsyncMock
1818
from google.cloud import _storage_v2
1919

20+
from google.cloud.storage._experimental.asyncio import async_read_object_stream
2021
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
2122
_AsyncReadObjectStream,
2223
)
@@ -273,3 +274,49 @@ async def test_recv_without_open_should_raise_error(
273274

274275
# assert
275276
assert str(exc.value) == "Stream is not open"
277+
278+
@mock.patch("google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc")
279+
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
280+
@pytest.mark.asyncio
281+
async def test_recv_updates_read_handle_on_refresh(mock_client, mock_cls_async_bidi_rpc):
282+
"""
283+
Verify that the `recv` method correctly updates the stream's handle
284+
when a new one is provided in a server response.
285+
"""
286+
# Arrange
287+
socket_like_rpc = AsyncMock()
288+
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
289+
socket_like_rpc.open = AsyncMock()
290+
291+
initial_handle = _storage_v2.BidiReadHandle(handle=b"initial-handle-token")
292+
response_with_initial_handle = _storage_v2.BidiReadObjectResponse(read_handle=initial_handle)
293+
response_without_handle = _storage_v2.BidiReadObjectResponse(read_handle=None)
294+
295+
refreshed_handle = _storage_v2.BidiReadHandle(handle=b"new-refreshed-handle-token")
296+
response_with_refreshed_handle = _storage_v2.BidiReadObjectResponse(read_handle=refreshed_handle)
297+
298+
socket_like_rpc.recv.side_effect = [
299+
response_with_initial_handle,
300+
response_without_handle,
301+
response_with_refreshed_handle,
302+
]
303+
304+
starting_handle = _storage_v2.BidiReadHandle(handle=b"starting-handle-token")
305+
stream = async_read_object_stream._AsyncReadObjectStream(
306+
client=mock_client,
307+
bucket_name=_TEST_BUCKET_NAME,
308+
object_name=_TEST_OBJECT_NAME,
309+
read_handle=starting_handle,
310+
)
311+
312+
# Act & Assert
313+
assert stream.read_handle == starting_handle
314+
315+
await stream.open()
316+
assert stream.read_handle == initial_handle
317+
318+
await stream.recv()
319+
assert stream.read_handle == initial_handle
320+
321+
await stream.recv()
322+
assert stream.read_handle == refreshed_handle

0 commit comments

Comments
 (0)