Description
Hi,
I'm encountering repeated failures during xarray.Dataset.to_zarr() when using Dask Distributed.
The failures occur during the Dask shuffle/rechunk phase and appear to be caused by instability in the P2P shuffle, along with several other Dask issues.
While converting batches of NetCDF files to Zarr, I end up having to write awful code that catches every possible Dask error so that the cluster/client can be destroyed, recreated, and the failed batch of files reprocessed.
Even though recreating the cluster lets the batch succeed on retry, that shouldn't be necessary.
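Side note: as far as I understand, the P2P code path can in principle be avoided by forcing the task-based rechunk algorithm through the dask config. The snippet below is only an illustrative sketch and assumes the "array.rechunk.method" config key; but having to opt out of P2P shouldn't be the answer either.

import dask

# Illustrative sketch: force the task-based rechunk algorithm instead of P2P.
# Assumes the "array.rechunk.method" config key; accepted values and defaults
# may differ between dask versions.
dask.config.set({"array.rechunk.method": "tasks"})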
Below is the retry code I currently use, showing all the Dask errors I have to catch.
# Excerpt from a class method; imports shown for completeness
# (exact import paths may differ across distributed versions)
from distributed.client import FutureCancelledError
from distributed.shuffle._exceptions import P2PConsistencyError

batch_is_processed = False
while not batch_is_processed:
    try:
        with self.lock:
            ds.to_zarr(
                self.store,
                mode="w",  # Overwrite mode for the first batch
                write_empty_chunks=self.write_empty_chunks,
                compute=True,  # Compute the result immediately
                consolidated=self.consolidated,
                safe_chunks=self.safe_chunks,
                align_chunks=self.align_chunks,
            )
        batch_is_processed = True
    except (FutureCancelledError, P2PConsistencyError, RuntimeError) as e:
        error_text = str(e)
        SHUFFLE_KEYWORDS = [
            "P2P",
            "failed during transfer phase",
            "failed during barrier phase",
            "failed during shuffle phase",
            "shuffle failure",
            "No active shuffle",
            "Unexpected error encountered during P2P",
        ]
        CONNECTION_KEYWORDS = [
            "Timed out trying to connect",
            "CancelledError",
            "Too many open files",
        ]
        DESERIALISATION_KEYWORDS = [
            "Error during deserialization",
            "different environments",
        ]
        # Determine if the error should trigger a retry (cluster reset)
        retryable = any(
            keyword in error_text
            for keyword in (
                SHUFFLE_KEYWORDS
                + CONNECTION_KEYWORDS
                + DESERIALISATION_KEYWORDS
            )
        )
        if isinstance(e, RuntimeError) and not retryable:
            # RuntimeError that is NOT retryable -> treat as a regular exception:
            # fall back to individual processing (code removed for simplification)
            ...
        else:
            self._reset_cluster()
            batch_is_processed = False
# ... the rest of the logic lives inside this while loop

I'd like to highlight that my workers/scheduler use at most about 50% CPU/memory. I've tried many different settings and ended up having to create workers with only 1 thread each, otherwise I run into even more race conditions.
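For completeness, _reset_cluster() just destroys and recreates the client/cluster as described above. The sketch below is simplified: LocalCluster, the worker count, and the self.client / self.cluster attribute names are illustrative placeholders, not my exact deployment.

from dask.distributed import Client, LocalCluster

def _reset_cluster(self):
    # Simplified sketch: tear down the existing client/cluster and recreate it.
    # LocalCluster and the sizing below are illustrative placeholders.
    if getattr(self, "client", None) is not None:
        self.client.close()
    if getattr(self, "cluster", None) is not None:
        self.cluster.close()
    self.cluster = LocalCluster(
        n_workers=8,           # illustrative worker count
        threads_per_worker=1,  # single-threaded workers to avoid race conditions
    )
    self.client = Client(self.cluster)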
Errors Observed
P2PConsistencyError: No active shuffle with id='868df3e600f2968e0cd678a103d355d2' found
RuntimeError: P2P 43b3a653ad1c988df817f013d7314141 failed during transfer phase
OSError: Timed out trying to connect to tls://<worker-ip>:8786 after 30 s
asyncio.exceptions.CancelledError
...
Minimal Complete Verifiable Example:
# Put your MCVE code here

Anything else we need to know?:
Environment:
- Dask version: 2025.11.0
- Python version:
- Operating System:
- Install method (conda, pip, source):