Skip to content

Commit

Permalink
[format] recover HadoopFileIO
Browse files Browse the repository at this point in the history
  • Loading branch information
ranxianglei authored and ranxianglei.rxl committed Nov 12, 2024
1 parent bbdd316 commit 10ef09c
Showing 1 changed file with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ReflectionUtils;

import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -39,6 +40,9 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hadoop {@link FileIO}. */
Expand All @@ -48,6 +52,8 @@ public class HadoopFileIO implements FileIO {

protected SerializableConfiguration hadoopConf;

protected transient volatile Map<Pair<String, String>, FileSystem> fsMap;

@VisibleForTesting
public void setFileSystem(Path path, FileSystem fs) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down Expand Up @@ -143,7 +149,26 @@ private FileSystem getFileSystem(
org.apache.hadoop.fs.Path path,
FunctionWithException<org.apache.hadoop.fs.Path, FileSystem, IOException> creator)
throws IOException {
return creator.apply(path);
if (fsMap == null) {
synchronized (this) {
if (fsMap == null) {
fsMap = new ConcurrentHashMap<>();
}
}
}

Map<Pair<String, String>, FileSystem> map = fsMap;

URI uri = path.toUri();
String scheme = uri.getScheme();
String authority = uri.getAuthority();
Pair<String, String> key = Pair.of(scheme, authority);
FileSystem fs = map.get(key);
if (fs == null) {
fs = creator.apply(path);
map.put(key, fs);
}
return fs;
}

protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
Expand Down

0 comments on commit 10ef09c

Please sign in to comment.