|
17 | 17 | from unittest.mock import AsyncMock |
18 | 18 | from google.cloud import _storage_v2 |
19 | 19 |
|
| 20 | +from google.cloud.storage._experimental.asyncio import async_read_object_stream |
20 | 21 | from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( |
21 | 22 | _AsyncReadObjectStream, |
22 | 23 | ) |
@@ -273,3 +274,49 @@ async def test_recv_without_open_should_raise_error( |
273 | 274 |
|
274 | 275 | # assert |
275 | 276 | assert str(exc.value) == "Stream is not open" |
| 277 | + |
| 278 | +@mock.patch("google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc") |
| 279 | +@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client") |
| 280 | +@pytest.mark.asyncio |
| 281 | +async def test_recv_updates_read_handle_on_refresh(mock_client, mock_cls_async_bidi_rpc): |
| 282 | + """ |
| 283 | + Verify that the `recv` method correctly updates the stream's handle |
| 284 | + when a new one is provided in a server response. |
| 285 | + """ |
| 286 | + # Arrange |
| 287 | + socket_like_rpc = AsyncMock() |
| 288 | + mock_cls_async_bidi_rpc.return_value = socket_like_rpc |
| 289 | + socket_like_rpc.open = AsyncMock() |
| 290 | + |
| 291 | + initial_handle = _storage_v2.BidiReadHandle(handle=b"initial-handle-token") |
| 292 | + response_with_initial_handle = _storage_v2.BidiReadObjectResponse(read_handle=initial_handle) |
| 293 | + response_without_handle = _storage_v2.BidiReadObjectResponse(read_handle=None) |
| 294 | + |
| 295 | + refreshed_handle = _storage_v2.BidiReadHandle(handle=b"new-refreshed-handle-token") |
| 296 | + response_with_refreshed_handle = _storage_v2.BidiReadObjectResponse(read_handle=refreshed_handle) |
| 297 | + |
| 298 | + socket_like_rpc.recv.side_effect = [ |
| 299 | + response_with_initial_handle, |
| 300 | + response_without_handle, |
| 301 | + response_with_refreshed_handle, |
| 302 | + ] |
| 303 | + |
| 304 | + starting_handle = _storage_v2.BidiReadHandle(handle=b"starting-handle-token") |
| 305 | + stream = async_read_object_stream._AsyncReadObjectStream( |
| 306 | + client=mock_client, |
| 307 | + bucket_name=_TEST_BUCKET_NAME, |
| 308 | + object_name=_TEST_OBJECT_NAME, |
| 309 | + read_handle=starting_handle, |
| 310 | + ) |
| 311 | + |
| 312 | + # Act & Assert |
| 313 | + assert stream.read_handle == starting_handle |
| 314 | + |
| 315 | + await stream.open() |
| 316 | + assert stream.read_handle == initial_handle |
| 317 | + |
| 318 | + await stream.recv() |
| 319 | + assert stream.read_handle == initial_handle |
| 320 | + |
| 321 | + await stream.recv() |
| 322 | + assert stream.read_handle == refreshed_handle |
0 commit comments