dask unstable; multiple race condition errors: P2PConsistencyError, RuntimeError, FutureCancelledError ... #9163

@lbesnard

Description

Hi,

I'm encountering repeated failures from xarray.Dataset.to_zarr() when using Dask Distributed.
The failures occur during the Dask shuffle/rechunk phase and appear to be caused by instability in the P2P shuffle, on top of several other Dask issues.

While converting batches of NetCDF files to Zarr, I have ended up writing horrible code that catches every possible Dask error so that the cluster/client can be destroyed, recreated, and the failed batch of files reprocessed.
Recreating the cluster does let the batch succeed on retry, but it shouldn't be required.
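Since the failures point at the P2P code path specifically, one mitigation I've been looking at is forcing the legacy task-based rechunking algorithm via configuration. A sketch (I'm assuming the `array.rechunk.method` config key covers the rechunk that `to_zarr` triggers; key names can change between Dask versions):

```python
import dask
import dask.array as da

# Assumption: the "array.rechunk.method" config key selects the rechunking
# algorithm ("tasks" = legacy task-based graph, "p2p" = peer-to-peer shuffle).
x = da.ones((1000, 1000), chunks=(1000, 10))
with dask.config.set({"array.rechunk.method": "tasks"}):
    y = x.rechunk((10, 1000))
    result = y.sum().compute()  # runs without the P2P shuffle machinery
```

This sidesteps the P2P machinery entirely, at the cost of the larger task graphs that P2P was designed to avoid.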

Below are all the Dask errors I have to catch in my code.

    # Import paths may vary with the distributed version:
    from distributed import FutureCancelledError
    from distributed.shuffle._exceptions import P2PConsistencyError

    batch_is_processed = False
    while not batch_is_processed:
        try:
            with self.lock:
                ds.to_zarr(
                    self.store,
                    mode="w",  # Overwrite mode for the first batch
                    write_empty_chunks=self.write_empty_chunks,
                    compute=True,  # Compute the result immediately
                    consolidated=self.consolidated,
                    safe_chunks=self.safe_chunks,
                    align_chunks=self.align_chunks,
                )
                batch_is_processed = True
        except (FutureCancelledError, P2PConsistencyError, RuntimeError) as e:
            error_text = str(e)

            SHUFFLE_KEYWORDS = [
                "P2P",
                "failed during transfer phase",
                "failed during barrier phase",
                "failed during shuffle phase",
                "shuffle failure",
                "No active shuffle",
                "Unexpected error encountered during P2P",
            ]

            CONNECTION_KEYWORDS = [
                "Timed out trying to connect",
                "CancelledError",
                "Too many open files",
            ]

            DESERIALISATION_KEYWORDS = [
                "Error during deserialization",
                "different environments",
            ]

            # Determine if the error should trigger a retry (cluster reset)
            retryable = any(
                keyword in error_text
                for keyword in (
                    SHUFFLE_KEYWORDS
                    + CONNECTION_KEYWORDS
                    + DESERIALISATION_KEYWORDS
                )
            )

            # RuntimeError that is NOT retryable -> treat as a normal exception
            if isinstance(e, RuntimeError) and not retryable:
                # Fall back to processing files individually
                # (code removed for simplification)
                raise
            else:
                self._reset_cluster()
                batch_is_processed = False
                # ... the rest of the retry logic is handled by the while loop
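The retry decision above can be factored into a small standalone helper so it can be unit-tested without a cluster. A sketch (`is_retryable_error` is a name I made up; the keyword list is copied from the code above and is almost certainly incomplete, which is part of the problem being reported):

```python
# Keywords taken from the retry logic above: messages that, empirically,
# indicate a transient shuffle/connection/deserialisation failure.
RETRYABLE_KEYWORDS = [
    "P2P",
    "failed during transfer phase",
    "failed during barrier phase",
    "failed during shuffle phase",
    "shuffle failure",
    "No active shuffle",
    "Timed out trying to connect",
    "CancelledError",
    "Too many open files",
    "Error during deserialization",
    "different environments",
]


def is_retryable_error(exc: BaseException) -> bool:
    """True if the exception text matches a known transient Dask failure."""
    text = str(exc)
    return any(keyword in text for keyword in RETRYABLE_KEYWORDS)
```

Matching on exception message substrings like this is fragile by design; it would be much nicer if distributed raised a stable exception hierarchy for these transient failures.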

I'd like to highlight that my workers/scheduler peak at only about 50% CPU/memory usage. I've tried many different settings, and ended up having to create workers with only one thread each if I don't want to run into more race conditions.
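For reference, the single-thread-per-worker setup that works for me looks roughly like this (worker count and memory limit are illustrative values, not my production settings):

```python
from distributed import Client, LocalCluster

# One thread per worker is the only configuration that avoids further race
# conditions for me; n_workers and memory_limit are illustrative.
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="2GiB")
client = Client(cluster)
```

Having to serialise each worker down to one thread obviously wastes the cores the workers were given.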

Errors Observed

P2PConsistencyError: No active shuffle with id='868df3e600f2968e0cd678a103d355d2' found
RuntimeError: P2P 43b3a653ad1c988df817f013d7314141 failed during transfer phase
OSError: Timed out trying to connect to tls://<worker-ip>:8786 after 30 s
asyncio.exceptions.CancelledError

...

Minimal Complete Verifiable Example:

# Put your MCVE code here

Anything else we need to know?:

Environment:

  • Dask version: 2025.11.0
  • Python version:
  • Operating System:
  • Install method (conda, pip, source):

Metadata

Assignees

No one assigned

    Labels

    bug: Something is broken
    needs info: Needs further information from the user
    needs reproducer

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
