[Bug]: Race condition in FileSystems initialisation #33965

Open
1 of 17 tasks
JozoVilcek opened this issue Feb 12, 2025 · 8 comments · May be fixed by #34028

Comments

@JozoVilcek
Contributor

What happened?

When FileSystems.setDefaultPipelineOptions() is called from multiple threads and FileSystems is then used, some tasks fail because FILESYSTEM_REVISION is already set while SCHEME_TO_FILESYSTEM has not yet been initialised.
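To make the interleaving concrete, here is a simplified, hypothetical Java model of the pattern described above. It is not Beam's FileSystems source: `RacyRegistry`, its `revision` and `schemes` fields (stand-ins for FILESYSTEM_REVISION and SCHEME_TO_FILESYSTEM), the scheme names, and the latches that force the interleaving are all assumptions for illustration. The writer publishes the revision first and the scheme map second. A second caller that arrives in between sees a matching revision, returns early, and then fails the lookup.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the reported pattern; not Beam's FileSystems source.
final class RacyRegistry {
  // Stand-ins for FILESYSTEM_REVISION and SCHEME_TO_FILESYSTEM: two independent references.
  private final AtomicReference<Long> revision = new AtomicReference<>();
  private final AtomicReference<Map<String, String>> schemes =
      new AtomicReference<>(Map.of("file", "LocalFileSystem"));

  // Test hooks that pause the writer between its two steps to force the bad interleaving.
  private final CountDownLatch revisionPublished = new CountDownLatch(1);
  private final CountDownLatch resumeWriter = new CountDownLatch(1);

  void init(long optionsId, boolean pauseBetweenSteps) {
    while (true) {
      Long current = revision.get();
      if (current != null && current == optionsId) {
        return; // revision matches, so the caller assumes the schemes are ready
      }
      if (revision.compareAndSet(current, optionsId)) { // step 1: publish the revision
        if (pauseBetweenSteps) {
          revisionPublished.countDown();
          await(resumeWriter);
        }
        // Step 2: only now does the full scheme map become visible.
        schemes.set(Map.of("file", "LocalFileSystem", "hdfs", "HadoopFileSystem"));
        return;
      }
    }
  }

  String lookup(String scheme) {
    String fs = schemes.get().get(scheme);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fs;
  }

  /** Forces the interleaving once and returns what the second caller observed. */
  static String demonstrateRace() {
    RacyRegistry registry = new RacyRegistry();
    Thread writer = new Thread(() -> registry.init(42L, true));
    writer.start();
    await(registry.revisionPublished); // writer has set the revision but not the map
    registry.init(42L, false);         // returns early: the revision already matches
    String observed;
    try {
      observed = registry.lookup("hdfs");
    } catch (IllegalArgumentException e) {
      observed = "FAILED: " + e.getMessage();
    }
    registry.resumeWriter.countDown(); // let the writer finish step 2
    try {
      writer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return observed;
  }

  private static void await(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demonstrateRace()); // FAILED: No filesystem found for scheme hdfs
  }
}
```

Without the latches the same window would still exist; it is just narrow, so a short code path between initialisation and first use is the most likely to hit it.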

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Contributor

Abacn commented Feb 18, 2025

FileSystems.setDefaultPipelineOptions() should be called once on worker startup. It is an internal method and is not supposed to be used outside the worker. Would you mind sharing a stack trace so we can identify the excessive call in the Beam code path?

@liferoad
Contributor

#34007 (comment)

github-actions bot added this to the 2.64.0 Release milestone Feb 18, 2025
@JozoVilcek
Contributor Author

@liferoad I wonder why this was closed, as it is still being discussed. Perhaps it was closed by mistake?

@Abacn, I admit my use case is not a standard one. I work in a mixed environment of pure Spark and Beam on the Spark runner. I came across this in pure Spark code, where I was trying to reuse the filesystem utilities and was calling FileSystems.setDefaultPipelineOptions() manually. I tried to use registerFileSystemsOnce(), but that does not help me, because setDefaultPipelineOptions() is called from inside SerializablePipelineOptions.

I understand that the API is internal and subject to change. Still, I believe there is a race condition in the initialisation.

@Abacn, can you share how workers ensure this is initialised consistently under multithreaded execution? Maybe it will give me some ideas. I was not able to find it, so I suspect the race exists in Beam too, but the initialisation sequence there is heavier: between the first call to setDefaultPipelineOptions() and user code actually using the filesystem, enough time passes for registration to finish. My code path is much shorter, which surfaces the race.

liferoad reopened this Feb 19, 2025
@liferoad
Contributor

Will my PR #34007 fix the issue you have? I closed this because I thought #34007 (comment) would resolve your problem. Reopening it.

@Abacn
Contributor

Abacn commented Feb 19, 2025

setDefaultPipelineOptions() is called from inside SerializablePipelineOptions.

This class is used by the Spark/Flink/Samza/Jet runners. One fix could be to switch SerializablePipelineOptions to registerFileSystemsOnce().

@Abacn
Contributor

Abacn commented Feb 19, 2025

This issue is the same as #33965. SerializablePipelineOptions shouldn't call FileSystems.setDefaultPipelineOptions.

@JozoVilcek
Contributor Author

Will my PR #34007 fix the issue you have? I closed this because I thought #34007 (comment) would resolve your problem. Reopening it.

@liferoad Yes, something like that would fix the problem.

This class is used by the Spark/Flink/Samza/Jet runners. One fix could be to switch SerializablePipelineOptions to registerFileSystemsOnce().

@Abacn That fix works for my specific use case, but the underlying issue remains for anyone using setDefaultPipelineOptions(). The root cause is that the method does not update the revision and the scheme map atomically, which makes it susceptible to race conditions in a multi-threaded environment.
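One way to close that window, sketched under the same simplified model and not as Beam's actual fix, is to publish the revision and the scheme map as a single immutable snapshot behind one AtomicReference, so a reader can never observe one without the other. `AtomicRegistry`, `Snapshot`, `resolveConcurrently`, and the scheme names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Beam's source: the revision and the scheme map live in ONE snapshot.
final class AtomicRegistry {
  private static final class Snapshot {
    final long optionsId;
    final Map<String, String> schemes;

    Snapshot(long optionsId, Map<String, String> schemes) {
      this.optionsId = optionsId;
      this.schemes = Map.copyOf(schemes); // immutable and fully built before publication
    }
  }

  private final AtomicReference<Snapshot> state =
      new AtomicReference<>(new Snapshot(-1L, Map.of("file", "LocalFileSystem")));

  void init(long optionsId, Map<String, String> discoveredSchemes) {
    while (true) {
      Snapshot current = state.get();
      if (current.optionsId == optionsId) {
        return; // a snapshot carrying this id always holds its complete map
      }
      // Build the next snapshot first, then swap revision + map in with a single CAS.
      if (state.compareAndSet(current, new Snapshot(optionsId, discoveredSchemes))) {
        return;
      }
    }
  }

  String lookup(String scheme) {
    String fs = state.get().schemes.get(scheme);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fs;
  }

  /** Starts n threads that each call init and then lookup("hdfs"); returns how many succeeded. */
  static int resolveConcurrently(int n) {
    AtomicRegistry registry = new AtomicRegistry();
    Map<String, String> discovered = Map.of("file", "LocalFileSystem", "hdfs", "HadoopFileSystem");
    AtomicInteger succeeded = new AtomicInteger();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      Thread t = new Thread(() -> {
        registry.init(42L, discovered);
        registry.lookup("hdfs"); // would throw if the map lagged behind the revision
        succeeded.incrementAndGet();
      });
      threads.add(t);
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return succeeded.get();
  }

  public static void main(String[] args) {
    System.out.println(resolveConcurrently(8) + " of 8 threads resolved hdfs");
  }
}
```

Note that a call with a different options id still replaces the whole snapshot, so this removes the partial-initialisation race but not the overwrite problem between pipelines sharing a JVM.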

@Abacn
Contributor

Abacn commented Feb 19, 2025

the method does not update the revision and the scheme map atomically,

setDefaultPipelineOptions() is supposed to be called once per job, on the worker, at the beginning of pipeline execution, because it initializes FileSystems with the given PipelineOptions.

A second invocation overwrites the PipelineOptions held by FileSystems. If the first pipeline is still using them, it ends up in an inconsistent state where its FileSystems configuration has been overridden by another caller.
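A minimal, hypothetical illustration of that overwrite, reduced to a single process-wide slot standing in for the options FileSystems keeps in static state. `SharedDefaults` and the option strings are made up; the point is only that the last writer wins for every pipeline in the JVM.

```java
// Hypothetical illustration, not Beam's source: one process-wide slot, last writer wins.
final class SharedDefaults {
  // Stand-in for the options FileSystems keeps in static state.
  private static volatile String defaultOptions;

  static void setDefault(String options) {
    defaultOptions = options;
  }

  static String current() {
    return defaultOptions;
  }

  public static void main(String[] args) {
    setDefault("pipeline-A options");  // pipeline A initialises FileSystems
    String beforeB = current();
    setDefault("pipeline-B options");  // a second invocation in the same JVM
    String afterB = current();         // pipeline A now sees B's configuration
    System.out.println(beforeB + " -> " + afterB);
  }
}
```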

However, it is also called in places where it really shouldn't be, to make Beam FileSystems usable outside pipeline execution, and the SerializablePipelineOptions constructor is one of them.

So I think the proper fix is to eliminate setDefaultPipelineOptions and switch to registerFileSystemsOnce.
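A register-once approach could look roughly like the sketch below. It is a hypothetical model, not the implementation of Beam's registerFileSystemsOnce(); `RegisterOnce`, the `Supplier`-based discovery hook, and the scheme names are assumptions. The scheme map is built completely inside a lock and then published with a single volatile write, so any caller that sees a non-null map sees the whole map, and later calls are no-ops. In a real setup a single instance would be held statically.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical register-once sketch; not the implementation of Beam's registerFileSystemsOnce().
final class RegisterOnce {
  private final Object lock = new Object();
  // Null until fully built; the single volatile write below publishes the complete map.
  private volatile Map<String, String> schemes;

  void registerOnce(Supplier<Map<String, String>> discover) {
    if (schemes != null) {
      return; // fast path: already registered, and the map is complete
    }
    synchronized (lock) {
      if (schemes == null) {
        schemes = Map.copyOf(discover.get()); // build first, publish last
      }
    }
  }

  String lookup(String scheme) {
    Map<String, String> current = schemes;
    if (current == null) {
      throw new IllegalStateException("File systems have not been registered");
    }
    String fs = current.get(scheme);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + scheme);
    }
    return fs;
  }

  public static void main(String[] args) {
    RegisterOnce registry = new RegisterOnce(); // in practice, one process-wide instance
    AtomicInteger discoveries = new AtomicInteger();
    Supplier<Map<String, String>> discover = () -> {
      discoveries.incrementAndGet();
      return Map.of("file", "LocalFileSystem", "hdfs", "HadoopFileSystem");
    };
    registry.registerOnce(discover);
    registry.registerOnce(discover); // no-op: the first registration wins
    System.out.println(registry.lookup("hdfs") + ", discovered " + discoveries.get() + " time(s)");
  }
}
```

The design choice here is that the first registration wins: a later caller with different options does not replace the map, which avoids the overwrite described above at the cost of ignoring later options.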
