diff --git a/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java b/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java index 7e99c1ed51d..03c79a8bae0 100644 --- a/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java +++ b/core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java @@ -1,21 +1,24 @@ package io.kestra.core.runners; import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.PathMatcherPredicate; import org.apache.commons.io.FileUtils; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; +import java.io.InputStream; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.PathMatcher; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Predicate; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; /** * Default implementation of the {@link WorkingDir}. @@ -128,7 +131,7 @@ public Path createTempFile(final byte[] content, final String extension) throws **/ @Override public Path createFile(String filename) throws IOException { - return createFile(filename, null); + return createFile(filename, (InputStream) null); } /** @@ -136,18 +139,54 @@ public Path createFile(String filename) throws IOException { **/ @Override public Path createFile(String filename, byte[] content) throws IOException { + return createFile(filename, content == null ? null : new ByteArrayInputStream(content)); + } + + /** + * {@inheritDoc} + **/ + @Override + public Path createFile(String filename, InputStream content) throws IOException { if (filename == null || filename.isBlank()) { throw new IllegalArgumentException("Cannot create a working directory file with a null or empty name"); } Path newFilePath = this.resolve(Path.of(filename)); Files.createDirectories(newFilePath.getParent()); - Path file = Files.createFile(newFilePath); + + Files.createFile(newFilePath); if (content != null) { - Files.write(file, content); + try (content) { + Files.copy(content, newFilePath, REPLACE_EXISTING); + } } - return file; + return newFilePath; + } + + /** + * {@inheritDoc} + **/ + @Override + public Path putFile(Path path, InputStream inputStream) throws IOException { + if (path == null) { + throw new IllegalArgumentException("Cannot create a working directory file with a null path"); + } + if (inputStream == null) { + throw new IllegalArgumentException("Cannot create a working directory file with an empty inputStream"); + } + Path newFilePath = this.resolve(path); + Files.createDirectories(newFilePath.getParent()); + + if (Files.notExists(newFilePath)) { + Files.createFile(newFilePath); + } + + try (inputStream) { + Files.copy(inputStream, newFilePath, REPLACE_EXISTING); + } + + return newFilePath; } /** @@ -158,7 +197,7 @@ public List findAllFilesMatching(final List patterns) throws IOExc if (patterns == null || patterns.isEmpty()) { return Collections.emptyList(); } - MatcherFileVisitor visitor = new MatcherFileVisitor(path(), patterns); + MatcherFileVisitor visitor = new MatcherFileVisitor(PathMatcherPredicate.matches(path(), patterns)); Files.walkFileTree(path(), visitor); return visitor.getMatchedFiles(); } @@ -175,23 +214,16 @@ public void cleanup() throws IOException { private static class MatcherFileVisitor extends SimpleFileVisitor { - private static final String SYNTAX_GLOB = "glob:"; - private static final String SYNTAX_REGEX = "regex:"; - - private final List matchers; + private final Predicate predicate; private final List matchedFiles = new ArrayList<>(); - public MatcherFileVisitor(final Path basePath, final List patterns) { - FileSystem fs = FileSystems.getDefault(); - this.matchers = patterns.stream() - .map(pattern -> { - var syntaxAndPattern = isPrefixWithSyntax(pattern) ? pattern : SYNTAX_GLOB + basePath + addLeadingSlash(pattern); - return fs.getPathMatcher(syntaxAndPattern); - }) - .toList(); + public MatcherFileVisitor(final Predicate predicate) { + this.predicate = predicate; } - /** {@inheritDoc} **/ + /** + * {@inheritDoc} + **/ @Override public FileVisitResult visitFile(final Path path, final BasicFileAttributes basicFileAttributes) { if (!basicFileAttributes.isRegularFile()) { @@ -199,7 +231,7 @@ public FileVisitResult visitFile(final Path path, final BasicFileAttributes basi return FileVisitResult.CONTINUE; } - if (matchers.stream().anyMatch(pathMatcher -> pathMatcher.matches(path))) { + if (predicate.test(path)) { matchedFiles.add(path); } @@ -209,13 +241,5 @@ public FileVisitResult visitFile(final Path path, final BasicFileAttributes basi public List getMatchedFiles() { return matchedFiles; } - - private static String addLeadingSlash(final String path) { - return path.startsWith("/") ? path : "/" + path; - } - - private static boolean isPrefixWithSyntax(final String pattern) { - return pattern.startsWith(SYNTAX_REGEX) | pattern.startsWith(SYNTAX_GLOB); - } } } diff --git a/core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java b/core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java index 196c14f87b3..e6d08965b44 100644 --- a/core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java +++ b/core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java @@ -1,70 +1,53 @@ package io.kestra.core.runners; import io.kestra.core.models.tasks.NamespaceFiles; +import io.kestra.core.storages.InternalNamespace; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.NamespaceFile; import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageInterface; +import io.kestra.core.utils.Rethrow; import io.micronaut.core.annotation.Nullable; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.nio.file.FileSystems; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; - -import static io.kestra.core.utils.Rethrow.throwConsumer; -import static io.kestra.core.utils.Rethrow.throwPredicate; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +@Deprecated @Singleton -@Slf4j public class NamespaceFilesService { + + private static final Logger log = LoggerFactory.getLogger(NamespaceFilesService.class); + + private final StorageInterface storageInterface; + @Inject - private StorageInterface storageInterface; + public NamespaceFilesService(final StorageInterface storageInterface) { + this.storageInterface = storageInterface; + } + @Deprecated public List inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception { if (!namespaceFiles.getEnabled()) { return Collections.emptyList(); } - List list = recursiveList(tenantId, namespace, null); - - list = list - .stream() - .filter(throwPredicate(f -> { - var file = f.getPath(); - - if (namespaceFiles.getExclude() != null) { - boolean b = match(runContext.render(namespaceFiles.getExclude()), file); - - if (b) { - return false; - } - } - - if (namespaceFiles.getInclude() != null) { - boolean b = match(namespaceFiles.getInclude(), file); - - if (!b) { - return false; - } - } - - return true; - })) - .collect(Collectors.toList()); + List matchedNamespaceFiles = namespaceFor(tenantId, namespace) + .findAllFilesMatching(namespaceFiles.getInclude(), namespaceFiles.getExclude()); - copy(tenantId, namespace, basePath, list); - - return list; + matchedNamespaceFiles.forEach(Rethrow.throwConsumer(namespaceFile -> { + InputStream content = storageInterface.get(tenantId, namespaceFile.uri()); + runContext.workingDir().putFile(namespaceFile.path(), content); + })); + return matchedNamespaceFiles.stream().map(NamespaceFile::path).map(Path::toUri).toList(); } public URI uri(String namespace, @Nullable URI path) { @@ -75,61 +58,41 @@ public URI uri(String namespace, @Nullable URI path) { } public List recursiveList(String tenantId, String namespace, @Nullable URI path) throws IOException { - return this.recursiveList(tenantId, namespace, path, false); + return recursiveList(tenantId, namespace, path, false); } public List recursiveList(String tenantId, String namespace, @Nullable URI path, boolean includeDirectoryEntries) throws IOException { - return storageInterface.allByPrefix(tenantId, URI.create(this.uri(namespace, path) + "/"), includeDirectoryEntries) - // We get rid of Kestra schema as we want to work on a folder-like basis - .stream().map(URI::getPath) - .map(URI::create) - .map(uri -> URI.create("/" + this.uri(namespace, null).relativize(uri))) + return namespaceFor(tenantId, namespace) + .all(path.getPath(), includeDirectoryEntries) + .stream() + .map(NamespaceFile::path) + .map(Path::toUri) .toList(); } + private InternalNamespace namespaceFor(String tenantId, String namespace) { + return new InternalNamespace(log, tenantId, namespace, storageInterface); + } + public boolean delete(String tenantId, String namespace, URI path) throws IOException { - return storageInterface.delete(tenantId, this.uri(namespace, path)); + return namespaceFor(tenantId, namespace).delete(Path.of(path)); } public URI createFile(String tenantId, String namespace, URI path, InputStream inputStream) throws IOException { - return storageInterface.put(tenantId, this.uri(namespace, path), inputStream); + return namespaceFor(tenantId, namespace).putFile(Path.of(path), inputStream, Namespace.Conflicts.OVERWRITE) + .path() + .toUri(); } public URI createDirectory(String tenantId, String namespace, URI path) throws IOException { - return storageInterface.createDirectory(tenantId, this.uri(namespace, path)); - } - - private static boolean match(List patterns, String file) { - return patterns - .stream() - .anyMatch(s -> FileSystems - .getDefault() - .getPathMatcher("glob:" + (s.matches("\\w+[\\s\\S]*") ? "**/" + s : s)) - .matches(Paths.get(file)) - ); + return namespaceFor(tenantId, namespace).createDirectory(Path.of(path.getPath())); } public InputStream content(String tenantId, String namespace, URI path) throws IOException { - return storageInterface.get(tenantId, uri(namespace, path)); + return namespaceFor(tenantId, namespace).getFileContent(Path.of(path)); } public static URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) { - return URI.create("kestra://" + StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/")); - } - - private void copy(String tenantId, String namespace, Path basePath, List files) throws IOException { - files - .forEach(throwConsumer(f -> { - Path destination = Paths.get(basePath.toString(), f.getPath()); - - if (!destination.getParent().toFile().exists()) { - //noinspection ResultOfMethodCallIgnored - destination.getParent().toFile().mkdirs(); - } - - try (InputStream inputStream = this.content(tenantId, namespace, f)) { - Files.copy(inputStream, destination, REPLACE_EXISTING); - } - })); + return NamespaceFile.of(namespace, relativePath).uri(); } } diff --git a/core/src/main/java/io/kestra/core/runners/RunContextFactory.java b/core/src/main/java/io/kestra/core/runners/RunContextFactory.java index 29addc946ab..ea7bca44ade 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContextFactory.java +++ b/core/src/main/java/io/kestra/core/runners/RunContextFactory.java @@ -8,6 +8,7 @@ import io.kestra.core.models.tasks.Task; import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.plugins.PluginConfigurations; +import io.kestra.core.services.FlowService; import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageInterface; @@ -34,6 +35,9 @@ public class RunContextFactory { @Inject protected StorageInterface storageInterface; + @Inject + protected FlowService flowService; + @Inject protected MetricRegistry metricRegistry; @@ -62,7 +66,7 @@ public RunContext of(Flow flow, Execution execution) { .withLogger(runContextLogger) // Execution .withPluginConfiguration(Map.of()) - .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface)) + .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService)) .withVariables(newRunVariablesBuilder() .withFlow(flow) .withExecution(execution) @@ -83,7 +87,7 @@ public RunContext of(Flow flow, Task task, Execution execution, TaskRun taskRun, .withLogger(runContextLogger) // Task .withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())) - .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface)) + .withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService)) .withVariables(newRunVariablesBuilder() .withFlow(flow) .withTask(task) @@ -122,8 +126,15 @@ public RunContext of(final Map variables) { public URI getContextStorageURI() { return URI.create(""); } + + @Override + public String getTenantId() { + var tenantId = ((Map)variables.getOrDefault("flow", Map.of())).get("tenantId"); + return Optional.ofNullable(tenantId).map(Object::toString).orElse(null); + } }, - storageInterface + storageInterface, + flowService )) .withVariables(variables) .build(); diff --git a/core/src/main/java/io/kestra/core/runners/RunContextInitializer.java b/core/src/main/java/io/kestra/core/runners/RunContextInitializer.java index 879ddcc65f4..f5d8ac606f5 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContextInitializer.java +++ b/core/src/main/java/io/kestra/core/runners/RunContextInitializer.java @@ -8,6 +8,7 @@ import io.kestra.core.models.triggers.AbstractTrigger; import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.plugins.PluginConfigurations; +import io.kestra.core.services.FlowService; import io.kestra.core.storages.InternalStorage; import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageInterface; @@ -42,6 +43,9 @@ public class RunContextInitializer { @Inject protected StorageInterface storageInterface; + @Inject + protected FlowService flowService; + @Value("${kestra.encryption.secret-key}") protected Optional secretKey; @@ -116,7 +120,7 @@ private DefaultRunContext forWorker(final DefaultRunContext runContext, runContext.setVariables(enrichedVariables); runContext.setPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass())); - runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface)); + runContext.setStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService)); runContext.setLogger(runContextLogger); return runContext; @@ -201,7 +205,8 @@ public DefaultRunContext forScheduler(final DefaultRunContext runContext, final InternalStorage storage = new InternalStorage( runContextLogger.logger(), context, - storageInterface + storageInterface, + flowService ); runContext.setLogger(runContextLogger); diff --git a/core/src/main/java/io/kestra/core/runners/WorkingDir.java b/core/src/main/java/io/kestra/core/runners/WorkingDir.java index 8807a1addd3..1cef21383a3 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkingDir.java +++ b/core/src/main/java/io/kestra/core/runners/WorkingDir.java @@ -1,13 +1,19 @@ package io.kestra.core.runners; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Path; import java.util.List; /** - * Service interface for accessing a dedicated working directory. + * Service interface for accessing a specific working directory. *

- * A working directory is a local and temporary directory associated to a task-run context. + * A working directory (or working-dir) is a local and temporary directory attached + * to the execution of task. + *

+ * For example, a task can use the working directory to temporarily + * buffered data on local disk before storing them on the Kestra's internal storage. * * @see RunContext#workingDir() */ @@ -81,16 +87,21 @@ public interface WorkingDir { Path createTempFile(byte[] content, String extension) throws IOException; /** - * Creates a new empty file in the working directory with the given name. + * Creates a new empty file in the working directory with the given filename. + *

+ * This method will throw an exception if a file already exists for the given filename. * * @param filename The file name. - * @throws IOException if an error happen while creating the file. - * @throws IllegalArgumentException if the given filename is {@code null} or empty. + * @throws IOException if an error happen while creating the file. + * @throws FileAlreadyExistsException – If a file of that name already exists (optional specific + * @throws IllegalArgumentException if the given filename is {@code null} or empty. */ Path createFile(String filename) throws IOException; /** - * Creates a new empty file in the working directory with the given name and content. + * Creates a new file in the working directory with the given name and content. + *

+ * This method will throw an exception if a file already exists for the given filename. * * @param filename The file name. * @param content The file content - may be {@code null}. @@ -99,6 +110,30 @@ public interface WorkingDir { */ Path createFile(String filename, byte[] content) throws IOException; + /** + * Creates a new file in the working directory with the given name and content. + *

+ * This method will throw an exception if a file already exists for the given filename. + * + * @param filename The file name. + * @param content The file content - may be {@code null}. + * @throws IOException if an error happen while creating the file. + * @throws IllegalArgumentException if the given filename is {@code null} or empty. + */ + Path createFile(String filename, InputStream content) throws IOException; + + /** + * Creates a new file or replaces an existing one with the given content. + *

+ * This method will throw an exception if a file already exists for the given filename. + * + * @param path The path of the file. + * @param content The file content - may be {@code null}. + * @throws IOException if an error happen while creating the file. + * @throws IllegalArgumentException if the given path is {@code null}. + */ + Path putFile(Path path, InputStream content) throws IOException; + /** * Finds all files in the working directory that matches one of the given patterns. * diff --git a/core/src/main/java/io/kestra/core/storages/InternalNamespace.java b/core/src/main/java/io/kestra/core/storages/InternalNamespace.java new file mode 100644 index 00000000000..4bf7f33dcc0 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/InternalNamespace.java @@ -0,0 +1,203 @@ +package io.kestra.core.storages; + +import jakarta.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * The default {@link Namespace} implementation. + * This class acts as a facade to the {@link StorageInterface} for manipulating namespace files. + * + * @see Storage#namespace() + * @see Storage#namespace(String) + */ +public class InternalNamespace implements Namespace { + + private static final Logger log = LoggerFactory.getLogger(InternalNamespace.class); + + private final String namespace; + private final String tenant; + private final StorageInterface storage; + private final Logger logger; + + /** + * Creates a new {@link InternalNamespace} instance. + * + * @param namespace The namespace + * @param storage The storage. + */ + public InternalNamespace(final String namespace, final StorageInterface storage) { + this(log, null, namespace, storage); + } + + /** + * Creates a new {@link InternalNamespace} instance. + * + * @param logger The logger to be used by this class. + * @param namespace The namespace + * @param tenant The tenant. + * @param storage The storage. + */ + public InternalNamespace(final Logger logger, @Nullable final String tenant, final String namespace, final StorageInterface storage) { + this.logger = Objects.requireNonNull(logger, "logger cannot be null"); + this.namespace = Objects.requireNonNull(namespace, "namespace cannot be null"); + this.storage = Objects.requireNonNull(storage, "storage cannot be null"); + this.tenant = tenant; + } + + /** + * {@inheritDoc} + **/ + @Override + public String namespace() { + return namespace; + } + + /** + * {@inheritDoc} + **/ + @Override + public List all() throws IOException { + return all(false); + } + + /** + * {@inheritDoc} + **/ + @Override + public List all(final boolean includeDirectories) throws IOException { + return all(null, includeDirectories); + } + + /** + * {@inheritDoc} + **/ + @Override + public List all(final String prefix, final boolean includeDirectories) throws IOException { + URI namespacePrefix = URI.create(NamespaceFile.of(namespace, Optional.ofNullable(prefix).map(Path::of).orElse(null)).storagePath() + "/"); + return storage.allByPrefix(tenant, namespacePrefix, includeDirectories) + .stream() + .map(uri -> new NamespaceFile(relativize(uri), uri, namespace)) + .toList(); + } + + /** + * {@inheritDoc} + **/ + @Override + public NamespaceFile get(final Path path) { + return NamespaceFile.of(namespace, path); + } + + public Path relativize(final URI uri) { + return NamespaceFile.of(namespace) + .storagePath() + .relativize(Path.of(uri.getPath())); + } + + /** + * {@inheritDoc} + **/ + @Override + public List findAllFilesMatching(final Predicate predicate) throws IOException { + return all().stream().filter(it -> predicate.test(it.path(true))).toList(); + } + + /** + * {@inheritDoc} + **/ + @Override + public InputStream getFileContent(final Path path) throws IOException { + Path namespaceFilePath = NamespaceFile.of(namespace, path).storagePath(); + return storage.get(tenant, namespaceFilePath.toUri()); + } + + /** + * {@inheritDoc} + **/ + @Override + public NamespaceFile putFile(final Path path, final InputStream content, final Conflicts onAlreadyExist) throws IOException { + Path namespaceFilesPrefix = NamespaceFile.of(namespace, path).storagePath(); + final boolean exists = storage.exists(tenant, namespaceFilesPrefix.toUri()); + + return switch (onAlreadyExist) { + case OVERWRITE -> { + URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content); + NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace); + if (exists) { + logger.debug(String.format( + "File '%s' overwritten into namespace '%s'.", + path, + namespace + )); + } else { + logger.debug(String.format( + "File '%s' added to namespace '%s'.", + path, + namespace + )); + } + yield namespaceFile; + } + case ERROR -> { + if (!exists) { + URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content); + yield new NamespaceFile(relativize(uri), uri, namespace); + } else { + throw new IOException(String.format( + "File '%s' already exists in namespace '%s' and conflict is set to %s", + path, + namespace, + Conflicts.ERROR + )); + } + } + case SKIP -> { + if (!exists) { + URI uri = storage.put(tenant, namespaceFilesPrefix.toUri(), content); + NamespaceFile namespaceFile = new NamespaceFile(relativize(uri), uri, namespace); + logger.debug(String.format( + "File '%s' added to namespace '%s'.", + path, + namespace + )); + yield namespaceFile; + } else { + logger.debug(String.format( + "File '%s' already exists in namespace '%s' and conflict is set to %s. Skipping.", + path, + namespace, + Conflicts.SKIP + )); + URI uri = URI.create(StorageContext.KESTRA_PROTOCOL + namespaceFilesPrefix); + yield new NamespaceFile(relativize(uri), uri, namespace); + } + } + }; + } + + /** + * {@inheritDoc} + **/ + @Override + public URI createDirectory(Path path) throws IOException { + return storage.createDirectory(tenant, NamespaceFile.of(namespace, path).storagePath().toUri()); + } + + /** + * {@inheritDoc} + **/ + @Override + public boolean delete(Path path) throws IOException { + return storage.delete(tenant, NamespaceFile.of(namespace, path).storagePath().toUri()); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/InternalStorage.java b/core/src/main/java/io/kestra/core/storages/InternalStorage.java index 2ad7f517f31..b7477906173 100644 --- a/core/src/main/java/io/kestra/core/storages/InternalStorage.java +++ b/core/src/main/java/io/kestra/core/storages/InternalStorage.java @@ -1,7 +1,7 @@ package io.kestra.core.storages; +import io.kestra.core.services.FlowService; import jakarta.annotation.Nullable; -import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +30,7 @@ public class InternalStorage implements Storage { private final Logger logger; private final StorageContext context; private final StorageInterface storage; + private final FlowService flowService; /** * Creates a new {@link InternalStorage} instance. @@ -38,7 +39,7 @@ public class InternalStorage implements Storage { * @param storage The storage to delegate operations. */ public InternalStorage(StorageContext context, StorageInterface storage) { - this(LOG, context, storage); + this(LOG, context, storage, null); } /** @@ -48,10 +49,35 @@ public InternalStorage(StorageContext context, StorageInterface storage) { * @param context The storage context. * @param storage The storage to delegate operations. */ - public InternalStorage(Logger logger, StorageContext context, StorageInterface storage) { + public InternalStorage(Logger logger, StorageContext context, StorageInterface storage, FlowService flowService) { this.logger = logger; this.context = context; this.storage = storage; + this.flowService = flowService; + } + + /** + * {@inheritDoc} + **/ + @Override + public Namespace namespace() { + return new InternalNamespace(logger, context.getTenantId(), context.getNamespace(), storage); + } + + /** + * {@inheritDoc} + **/ + @Override + public Namespace namespace(String namespace) { + boolean isExternalNamespace = !namespace.equals(context.getNamespace()); + // Checks whether the contextual namespace is allowed to access the passed namespace. + if (isExternalNamespace && flowService != null) { + flowService.checkAllowedNamespace( + context.getTenantId(), namespace, // requested Tenant/Namespace + context.getTenantId(), context.getNamespace() // from Tenant/Namespace + ); + } + return new InternalNamespace(logger, context.getTenantId(), namespace, storage); } /** diff --git a/core/src/main/java/io/kestra/core/storages/Namespace.java b/core/src/main/java/io/kestra/core/storages/Namespace.java new file mode 100644 index 00000000000..fd566bcf7ee --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/Namespace.java @@ -0,0 +1,126 @@ +package io.kestra.core.storages; + +import io.kestra.core.utils.PathMatcherPredicate; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; +import java.util.List; +import java.util.function.Predicate; + +/** + * Service interface for accessing the files attached to a namespace (a.k.a., Namespace Files). + */ +public interface Namespace { + + /** + * Gets the current namespace. + * + * @return the current namespace. + */ + String namespace(); + + /** + * Gets the URIs of all namespace files for the contextual namespace. + * + * @return The list of {@link URI}. + */ + List all() throws IOException; + + /** + * Gets the URIs of all namespace files for the contextual namespace. + * + * @return The list of {@link URI}. + */ + List all(boolean includeDirectories) throws IOException; + + /** + * Gets the URIs of all namespace files for the current namespace. + * + * @return The list of {@link URI}. + */ + List all(String prefix, boolean includeDirectories) throws IOException; + + /** + * Gets a {@link NamespaceFile} for the given path and the current namespace. + * + * @param path the file path. + * @return a new {@link NamespaceFile} + */ + NamespaceFile get(Path path); + + /** + * Retrieves the URIs of all namespace files for the current namespace matching the given predicate. + * + * @param predicate The predicate for matching files. + * @return The list of {@link URI} for matched namespace files. + */ + List findAllFilesMatching(Predicate predicate) throws IOException; + + /** + * Retrieves the URIs of all namespace files for the current namespace matching the given predicates. + * + * @param includes A list of glob expressions specifying the files to include. + * @param excludes A list of glob expressions specifying the files to exclude. + * @return A list of {@link URI} objects representing the matched namespace files. + */ + default List findAllFilesMatching(List includes, List excludes) throws IOException { + Predicate predicate = PathMatcherPredicate.builder() + .includes(includes) + .excludes(excludes) + .build(); + return findAllFilesMatching(predicate); + } + + /** + * Retrieves the content of the namespace file at the given path. + * + * @param path the file path. + * @return the {@link InputStream}. + * @throws IllegalArgumentException if the given {@link Path} is {@code null} or invalid. + * @throws IOException if an error happens while accessing the file. + */ + InputStream getFileContent(Path path) throws IOException; + + default NamespaceFile putFile(Path path, InputStream content) throws IOException { + return putFile(path, content, Conflicts.OVERWRITE); + } + + NamespaceFile putFile(Path path, InputStream content, Conflicts onAlreadyExist) throws IOException; + + /** + * Creates a new directory for the current namespace. + * + * @param path The {@link Path} of the directory. + * @return The URI of the directory in the Kestra's internal storage. + * @throws IOException if an error happens while accessing the file. + */ + URI createDirectory(Path path) throws IOException; + + /** + * Deletes any namespaces files at the given path. + * + * @param file the {@link NamespaceFile} to be deleted. + * @throws IOException if an error happens while performing the delete operation. + */ + default boolean delete(NamespaceFile file) throws IOException { + return delete(file.path()); + } + + /** + * Deletes any namespaces files at the given path. + * + * @param path the path to be deleted. + * @return {@code true} if the file was deleted by this method; {@code false} if the file could not be deleted because it did not exist + * @throws IOException if an error happens while performing the delete operation. + */ + boolean delete(Path path) throws IOException; + + enum Conflicts { + OVERWRITE, + ERROR, + SKIP + } + +} diff --git a/core/src/main/java/io/kestra/core/storages/NamespaceFile.java b/core/src/main/java/io/kestra/core/storages/NamespaceFile.java new file mode 100644 index 00000000000..0c78a4f6b62 --- /dev/null +++ b/core/src/main/java/io/kestra/core/storages/NamespaceFile.java @@ -0,0 +1,120 @@ +package io.kestra.core.storages; + +import jakarta.annotation.Nullable; + +import java.net.URI; +import java.nio.file.Path; +import java.util.Objects; + +/** + * Represents a NamespaceFile object. + * + * @param path The path of file relative to the namespace. + * @param uri The URI of the namespace file in the Kestra's internal storage. + * @param namespace The namespace of the file. + */ +public record NamespaceFile( + Path path, + URI uri, + String namespace +) { + + /** + * Static factory method for constructing a new {@link NamespaceFile} object. + *

+ * This method is equivalent to calling {@code NamespaceFile#of(String, null)} + * + * @param namespace The namespace - cannot be {@code null}. + * @return a new {@link NamespaceFile} object + */ + public static NamespaceFile of(final String namespace) { + return of(namespace, (Path) null); + } + + /** + * Static factory method for constructing a new {@link NamespaceFile} object. + * + * @param uri The path of file relative to the namespace - cannot be {@code null}. + * @param namespace The namespace - cannot be {@code null}. + * @return a new {@link NamespaceFile} object + */ + public static NamespaceFile of(final String namespace, @Nullable final URI uri) { + if (uri == null) { + return of(namespace, (Path) null); + } + + Path path = Path.of(uri.getPath()); + if (uri.getScheme() != null) { + if (!uri.getScheme().equalsIgnoreCase("kestra")) { + throw new IllegalArgumentException(String.format( + "Invalid Kestra URI scheme. Expected 'kestra', but was '%s'.", uri.getScheme() + )); + } + if (!uri.getPath().startsWith(StorageContext.namespaceFilePrefix(namespace))) { + throw new IllegalArgumentException(String.format( + "Invalid Kestra URI. Expected prefix for namespace '%s', but was %s.", namespace, uri) + ); + } + return of(namespace, Path.of(StorageContext.namespaceFilePrefix(namespace)).relativize(path)); + } + return of(namespace, path); + } + + /** + * Static factory method for constructing a new {@link NamespaceFile} object. + * + * @param path The path of file relative to the namespace. + * @param namespace The namespace - cannot be {@code null}. + * @return a new {@link NamespaceFile} object + */ + public static NamespaceFile of(final String namespace, @Nullable final Path path) { + Objects.requireNonNull(namespace, "namespace cannot be null"); + if (path == null || path.equals(Path.of("/"))) { + return new NamespaceFile( + Path.of(""), + URI.create(StorageContext.KESTRA_PROTOCOL + StorageContext.namespaceFilePrefix(namespace) + "/"), + namespace + ); + } + + Path namespacePrefixPath = Path.of(StorageContext.namespaceFilePrefix(namespace)); + Path filePath = path.normalize(); + if (filePath.isAbsolute()) { + filePath = filePath.getRoot().relativize(filePath); + } + return new NamespaceFile( + filePath, + URI.create(StorageContext.KESTRA_PROTOCOL + namespacePrefixPath.resolve(filePath)), + namespace + ); + } + + /** + * Returns The path of file relative to the namespace. + * + * @param withLeadingSlash specify whether to remove leading slash from the returned path. + * @return The path. + */ + public Path path(boolean withLeadingSlash) { + final String strPath = path.toString(); + if (!withLeadingSlash) { + if (strPath.startsWith("/")) { + return Path.of(strPath.substring(1)); + } + } else { + if (!strPath.startsWith("/")) { + return Path.of("/").resolve(path); + } + } + return path; + } + + /** + * Get the full storage path of this namespace file. + * + * @return The {@link Path}. + */ + public Path storagePath() { + return Path.of(uri().getPath()); + } +} diff --git a/core/src/main/java/io/kestra/core/storages/Storage.java b/core/src/main/java/io/kestra/core/storages/Storage.java index 5c1cfe96a8f..cbdf20e6981 100644 --- a/core/src/main/java/io/kestra/core/storages/Storage.java +++ b/core/src/main/java/io/kestra/core/storages/Storage.java @@ -15,6 +15,20 @@ */ public interface Storage { + /** + * Gets access to the namespace files for the contextual namespace. + * + * @return The {@link Namespace}. + */ + Namespace namespace(); + + /** + * Gets access to the namespace files for the given namespace. + * + * @return The {@link Namespace}. + */ + Namespace namespace(String namespace); + /** * Checks whether the given URI points to an exiting file/object in the internal storage. * diff --git a/core/src/main/java/io/kestra/core/utils/FileUtils.java b/core/src/main/java/io/kestra/core/utils/FileUtils.java index edfb8556d59..1b0f2c48fec 100644 --- a/core/src/main/java/io/kestra/core/utils/FileUtils.java +++ b/core/src/main/java/io/kestra/core/utils/FileUtils.java @@ -4,8 +4,12 @@ import org.apache.commons.lang3.StringUtils; import java.net.URI; +import java.util.Optional; -public class FileUtils { +/** + * Utility methods for manipulating files. + */ +public final class FileUtils { /** * Get the file extension prefixed the '.' from the given file URI. @@ -28,4 +32,30 @@ public static String getExtension(final String file) { String extension = FilenameUtils.getExtension(file); return StringUtils.isEmpty(extension) ? null : "." + extension; } + + /** + * Creates a new {@link URI} from the given string path. + * + * @param path the string path - may be {@link null}. + * @return an optional URI, or {@link Optional#empty()} if the given path represent an invalid URI. + */ + public static Optional getURI(final String path) { + if (path == null) return Optional.empty(); + try { + return Optional.of(URI.create(path)); + } catch (IllegalArgumentException e) { + return Optional.empty(); + } + } + + /** + * Extracts the file name from the given URI. + * + * @param uri the file URI. + * @return the string file name. + */ + public static String getFileName(final URI uri) { + String path = uri.getPath(); + return path.substring(path.lastIndexOf('/') + 1); + } } diff --git a/core/src/main/java/io/kestra/core/utils/PathMatcherPredicate.java b/core/src/main/java/io/kestra/core/utils/PathMatcherPredicate.java new file mode 100644 index 00000000000..7c17fd0b73a --- /dev/null +++ b/core/src/main/java/io/kestra/core/utils/PathMatcherPredicate.java @@ -0,0 +1,134 @@ +package io.kestra.core.utils; + +import jakarta.annotation.Nullable; + +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; + +import static java.util.function.Predicate.not; + +/** + * Simple {@link Predicate} implementation for matching {@link Path paths} + * based on given glob or regex expressions. + */ +public final class PathMatcherPredicate implements Predicate { + + private static final String SYNTAX_GLOB = "glob:"; + private static final String SYNTAX_REGEX = "regex:"; + + /** + * Static factory method for constructing a new {@link PathMatcherPredicate} instance. + * + * @param patterns a list of glob or regex expressions. + * @return a new {@link PathMatcherPredicate}. + */ + public static PathMatcherPredicate matches(final List patterns) { + return new PathMatcherPredicate(null, patterns); + } + + /** + * Static factory method for constructing a new {@link PathMatcherPredicate} instance. + * + * @param basePath a base path to chroot all patterns - may be {@code null}. + * @param patterns a list of glob or regex expressions. + * @return a new {@link PathMatcherPredicate}. + */ + public static PathMatcherPredicate matches(final Path basePath, final List patterns) { + return new PathMatcherPredicate(basePath, patterns); + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder class for constructing new {@link PathMatcherPredicate}. + */ + public static class Builder { + private List includes = List.of(); + private List excludes = List.of(); + + public Builder includes(final List includes) { + this.includes = Optional.ofNullable(includes).orElse(this.includes); + return this; + } + + public Builder excludes(final List excludes) { + this.excludes = Optional.ofNullable(excludes).orElse(this.excludes); + return this; + } + + public Predicate build() { + if (!this.includes.isEmpty() && !this.excludes.isEmpty()) + return matches(includes).and(not(matches(this.excludes))); + + if (!this.includes.isEmpty()) + return matches(includes); + + if (!this.excludes.isEmpty()) + return not(matches(this.excludes)); + + return path -> true; + } + } + + private final List syntaxAndPatterns; + private final List matchers; + + /** + * Creates a new {@link PathMatcherPredicate} instance. + * + * @param basePath a base path to chroot all patterns - may be {@code null}. + * @param patterns a list of glob or regex expressions. + */ + private PathMatcherPredicate(@Nullable final Path basePath, final List patterns) { + Objects.requireNonNull(patterns, "patterns cannot be null"); + this.syntaxAndPatterns = patterns.stream() + .map(p -> { + String syntaxAndPattern = p; + if (!isPrefixWithSyntax(p)) { + String pattern; + if (basePath != null) { + pattern = basePath + mayAddLeadingSlash(p); + } else { + pattern = mayAddRecursiveMatch(p); + } + syntaxAndPattern = SYNTAX_GLOB + pattern; + } + return syntaxAndPattern; + }) + .toList(); + FileSystem fs = FileSystems.getDefault(); + this.matchers = this.syntaxAndPatterns.stream().map(fs::getPathMatcher).toList(); + } + + private static String mayAddRecursiveMatch(final String p) { + return p.matches("\\w+[\\s\\S]*") ? "**/" + p : p; + } + + public List syntaxAndPatterns() { + return syntaxAndPatterns; + } + + /** + * {@inheritDoc} + **/ + @Override + public boolean test(Path path) { + return matchers.stream().anyMatch(p -> p.matches(path)); + } + + private static String mayAddLeadingSlash(final String path) { + return path.startsWith("/") ? path : "/" + path; + } + + public static boolean isPrefixWithSyntax(final String pattern) { + return pattern.startsWith(SYNTAX_REGEX) | pattern.startsWith(SYNTAX_GLOB); + } +} diff --git a/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java b/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java index bf32a7c0341..0fb39411dff 100644 --- a/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java +++ b/core/src/main/java/io/kestra/plugin/core/flow/WorkingDirectory.java @@ -17,12 +17,12 @@ import io.kestra.core.models.tasks.Task; import io.kestra.core.models.tasks.VoidOutput; import io.kestra.core.runners.FilesService; -import io.kestra.core.runners.DefaultRunContext; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.WorkerTask; import io.kestra.core.serializers.FileSerde; +import io.kestra.core.storages.NamespaceFile; import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.Rethrow; import io.kestra.core.validations.WorkingDirectoryTaskValidation; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; @@ -34,17 +34,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.FileVisitResult; -import java.nio.file.FileVisitor; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.PathMatcher; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -271,9 +263,14 @@ public void preExecuteTasks(RunContext runContext, TaskRun taskRun) throws Excep } } - if (this.namespaceFiles != null ) { - NamespaceFilesService namespaceFilesService = ((DefaultRunContext)runContext).getApplicationContext().getBean(NamespaceFilesService.class); - namespaceFilesService.inject(runContext, taskRun.getTenantId(), taskRun.getNamespace(), runContext.workingDir().path(), this.namespaceFiles); + if (this.namespaceFiles != null && this.namespaceFiles.getEnabled()) { + runContext.storage() + .namespace() + .findAllFilesMatching(this.namespaceFiles.getInclude(), this.namespaceFiles.getExclude()) + .forEach(Rethrow.throwConsumer(namespaceFile -> { + InputStream content = runContext.storage().getFile(namespaceFile.uri()); + runContext.workingDir().putFile(namespaceFile.path(), content); + })); } if (this.inputFiles != null) { diff --git a/core/src/main/java/io/kestra/plugin/core/namespace/DeleteFiles.java b/core/src/main/java/io/kestra/plugin/core/namespace/DeleteFiles.java index d5820b504c2..2089fea0209 100644 --- a/core/src/main/java/io/kestra/plugin/core/namespace/DeleteFiles.java +++ b/core/src/main/java/io/kestra/plugin/core/namespace/DeleteFiles.java @@ -6,10 +6,10 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.DefaultRunContext; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; -import io.kestra.core.services.FlowService; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.NamespaceFile; +import io.kestra.core.utils.PathMatcherPredicate; import io.kestra.core.utils.Rethrow; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Builder; @@ -20,14 +20,8 @@ import org.slf4j.Logger; import java.net.URI; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.PathMatcher; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import static io.kestra.core.utils.PathUtil.checkLeadingSlash; @SuperBuilder @Getter @@ -91,14 +85,9 @@ public class DeleteFiles extends Task implements RunnableTask renderedFiles; if (files instanceof String filesString) { @@ -109,18 +98,20 @@ public Output run(RunContext runContext) throws Exception { throw new IllegalArgumentException("Files must be a String or a list of String"); } - List patterns = renderedFiles.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + checkLeadingSlash(reg))).toList(); - AtomicInteger count = new AtomicInteger(); - - namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> { - if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) { - namespaceFilesService.delete(flowInfo.tenantId(), renderedNamespace, uri); - logger.debug(String.format("Deleted %s", uri)); - count.getAndIncrement(); - } - })); + List matched = namespace.findAllFilesMatching(PathMatcherPredicate.matches(renderedFiles)); + long count = matched + .stream() + .map(Rethrow.throwFunction(file -> { + if (namespace.delete(file)) { + logger.debug(String.format("Deleted %s", (file.path()))); + return true; + } + return false; + })) + .filter(Boolean::booleanValue) + .count(); - runContext.metric(Counter.of("deleted", count.get())); + runContext.metric(Counter.of("deleted", count)); return Output.builder().build(); } diff --git a/core/src/main/java/io/kestra/plugin/core/namespace/DownloadFiles.java b/core/src/main/java/io/kestra/plugin/core/namespace/DownloadFiles.java index 2e1b8287483..f0f80ce347c 100644 --- a/core/src/main/java/io/kestra/plugin/core/namespace/DownloadFiles.java +++ b/core/src/main/java/io/kestra/plugin/core/namespace/DownloadFiles.java @@ -6,10 +6,9 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.DefaultRunContext; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; -import io.kestra.core.services.FlowService; +import io.kestra.core.storages.Namespace; +import io.kestra.core.utils.PathMatcherPredicate; import io.kestra.core.utils.Rethrow; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Builder; @@ -22,14 +21,10 @@ import java.io.InputStream; import java.net.URI; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.PathMatcher; -import java.util.HashMap; +import java.util.AbstractMap; import java.util.List; import java.util.Map; - -import static io.kestra.core.utils.PathUtil.checkLeadingSlash; +import java.util.stream.Collectors; @Slf4j @SuperBuilder @@ -101,16 +96,13 @@ public class DownloadFiles extends Task implements RunnableTask renderedFiles; if (files instanceof String filesString) { @@ -121,17 +113,16 @@ public Output run(RunContext runContext) throws Exception { throw new IllegalArgumentException("The files property must be a string or a list of strings"); } - List patterns = renderedFiles.stream().map(reg -> FileSystems.getDefault().getPathMatcher("glob:" + checkLeadingSlash(reg))).toList(); - Map downloaded = new HashMap<>(); - - namespaceFilesService.recursiveList(flowInfo.tenantId(), renderedNamespace, null).forEach(Rethrow.throwConsumer(uri -> { - if (patterns.stream().anyMatch(p -> p.matches(Path.of(uri.getPath())))) { - try (InputStream inputStream = namespaceFilesService.content(flowInfo.tenantId(), renderedNamespace, uri)) { - downloaded.put(uri.getPath(), runContext.storage().putFile(inputStream, destination + uri.getPath())); + Map downloaded = namespace.findAllFilesMatching(PathMatcherPredicate.matches(renderedFiles)) + .stream() + .map(Rethrow.throwFunction(file -> { + try (InputStream is = runContext.storage().getFile(file.uri())) { + URI uri = runContext.storage().putFile(is, renderedDestination + file.path()); logger.debug(String.format("Downloaded %s", uri)); + return new AbstractMap.SimpleEntry<>(file.path(true).toString(), uri); } - } - })); + })) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); runContext.metric(Counter.of("downloaded", downloaded.size())); return Output.builder().files(downloaded).build(); } diff --git a/core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java b/core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java index c56ad60cd12..820442a32a6 100644 --- a/core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java +++ b/core/src/main/java/io/kestra/plugin/core/namespace/UploadFiles.java @@ -1,34 +1,30 @@ package io.kestra.plugin.core.namespace; -import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.RunContext; -import io.kestra.core.services.FlowService; -import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.Namespace; +import io.kestra.core.utils.FileUtils; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.experimental.SuperBuilder; import org.codehaus.commons.nullanalysis.NotNull; -import org.slf4j.Logger; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; -import static io.kestra.core.runners.NamespaceFilesService.toNamespacedStorageUri; import static io.kestra.core.utils.PathUtil.checkLeadingSlash; @SuperBuilder(toBuilder = true) @@ -60,7 +56,7 @@ - id: python type: io.kestra.plugin.scripts.python.Commands - namespaceFiles: + namespace: enabled: true commands: - python scripts/main.py""" @@ -99,21 +95,17 @@ public class UploadFiles extends Task implements RunnableTask filesList) { if (renderedDestination == null) { throw new RuntimeException("Destination must be set when providing a List for the `files` property."); } @@ -121,79 +113,46 @@ public UploadFiles.Output run(RunContext runContext) throws Exception { final List regexs = new ArrayList<>(); - // First handle string that are full URI - ((List) filesList).forEach(path -> { - if (storageInterface.exists(flowInfo.tenantId(), URI.create(path))) { - String newFilePath = null; - try { - newFilePath = buildPath(renderedDestination, storageInterface.getAttributes(flowInfo.tenantId(), URI.create(path)).getFileName()); - storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, storageInterface.get(flowInfo.tenantId(), URI.create(path))); - } catch (IOException | IllegalVariableEvaluationException e) { - throw new RuntimeException(e); + for (Object file : filesList) { + Optional uri = FileUtils.getURI(file.toString()); + // Immediately handle strings that are full URI + if (uri.isPresent()) { + if (runContext.storage().isFileExist(uri.get())) { + Path targetFilePath = Path.of(renderedDestination, FileUtils.getFileName(uri.get())); + storageNamespace.putFile(targetFilePath, runContext.storage().getFile(uri.get()), conflict); } + // else ignore } else { - regexs.add(path); + regexs.add(file.toString()); } - }); + } - // check for file in current tempDir that match regexs + // Check for files in the current WORKING_DIR that match the expressions for (Path path : runContext.workingDir().findAllFilesMatching(regexs)) { File file = path.toFile(); - String newFilePath = buildPath(renderedDestination, file.getPath().replace(runContext.workingDir().path().toString(), "")); - storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), newFilePath, new FileInputStream(file)); + Path resolve = Paths.get("/").resolve(runContext.workingDir().path().relativize(file.toPath())); + + Path targetFilePath = Path.of(renderedDestination, resolve.toString()); + storageNamespace.putFile(targetFilePath, new FileInputStream(file), conflict); } } else if (files instanceof Map map) { // Using a Map for the `files` property, there must be only URI Map renderedMap = runContext.render((Map) map); - renderedMap.forEach((key, value) -> { - if (key instanceof String keyString && value instanceof String valueString) { - URI toUpload = URI.create(valueString); - if (storageInterface.exists(flowInfo.tenantId(), toUpload)) { - try { - storeNewFile(logger, runContext, storageInterface, flowInfo.tenantId(), checkLeadingSlash(keyString), storageInterface.get(flowInfo.tenantId(), toUpload)); - } catch (IOException | IllegalVariableEvaluationException e) { - throw new RuntimeException(e); - } + for (Map.Entry entry : renderedMap.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (key instanceof String targetFilePath && value instanceof String stringSourceFileURI) { + URI sourceFileURI = URI.create(stringSourceFileURI); + if (runContext.storage().isFileExist(sourceFileURI)) { + storageNamespace.putFile(Path.of(targetFilePath), runContext.storage().getFile(sourceFileURI), conflict); } } else { throw new IllegalArgumentException("files must be a List or a Map"); } - }); - } - - return Output.builder().build(); - } - - private String buildPath(String destination, String filename) { - return checkLeadingSlash(String.join("/", destination, filename)); - } - - private void storeNewFile(Logger logger, RunContext runContext, StorageInterface storageInterface, String tenantId, String filePath, InputStream fileContent) throws IOException, IllegalVariableEvaluationException { - String renderedNamespace = runContext.render(namespace); - URI newFileURI = toNamespacedStorageUri( - renderedNamespace, - URI.create(filePath) - ); - - boolean fileExists = storageInterface.exists(tenantId, newFileURI); - if (!conflict.equals(ConflictAction.OVERWRITE) && fileExists) { - if (conflict.equals(ConflictAction.ERROR)) { - throw new IOException(String.format("File %s already exists and conflict is set to %s", filePath, ConflictAction.ERROR)); } - logger.debug(String.format("File %s already exists and conflict is set to %s, skipping", filePath, ConflictAction.ERROR)); - return; } - storageInterface.put( - tenantId, - newFileURI, - fileContent - ); - if (fileExists) { - logger.debug(String.format("File %s overwritten", filePath)); - } else { - logger.debug(String.format("File %s created", filePath)); - } + return Output.builder().build(); } @Builder @@ -202,10 +161,4 @@ public static class Output implements io.kestra.core.models.tasks.Output { private final Map files; } - public enum ConflictAction { - OVERWRITE, - ERROR, - SKIP - } - } diff --git a/core/src/test/java/io/kestra/core/runners/LocalWorkingDirTest.java b/core/src/test/java/io/kestra/core/runners/LocalWorkingDirTest.java index f833ee9e3f0..f56988f4236 100644 --- a/core/src/test/java/io/kestra/core/runners/LocalWorkingDirTest.java +++ b/core/src/test/java/io/kestra/core/runners/LocalWorkingDirTest.java @@ -1,9 +1,13 @@ package io.kestra.core.runners; import io.kestra.core.utils.IdUtils; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; @@ -52,9 +56,8 @@ void shouldCreatedTempFile() throws IOException { assertThat(workingDirectory.getAllCreatedTempFiles().size(), is(1)); } - @Test - void shouldCreatedFile() throws IOException { + void shouldCreateFile() throws IOException { String workingDirId = IdUtils.create(); TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId)); Path path = workingDirectory.createFile("folder/file.txt"); @@ -64,6 +67,17 @@ void shouldCreatedFile() throws IOException { assertThat(workingDirectory.getAllCreatedFiles().size(), is(1)); } + @Test + void shouldThrowExceptionGivenFileAlreadyExist() throws IOException { + String workingDirId = IdUtils.create(); + TestWorkingDir workingDirectory = new TestWorkingDir(workingDirId, new LocalWorkingDir(Path.of("/tmp/sub/dir/tmp/"), workingDirId)); + + workingDirectory.createFile("folder/file.txt", "1".getBytes(StandardCharsets.UTF_8)); + Assertions.assertThrows(FileAlreadyExistsException.class, () -> { + workingDirectory.createFile("folder/file.txt", "2".getBytes(StandardCharsets.UTF_8)); + }); + } + @Test void shouldFindAllFilesMatchingPatterns() throws IOException { // Given diff --git a/core/src/test/java/io/kestra/core/runners/NamespaceFilesServiceTest.java b/core/src/test/java/io/kestra/core/runners/NamespaceFilesServiceTest.java index af281eddfa8..5b5e05438b9 100644 --- a/core/src/test/java/io/kestra/core/runners/NamespaceFilesServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/NamespaceFilesServiceTest.java @@ -17,6 +17,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import java.util.Map; import static io.kestra.core.utils.Rethrow.throwFunction; import static org.hamcrest.MatcherAssert.assertThat; @@ -35,7 +36,6 @@ class NamespaceFilesServiceTest { @Test public void noFilter() throws Exception { - Path basePath = Files.createTempDirectory("unit"); String namespace = "io.kestra." + IdUtils.create(); put(null, namespace, "/a/b/c/1.sql", "1"); @@ -48,7 +48,7 @@ public void noFilter() throws Exception { runContext, null, namespace, - basePath, + null, // not used anymore NamespaceFiles .builder() .enabled(true) @@ -56,10 +56,11 @@ public void noFilter() throws Exception { ); assertThat(injected.size(), is(3)); - List tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList(); - assertThat(tempDir.size(), is(3)); + List workingDirFiles = runContext.workingDir().findAllFilesMatching(List.of("**/**")); + assertThat(workingDirFiles.size(), is(3)); + String fileContent = FileUtils.readFileToString( - tempDir.stream().filter(path -> path.toString().contains("b/c/d/1.sql")).findFirst().orElseThrow().toFile(), + workingDirFiles.stream().filter(path -> path.toString().contains("b/c/d/1.sql")).findFirst().orElseThrow().toFile(), "UTF-8" ); assertThat(fileContent, is(expectedFileContent)); @@ -71,7 +72,7 @@ public void noFilter() throws Exception { runContext, null, namespace, - basePath, + null, // not used anymore NamespaceFiles .builder() .enabled(true) @@ -79,10 +80,10 @@ public void noFilter() throws Exception { ); assertThat(injected.size(), is(3)); - tempDir = Files.walk(basePath).filter(path -> path.toFile().isFile()).toList(); - assertThat(tempDir.size(), is(3)); + workingDirFiles = runContext.workingDir().findAllFilesMatching(List.of("**/**")); + assertThat(workingDirFiles.size(), is(3)); fileContent = FileUtils.readFileToString( - tempDir.stream().filter(path -> path.toString().contains("b/c/d/1.sql")).findFirst().orElseThrow().toFile(), + workingDirFiles.stream().filter(path -> path.toString().contains("b/c/d/1.sql")).findFirst().orElseThrow().toFile(), "UTF-8" ); assertThat(fileContent, is(expectedFileContent)); @@ -90,8 +91,7 @@ public void noFilter() throws Exception { @Test public void filter() throws Exception { - Path basePath = Files.createTempDirectory("unit"); - String namespace = "io.kestra." + IdUtils.create(); + final String namespace = "io.kestra." + IdUtils.create(); put(null, namespace, "/a/b/c/1.sql", "1"); put(null, namespace, "/a/2.sql", "2"); @@ -99,11 +99,14 @@ public void filter() throws Exception { put(null, namespace, "/b/d/4.sql", "4"); put(null, namespace, "/c/5.sql", "5"); + final RunContext runContext = runContextFactory.of(); + final Path workingDir = runContext.workingDir().path(); + List injected = namespaceFilesService.inject( - runContextFactory.of(), + runContext, null, namespace, - basePath, + null, // not used anymore NamespaceFiles.builder() .include(List.of( "/a/**", @@ -118,13 +121,12 @@ public void filter() throws Exception { hasProperty("path", endsWith("3.sql")), hasProperty("path", endsWith("5.sql")) )); - List tempDirEntries = Files.walk(basePath).filter(path -> path.toFile().isFile()) - .map(Path::toString) - .toList(); - assertThat(tempDirEntries, containsInAnyOrder( - is(Paths.get(basePath.toString(), "/a/b/c/1.sql").toString()), - is(Paths.get(basePath.toString(), "/b/c/d/3.sql").toString()), - is(Paths.get(basePath.toString(), "/c/5.sql").toString()) + + List workingDirFiles = runContext.workingDir().findAllFilesMatching(List.of("**/**")); + assertThat(workingDirFiles, containsInAnyOrder( + is(Paths.get(workingDir.toString(), "/a/b/c/1.sql")), + is(Paths.get(workingDir.toString(), "/b/c/d/3.sql")), + is(Paths.get(workingDir.toString(), "/c/5.sql")) )); } @@ -135,12 +137,12 @@ public void tenant() throws Exception { put("tenant1", namespace, "/a/b/c/1.sql", "1"); put("tenant2", namespace, "/a/b/c/1.sql", "2"); - RunContext runContext = runContextFactory.of(); + RunContext runContextTenant1 = runContextFactory.of(Map.of("flow", Map.of("tenantId", "tenant1"))); List injected = namespaceFilesService.inject( - runContextFactory.of(), + runContextTenant1, "tenant1", namespace, - runContext.workingDir().path(), + null, // not used anymore NamespaceFiles .builder() .enabled(true) @@ -148,15 +150,15 @@ public void tenant() throws Exception { ); assertThat(injected.size(), is(1)); - String content = Files.walk(runContext.workingDir().path()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow(); + String content = runContextTenant1.workingDir().findAllFilesMatching(List.of("**/**")).stream().findFirst().map(throwFunction(Files::readString)).orElseThrow(); assertThat(content, is("1")); - runContext = runContextFactory.of(); + RunContext runContextTenant2 = runContextFactory.of(Map.of("flow", Map.of("tenantId", "tenant2"))); injected = namespaceFilesService.inject( - runContextFactory.of(), + runContextTenant2, "tenant2", namespace, - runContext.workingDir().path(), + null, // not used anymore NamespaceFiles .builder() .enabled(true) @@ -164,7 +166,7 @@ public void tenant() throws Exception { ); assertThat(injected.size(), is(1)); - content = Files.walk(runContext.workingDir().path()).filter(path -> path.toFile().isFile()).findFirst().map(throwFunction(Files::readString)).orElseThrow(); + content = runContextTenant2.workingDir().findAllFilesMatching(List.of("**/**")).stream().findFirst().map(throwFunction(Files::readString)).orElseThrow(); assertThat(content, is("2")); } diff --git a/core/src/test/java/io/kestra/core/runners/TestWorkingDir.java b/core/src/test/java/io/kestra/core/runners/TestWorkingDir.java index f47ffea9f50..acb7f1f6483 100644 --- a/core/src/test/java/io/kestra/core/runners/TestWorkingDir.java +++ b/core/src/test/java/io/kestra/core/runners/TestWorkingDir.java @@ -3,6 +3,7 @@ import io.kestra.core.utils.IdUtils; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -84,6 +85,16 @@ public Path createFile(String filename, byte[] content) throws IOException { return captureCreateFileAndGet(delegate.createFile(filename, content)); } + @Override + public Path createFile(String filename, InputStream content) throws IOException { + return captureCreateFileAndGet(delegate.createFile(filename, content)); + } + + @Override + public Path putFile(Path path, InputStream content) throws IOException { + return captureCreateFileAndGet(delegate.putFile(path, content)); + } + @Override public List findAllFilesMatching(List patterns) throws IOException { return delegate.findAllFilesMatching(patterns); diff --git a/core/src/test/java/io/kestra/core/storages/InternalNamespaceTest.java b/core/src/test/java/io/kestra/core/storages/InternalNamespaceTest.java new file mode 100644 index 00000000000..13600ec9879 --- /dev/null +++ b/core/src/test/java/io/kestra/core/storages/InternalNamespaceTest.java @@ -0,0 +1,183 @@ +package io.kestra.core.storages; + +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.PathMatcherPredicate; +import io.kestra.storage.local.LocalStorage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; + +class InternalNamespaceTest { + + private static final Logger logger = LoggerFactory.getLogger(InternalNamespaceTest.class); + + LocalStorage storageInterface; + + @BeforeEach + public void setUp() throws IOException { + Path basePath = Files.createTempDirectory("unit"); + storageInterface = new LocalStorage(); + storageInterface.setBasePath(basePath); + storageInterface.init(); + } + + @Test + void shouldGetAllNamespaceFiles() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + // When + namespace.putFile(Path.of("/sub/dir/file1.txt"), new ByteArrayInputStream("1".getBytes())); + namespace.putFile(Path.of("/sub/dir/file2.txt"), new ByteArrayInputStream("2".getBytes())); + namespace.putFile(Path.of("/sub/dir/file3.txt"), new ByteArrayInputStream("3".getBytes())); + + // Then + assertThat(namespace.all(), containsInAnyOrder( + is(NamespaceFile.of(namespaceId, Path.of("sub/dir/file1.txt"))), + is(NamespaceFile.of(namespaceId, Path.of("sub/dir/file2.txt"))), + is(NamespaceFile.of(namespaceId, Path.of("sub/dir/file3.txt"))) + )); + } + + @Test + void shouldPutFileGivenNoTenant() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + // When + NamespaceFile namespaceFile = namespace.putFile(Path.of("/sub/dir/file.txt"), new ByteArrayInputStream("1".getBytes())); + + // Then + assertThat(namespaceFile, is(NamespaceFile.of(namespaceId, Path.of("sub/dir/file.txt")))); + // Then + try (InputStream is = namespace.getFileContent(namespaceFile.path())) { + assertThat(new String(is.readAllBytes()), is("1")); + } + } + + @Test + void shouldSucceedPutFileGivenExistingFileForConflictOverwrite() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt")); + + namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("1".getBytes())); + + // When + namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("2".getBytes()), Namespace.Conflicts.OVERWRITE); + + // Then + try (InputStream is = namespace.getFileContent(namespaceFile.path())) { + assertThat(new String(is.readAllBytes()), is("2")); + } + } + + @Test + void shouldFailPutFileGivenExistingFileForError() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt")); + + namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("1".getBytes())); + + // When - Then + Assertions.assertThrows( + IOException.class, + () -> namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("2".getBytes()), Namespace.Conflicts.ERROR) + ); + } + + @Test + void shouldIgnorePutFileGivenExistingFileForSkip() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + NamespaceFile namespaceFile = namespace.get(Path.of("/sub/dir/file.txt")); + + namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("1".getBytes())); + + // When + namespace.putFile(namespaceFile.path(), new ByteArrayInputStream("2".getBytes()), Namespace.Conflicts.SKIP); + + // Then + try (InputStream is = namespace.getFileContent(namespaceFile.path())) { + assertThat(new String(is.readAllBytes()), is("1")); + } + } + + @Test + void shouldFindAllMatchingGivenNoTenant() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + + // When + namespace.putFile(Path.of("/a/b/c/1.sql"), new ByteArrayInputStream("1".getBytes())); + namespace.putFile(Path.of("/a/2.sql"), new ByteArrayInputStream("2".getBytes())); + namespace.putFile(Path.of("/b/c/d/3.sql"), new ByteArrayInputStream("3".getBytes())); + namespace.putFile(Path.of("/b/d/4.sql"), new ByteArrayInputStream("4".getBytes())); + namespace.putFile(Path.of("/c/5.sql"), new ByteArrayInputStream("5".getBytes())); + + List namespaceFiles = namespace.findAllFilesMatching(PathMatcherPredicate.builder() + .includes(List.of("/a/**", "c/**")) + .excludes(List.of("**/2.sql")) + .build() + ); + + // Then + assertThat(namespaceFiles.stream().map(NamespaceFile::path).toList(), containsInAnyOrder( + is(Path.of("a/b/c/1.sql")), + is(Path.of("b/c/d/3.sql")), + is(Path.of("c/5.sql")) + )); + } + + @Test + void shouldFindAllGivenTenant() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespaceTenant1 = new InternalNamespace(logger, "tenant1", namespaceId, storageInterface); + NamespaceFile namespaceFile1 = namespaceTenant1.putFile(Path.of("/a/b/c/test.txt"), new ByteArrayInputStream("1".getBytes())); + + final InternalNamespace namespaceTenant2 = new InternalNamespace(logger, "tenant2", namespaceId, storageInterface); + NamespaceFile namespaceFile2 = namespaceTenant2.putFile(Path.of("/a/b/c/test.txt"), new ByteArrayInputStream("1".getBytes())); + + // When - Then + List allTenant1 = namespaceTenant1.all(); + assertThat(allTenant1.size(), is(1)); + assertThat(allTenant1, containsInAnyOrder(is(namespaceFile1))); + + List allTenant2 = namespaceTenant2.all(); + assertThat(allTenant2.size(), is(1)); + assertThat(allTenant2, containsInAnyOrder(is(namespaceFile2))); + } + + @Test + void shouldReturnNoNamespaceFileForEmptyNamespace() throws IOException { + // Given + final String namespaceId = "io.kestra." + IdUtils.create(); + final InternalNamespace namespace = new InternalNamespace(logger, null, namespaceId, storageInterface); + List namespaceFiles = namespace.findAllFilesMatching((unused) -> true); + assertThat(namespaceFiles.size(), is(0)); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/storages/NamespaceFileTest.java b/core/src/test/java/io/kestra/core/storages/NamespaceFileTest.java new file mode 100644 index 00000000000..1bf86336867 --- /dev/null +++ b/core/src/test/java/io/kestra/core/storages/NamespaceFileTest.java @@ -0,0 +1,90 @@ +package io.kestra.core.storages; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.nio.file.Path; + +class NamespaceFileTest { + + private static final String namespace = "io.kestra.test"; + + @Test + void shouldThrowExceptionGivenNullNamespace() { + Assertions.assertThrows(NullPointerException.class, () -> NamespaceFile.of(null, (Path)null)); + } + + @Test + void shouldThrowExceptionGivenInvalidScheme() { + Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceFile.of(namespace, URI.create("file:///io/kestra/test/_files/sub/dir/file.txt"))); + } + + @Test + void shouldThrowExceptionGivenInvalidNamespace() { + Assertions.assertThrows(IllegalArgumentException.class, () -> NamespaceFile.of(namespace, URI.create("kestra:///com/acme/_files/sub/dir/file.txt"))); + } + + @Test + void shouldCreateGivenNamespaceAndValidStorageURI() { + Assertions.assertEquals(new NamespaceFile( + Path.of("sub/dir/file.txt"), + URI.create("kestra:///io/kestra/test/_files/sub/dir/file.txt"), + namespace + ), NamespaceFile.of(namespace, URI.create("kestra:///io/kestra/test/_files/sub/dir/file.txt")) + ); + } + + @Test + void shouldCreateGivenNamespaceAndValidRelativeURI() { + Assertions.assertEquals(new NamespaceFile( + Path.of("sub/dir/file.txt"), + URI.create("kestra:///io/kestra/test/_files/sub/dir/file.txt"), + namespace + ), NamespaceFile.of(namespace, URI.create("/sub/dir/file.txt")) + ); + } + + @Test + void shouldCreateGivenNamespaceAndPath() { + NamespaceFile expected = new NamespaceFile( + Path.of("sub/dir/file.txt"), + URI.create("kestra:///io/kestra/test/_files/sub/dir/file.txt"), + namespace + ); + + Assertions.assertEquals(expected, NamespaceFile.of(namespace, Path.of("sub/dir/file.txt"))); + Assertions.assertEquals(expected, NamespaceFile.of(namespace, Path.of("/sub/dir/file.txt"))); + Assertions.assertEquals(expected, NamespaceFile.of(namespace, Path.of("./sub/dir/file.txt"))); + } + + @Test + void shouldCreateGivenNamespaceAndNullPath() { + Assertions.assertEquals(new NamespaceFile( + Path.of(""), + URI.create("kestra:///io/kestra/test/_files/"), + namespace + ), NamespaceFile.of(namespace) + ); + } + + @Test + void shouldCreateGivenNamespaceAndRootPath() { + Assertions.assertEquals(new NamespaceFile( + Path.of(""), + URI.create("kestra:///io/kestra/test/_files/"), + namespace + ), NamespaceFile.of(namespace, Path.of("/")) + ); + } + + @Test + void shouldGetStoragePath() { + NamespaceFile namespaceFile = new NamespaceFile( + Path.of("sub/dir/file.txt"), + URI.create("kestra:///io/kestra/test/_files/sub/dir/file.txt"), + namespace + ); + Assertions.assertEquals(Path.of("/io/kestra/test/_files/sub/dir/file.txt"), namespaceFile.storagePath()); + } +} \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/utils/PathMatcherPredicateTest.java b/core/src/test/java/io/kestra/core/utils/PathMatcherPredicateTest.java new file mode 100644 index 00000000000..deefceaa20e --- /dev/null +++ b/core/src/test/java/io/kestra/core/utils/PathMatcherPredicateTest.java @@ -0,0 +1,112 @@ +package io.kestra.core.utils; + +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class PathMatcherPredicateTest { + + @Test + void shouldSupportGlobExpression() { + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("glob:**/*")); + assertEquals(List.of("glob:**/*"), predicate.syntaxAndPatterns()); + } + + @Test + void shouldSupportRegexExpression() { + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("regex:.*\\.json")); + assertEquals(List.of("regex:.*\\.json"), predicate.syntaxAndPatterns()); + } + + @Test + void shouldAddMissingWildcardToGlobExpressions() { + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("test.txt")); + assertEquals(List.of("glob:**/test.txt"), predicate.syntaxAndPatterns()); + } + + @Test + void shouldUseGlobPatternForExpressionWithNoPrefix() { + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("**/*")); + assertEquals(List.of("glob:**/*"), predicate.syntaxAndPatterns()); + } + + @Test + void shouldAddBasePathForExpressionWithNoPrefix() { + assertEquals(List.of("glob:/sub/dir/**/*"), + PathMatcherPredicate.matches(Path.of("/sub/dir"), List.of("**/*")).syntaxAndPatterns() + ); + + assertEquals(List.of("glob:/sub/dir/**/*"), + PathMatcherPredicate.matches(Path.of("/sub/dir"), List.of("/**/*")).syntaxAndPatterns() + ); + } + + @Test + void shouldMatchAllGivenRecursiveGlobExpressionAndNoBasePath() { + // Given + List paths = Stream.of("/base/test.txt", "/base/sub/dir/test.txt").map(Path::of).toList(); + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("**/*.txt")); + // When + List filtered = paths.stream().filter(predicate).toList(); + // Then + assertEquals(paths, filtered); + } + + @Test + void shouldMatchAllGivenSimpleExpressionAndNoBasePath() { + // Given + List paths = Stream.of("/base/test.txt", "/base/sub/dir/test.txt").map(Path::of).toList(); + PathMatcherPredicate predicate = PathMatcherPredicate.matches(List.of("test.txt")); + // When + List filtered = paths.stream().filter(predicate).toList(); + // Then + assertEquals(paths, filtered); + } + + @Test + void shouldMatchGivenSimpleExpressionAndBasePath() { + // Given + List paths = Stream.of("/base/test.txt", "/base/sub/dir/test.txt").map(Path::of).toList(); + PathMatcherPredicate predicate = PathMatcherPredicate.matches(Path.of("/base"), List.of("test.txt")); + // When + List filtered = paths.stream().filter(predicate).toList(); + // Then + assertEquals(List.of(Path.of("/base/test.txt")), filtered); + } + + @Test + void shouldMatchGivenIncludeAndExcludeExpressions() { + // Given + List paths = List.of( + // When + Path.of("/a/b/c/1"), + Path.of("/a/2"), + Path.of("/b/c/d/3"), + Path.of("/b/d/4"), + Path.of("/c/5") + ); + Predicate predicate = PathMatcherPredicate.builder() + .includes(List.of("/a/**", "c/**")) + .excludes(List.of("**/2")) + .build(); + + // When + List filtered = paths.stream().filter(predicate).toList(); + + // Then + assertThat(filtered, containsInAnyOrder( + is(Path.of("/a/b/c/1")), + is(Path.of("/b/c/d/3")), + is(Path.of("/c/5")) + )); + + } +} \ No newline at end of file diff --git a/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java b/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java index 44baf6b1078..2e3ee5e44b8 100644 --- a/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java +++ b/core/src/test/java/io/kestra/plugin/core/flow/WorkingDirectoryTest.java @@ -138,8 +138,9 @@ public void outputFiles(RunnerUtils runnerUtils) throws TimeoutException, IOExce StorageContext storageContext = StorageContext.forTask(taskRun); InternalStorage storage = new InternalStorage( null, - storageContext - , storageInterface + storageContext, + storageInterface, + null ); URI uri = ((Map) outputs.get("outputFiles")).values() @@ -159,8 +160,9 @@ public void inputFiles(RunnerUtils runnerUtils) throws TimeoutException, IOExcep StorageContext storageContext = StorageContext.forTask(execution.getTaskRunList().get(1)); InternalStorage storage = new InternalStorage( null, - storageContext - , storageInterface + storageContext, + storageInterface, + null ); TaskRun taskRun = execution.getTaskRunList().get(1); @@ -184,8 +186,9 @@ public void cache(RunnerUtils runnerUtils) throws TimeoutException, IOException ); InternalStorage storage = new InternalStorage( null, - storageContext - , storageInterface + storageContext, + storageInterface, + null ); storage.deleteCacheFile("workingDir", null); diff --git a/core/src/test/java/io/kestra/plugin/core/namespace/DeleteFilesTest.java b/core/src/test/java/io/kestra/plugin/core/namespace/DeleteFilesTest.java index f5d0b25a734..bbc123b078d 100644 --- a/core/src/test/java/io/kestra/plugin/core/namespace/DeleteFilesTest.java +++ b/core/src/test/java/io/kestra/plugin/core/namespace/DeleteFilesTest.java @@ -1,10 +1,8 @@ package io.kestra.plugin.core.namespace; -import com.google.common.collect.ImmutableMap; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.storages.StorageContext; +import io.kestra.core.storages.Namespace; import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; @@ -13,30 +11,27 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.List; +import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; @KestraTest public class DeleteFilesTest { @Inject StorageInterface storageInterface; - @Inject - NamespaceFilesService namespaceFilesService; - @Inject RunContextFactory runContextFactory; @Test - void delete() throws Exception { - String namespace = "io.kestra." + IdUtils.create(); - - put(namespace, "/a/b/test1.txt", "1"); - put(namespace, "/a/b/test2.txt", "1"); + void shouldDeleteNamespaceFilesForMatchingExpression() throws Exception { + // Given + String namespaceId = "io.kestra." + IdUtils.create(); DeleteFiles deleteFiles = DeleteFiles.builder() .id(DeleteFiles.class.getSimpleName()) @@ -45,19 +40,18 @@ void delete() throws Exception { .namespace("{{ inputs.namespace }}") .build(); - RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, ImmutableMap.of("namespace", namespace)); + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, deleteFiles, Map.of("namespace", namespaceId)); + final Namespace namespace = runContext.storage().namespace(namespaceId); - DeleteFiles.Output output = deleteFiles.run(runContext); + namespace.putFile(Path.of("/a/b/test1.txt"), new ByteArrayInputStream("1".getBytes(StandardCharsets.UTF_8))); + namespace.putFile(Path.of("/a/b/test2.txt"), new ByteArrayInputStream("2".getBytes(StandardCharsets.UTF_8))); - assertThat(namespaceFilesService.recursiveList(null, namespace, URI.create("/a/b/")).size(), is(1)); + assertThat(namespace.all("/a/b/", false).size(), is(2)); - } + // When + assertThat(deleteFiles.run(runContext), notNullValue()); - private void put(String namespace, String path, String content) throws IOException { - storageInterface.put( - null, - URI.create(StorageContext.namespaceFilePrefix(namespace) + path), - new ByteArrayInputStream(content.getBytes()) - ); + // Then + assertThat(namespace.all("/a/b/", false).size(), is(1)); } } diff --git a/core/src/test/java/io/kestra/plugin/core/namespace/DownloadFilesTest.java b/core/src/test/java/io/kestra/plugin/core/namespace/DownloadFilesTest.java index d1717e92f22..546aba21537 100644 --- a/core/src/test/java/io/kestra/plugin/core/namespace/DownloadFilesTest.java +++ b/core/src/test/java/io/kestra/plugin/core/namespace/DownloadFilesTest.java @@ -1,10 +1,8 @@ package io.kestra.plugin.core.namespace; -import com.google.common.collect.ImmutableMap; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; -import io.kestra.core.storages.StorageContext; +import io.kestra.core.storages.Namespace; import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; @@ -13,9 +11,10 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.List; +import java.util.Map; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -26,19 +25,12 @@ public class DownloadFilesTest { @Inject StorageInterface storageInterface; - @Inject - NamespaceFilesService namespaceFilesService; - @Inject RunContextFactory runContextFactory; @Test - void download() throws Exception { - String namespace = "io.kestra." + IdUtils.create(); - - put(namespace, "/a/b/test1.txt", "1"); - put(namespace, "/a/b/test2.txt", "1"); - + void shouldDownloadNamespaceFile() throws Exception { + String namespaceId = "io.kestra." + IdUtils.create(); DownloadFiles downloadFiles = DownloadFiles.builder() .id(DownloadFiles.class.getSimpleName()) .type(DownloadFiles.class.getName()) @@ -46,7 +38,11 @@ void download() throws Exception { .namespace("{{ inputs.namespace }}") .build(); - RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, downloadFiles, ImmutableMap.of("namespace", namespace)); + final RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, downloadFiles, Map.of("namespace", namespaceId)); + final Namespace namespace = runContext.storage().namespace(namespaceId); + + namespace.putFile(Path.of("/a/b/test1.txt"), new ByteArrayInputStream("1".getBytes(StandardCharsets.UTF_8))); + namespace.putFile(Path.of("/a/b/test2.txt"), new ByteArrayInputStream("2".getBytes(StandardCharsets.UTF_8))); DownloadFiles.Output output = downloadFiles.run(runContext); @@ -54,12 +50,4 @@ void download() throws Exception { assertThat(output.getFiles().get("/a/b/test1.txt"), notNullValue()); } - - private void put(String namespace, String path, String content) throws IOException { - storageInterface.put( - null, - URI.create(StorageContext.namespaceFilePrefix(namespace) + path), - new ByteArrayInputStream(content.getBytes()) - ); - } } diff --git a/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java b/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java index 2ae2d4bebf6..df3e592852b 100644 --- a/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java +++ b/core/src/test/java/io/kestra/plugin/core/namespace/UploadFilesTest.java @@ -2,9 +2,10 @@ import com.devskiller.friendly_id.FriendlyId; import com.google.common.collect.ImmutableMap; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.Namespace; +import io.kestra.core.storages.NamespaceFile; import io.kestra.core.storages.StorageInterface; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; @@ -33,14 +34,11 @@ public class UploadFilesTest { @Inject StorageInterface storageInterface; - @Inject - NamespaceFilesService namespaceFilesService; - @Inject RunContextFactory runContextFactory; @Test - void uploadConflictError() throws Exception { + void shouldThrowExceptionGivenAlreadyExistingFileWhenConflictError() throws Exception { String namespace = "io.kestra." + IdUtils.create(); File file = new File(Objects.requireNonNull(UploadFilesTest.class.getClassLoader().getResource("application-test.yml")).toURI()); @@ -54,22 +52,19 @@ void uploadConflictError() throws Exception { .type(UploadFiles.class.getName()) .files(Map.of("/path/file.txt", fileStorage.toString())) .namespace(namespace) - .conflict(UploadFiles.ConflictAction.ERROR) + .conflict(Namespace.Conflicts.ERROR) .destination("/folder") .build(); RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of()); uploadFile.run(runContext); - assertThat(namespaceFilesService.recursiveList(null, namespace, URI.create("")).size(), is(1)); - - assertThrows(RuntimeException.class, () -> { - uploadFile.run(runContext); - }); + assertThat(runContext.storage().namespace(namespace).all().size(), is(1)); + assertThrows(IOException.class, () -> uploadFile.run(runContext)); } @Test - void uploadConflictOverwrite() throws Exception { + void shouldPutFileGivenAlreadyExistingFileWhenConflictOverwrite() throws Exception { String namespace = "io.kestra." + IdUtils.create(); URI fileStorage = addToStorage("application-test.yml"); @@ -85,10 +80,11 @@ void uploadConflictOverwrite() throws Exception { RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of("namespace", namespace)); uploadFile.run(runContext); - List namespaceFiles = namespaceFilesService.recursiveList(null, namespace, URI.create("")); + Namespace namespaceStorage = runContext.storage().namespace(namespace); + List namespaceFiles = namespaceStorage.all(); assertThat(namespaceFiles.size(), is(1)); - String previousFile = IOUtils.toString(namespaceFilesService.content(null, namespace, namespaceFiles.getFirst()), StandardCharsets.UTF_8); + String previousFile = IOUtils.toString(namespaceStorage.getFileContent(namespaceFiles.getFirst().path()), StandardCharsets.UTF_8); fileStorage = addToStorage("logback.xml"); uploadFile = uploadFile.toBuilder() @@ -97,16 +93,16 @@ void uploadConflictOverwrite() throws Exception { uploadFile.run(runContext); - namespaceFiles = namespaceFilesService.recursiveList(null, namespace, URI.create("")); + namespaceFiles = namespaceStorage.all(); assertThat(namespaceFiles.size(), is(1)); - String newFile = IOUtils.toString(namespaceFilesService.content(null, namespace, namespaceFiles.getFirst()), StandardCharsets.UTF_8); + String newFile = IOUtils.toString(namespaceStorage.getFileContent(namespaceFiles.getFirst().path()), StandardCharsets.UTF_8); assertThat(previousFile.equals(newFile), is(false)); } @Test - void uploadConflictSkip() throws Exception { + void shouldPutFileGivenAlreadyExistingFileWhenConflictSkip() throws Exception { String namespace = "io.kestra." + IdUtils.create(); URI fileStorage = addToStorage("application-test.yml"); @@ -116,17 +112,18 @@ void uploadConflictSkip() throws Exception { .type(UploadFiles.class.getName()) .files(Map.of("/path/file.txt", fileStorage.toString())) .namespace(namespace) - .conflict(UploadFiles.ConflictAction.SKIP) + .conflict(Namespace.Conflicts.SKIP) .destination("/folder") .build(); RunContext runContext = TestsUtils.mockRunContext(this.runContextFactory, uploadFile, ImmutableMap.of()); uploadFile.run(runContext); - List namespaceFiles = namespaceFilesService.recursiveList(null, namespace, URI.create("")); + Namespace namespaceStorage = runContext.storage().namespace(namespace); + List namespaceFiles = namespaceStorage.all(); assertThat(namespaceFiles.size(), is(1)); - String previousFile = IOUtils.toString(namespaceFilesService.content(null, namespace, namespaceFiles.getFirst()), StandardCharsets.UTF_8); + String previousFile = IOUtils.toString(namespaceStorage.getFileContent(namespaceFiles.getFirst().path()), StandardCharsets.UTF_8); fileStorage = addToStorage("logback.xml"); uploadFile = uploadFile.toBuilder() @@ -135,10 +132,10 @@ void uploadConflictSkip() throws Exception { uploadFile.run(runContext); - namespaceFiles = namespaceFilesService.recursiveList(null, namespace, URI.create("")); + namespaceFiles = namespaceStorage.all(); assertThat(namespaceFiles.size(), is(1)); - String newFile = IOUtils.toString(namespaceFilesService.content(null, namespace, namespaceFiles.getFirst()), StandardCharsets.UTF_8); + String newFile = IOUtils.toString(namespaceStorage.getFileContent(namespaceFiles.getFirst().path()), StandardCharsets.UTF_8); assertThat(previousFile.equals(newFile), is(true)); } diff --git a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java index 49fc3865075..d55e93d31be 100644 --- a/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java +++ b/script/src/main/java/io/kestra/plugin/scripts/exec/scripts/runners/CommandsWrapper.java @@ -5,10 +5,11 @@ import io.kestra.core.models.tasks.runners.*; import io.kestra.core.runners.DefaultRunContext; import io.kestra.core.runners.RunContextInitializer; +import io.kestra.core.storages.NamespaceFile; +import io.kestra.core.utils.Rethrow; import io.kestra.plugin.core.runner.Process; import io.kestra.core.models.tasks.NamespaceFiles; import io.kestra.core.runners.FilesService; -import io.kestra.core.runners.NamespaceFilesService; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.IdUtils; import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; @@ -20,6 +21,7 @@ import lombok.With; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -129,22 +131,20 @@ public CommandsWrapper addEnv(Map envs) { return this; } - @SuppressWarnings("unchecked") public ScriptOutput run() throws Exception { List filesToUpload = new ArrayList<>(); - if (this.namespaceFiles != null) { - String tenantId = ((Map) runContext.getVariables().get("flow")).get("tenantId"); - String namespace = ((Map) runContext.getVariables().get("flow")).get("namespace"); - - NamespaceFilesService namespaceFilesService = ((DefaultRunContext)runContext).getApplicationContext().getBean(NamespaceFilesService.class); - List injectedFiles = namespaceFilesService.inject( - runContext, - tenantId, - namespace, - this.workingDirectory, - this.namespaceFiles - ); - injectedFiles.forEach(uri -> filesToUpload.add(uri.toString().substring(1))); // we need to remove the leading '/' + if (this.namespaceFiles != null && this.namespaceFiles.getEnabled()) { + + List matchedNamespaceFiles = runContext.storage() + .namespace() + .findAllFilesMatching(this.namespaceFiles.getInclude(), this.namespaceFiles.getExclude()); + + matchedNamespaceFiles.forEach(Rethrow.throwConsumer(namespaceFile -> { + InputStream content = runContext.storage().getFile(namespaceFile.uri()); + runContext.workingDir().createFile(namespaceFile.path().toString(), content); + })); + + matchedNamespaceFiles.forEach(file -> filesToUpload.add(file.path().toString())); } TaskRunner realTaskRunner = this.getTaskRunner(); diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/NamespaceFileController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/NamespaceFileController.java index 5c9d8fe1cf4..7ac9e29b2b8 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/NamespaceFileController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/NamespaceFileController.java @@ -2,6 +2,7 @@ import io.kestra.core.services.FlowService; import io.kestra.core.storages.FileAttributes; +import io.kestra.core.storages.NamespaceFile; import io.kestra.core.storages.StorageInterface; import io.kestra.core.tenant.TenantService; import io.kestra.core.utils.Rethrow; @@ -22,16 +23,16 @@ import java.io.*; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; -import static io.kestra.core.runners.NamespaceFilesService.toNamespacedStorageUri; - @Slf4j @Validated @Controller("/api/v1/namespaces") @@ -56,7 +57,7 @@ public List search( @Parameter(description = "The namespace id") @PathVariable String namespace, @Parameter(description = "The string the file path should contain") @QueryValue String q ) throws IOException, URISyntaxException { - URI baseNamespaceFilesUri = toNamespacedStorageUri(namespace, null); + URI baseNamespaceFilesUri = NamespaceFile.of(namespace).uri(); return storageInterface.allByPrefix(tenantService.resolveTenant(), baseNamespaceFilesUri, false).stream() .map(storageUri -> "/" + baseNamespaceFilesUri.relativize(storageUri).getPath()) .filter(path -> path.contains(q)).toList(); @@ -71,8 +72,7 @@ public StreamedFile file( ) throws IOException, URISyntaxException { forbiddenPathsGuard(path); - InputStream fileHandler = storageInterface.get(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path)); - + InputStream fileHandler = storageInterface.get(tenantService.resolveTenant(), NamespaceFile.of(namespace, path).uri()); return new StreamedFile(fileHandler, MediaType.APPLICATION_OCTET_STREAM_TYPE); } @@ -85,16 +85,15 @@ public FileAttributes stats( ) throws IOException, URISyntaxException { forbiddenPathsGuard(path); - // if stats is performed upon namespace root and it doesn't exist yet, we create it + // if stats is performed upon namespace root, and it doesn't exist yet, we create it if (path == null) { - if(!storageInterface.exists(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null))) { - storageInterface.createDirectory(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null)); + if(!storageInterface.exists(tenantService.resolveTenant(), NamespaceFile.of(namespace).uri())) { + storageInterface.createDirectory(tenantService.resolveTenant(), NamespaceFile.of(namespace).uri()); } - - return storageInterface.getAttributes(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null)); + return storageInterface.getAttributes(tenantService.resolveTenant(), NamespaceFile.of(namespace).uri()); } - return storageInterface.getAttributes(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path)); + return storageInterface.getAttributes(tenantService.resolveTenant(), NamespaceFile.of(namespace, path).uri()); } @ExecuteOn(TaskExecutors.IO) @@ -112,12 +111,12 @@ public List list( String pathString = path.getPath(); - if (pathString.equals("/") && !storageInterface.exists(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null))) { - storageInterface.createDirectory(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null)); + if (pathString.equals("/") && !storageInterface.exists(tenantService.resolveTenant(), NamespaceFile.of(namespace).uri())) { + storageInterface.createDirectory(tenantService.resolveTenant(), NamespaceFile.of(namespace).uri()); return Collections.emptyList(); } - return storageInterface.list(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path)); + return storageInterface.list(tenantService.resolveTenant(), NamespaceFile.of(namespace, path).uri()); } @ExecuteOn(TaskExecutors.IO) @@ -129,7 +128,7 @@ public void createDirectory( ) throws IOException, URISyntaxException { forbiddenPathsGuard(path); - storageInterface.createDirectory(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, path)); + storageInterface.createDirectory(tenantService.resolveTenant(), NamespaceFile.of(namespace, path).uri()); } @ExecuteOn(TaskExecutors.IO) @@ -178,7 +177,7 @@ private void putNamespaceFile(String tenantId, String namespace, URI path, Buffe return; } - storageInterface.put(tenantId, toNamespacedStorageUri(namespace, path), inputStream); + storageInterface.put(tenantId, NamespaceFile.of(namespace, path).uri(), inputStream); } protected void importFlow(String tenantId, String source) { @@ -194,7 +193,7 @@ public HttpResponse export( try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); ZipOutputStream archive = new ZipOutputStream(bos)) { - URI baseNamespaceFilesUri = toNamespacedStorageUri(namespace, null); + URI baseNamespaceFilesUri = NamespaceFile.of(namespace).uri(); String tenantId = tenantService.resolveTenant(); storageInterface.allByPrefix(tenantId, baseNamespaceFilesUri, false).forEach(Rethrow.throwConsumer(uri -> { try (InputStream inputStream = storageInterface.get(tenantId, uri)) { @@ -231,7 +230,7 @@ public void move( ensureWritableNamespaceFile(from); ensureWritableNamespaceFile(to); - storageInterface.move(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, from), toNamespacedStorageUri(namespace, to)); + storageInterface.move(tenantService.resolveTenant(), NamespaceFile.of(namespace, from).uri(),NamespaceFile.of(namespace, to).uri()); } @ExecuteOn(TaskExecutors.IO) @@ -244,10 +243,11 @@ public void delete( ensureWritableNamespaceFile(path); String pathWithoutScheme = path.getPath(); - List allNamespaceFilesPaths = new ArrayList<>(storageInterface.allByPrefix(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, null), true).stream() - .map(toNamespacedStorageUri(namespace, null)::relativize) - .map(uri -> "/" + uri.getPath()) - .toList()); + + List allNamespaceFilesPaths = storageInterface.allByPrefix(tenantService.resolveTenant(), NamespaceFile.of(namespace).storagePath().toUri(), true) + .stream() + .map(uri -> NamespaceFile.of(namespace, uri).path(true).toString()) + .collect(Collectors.toCollection(ArrayList::new)); if (allNamespaceFilesPaths.contains(pathWithoutScheme + "/")) { // the given path to delete is a directory @@ -267,7 +267,7 @@ public void delete( allNamespaceFilesPaths.removeIf(filesInParentFolder::contains); pathWithoutScheme = parentFolder.endsWith("/") ? parentFolder.substring(0, parentFolder.length() - 1) : parentFolder; } - storageInterface.delete(tenantService.resolveTenant(), toNamespacedStorageUri(namespace, URI.create(pathWithoutScheme))); + storageInterface.delete(tenantService.resolveTenant(), NamespaceFile.of(namespace, Path.of(pathWithoutScheme)).uri()); } private void forbiddenPathsGuard(URI path) { diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/api/NamespaceFileControllerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/api/NamespaceFileControllerTest.java index 454fa6ecca4..2ea622a682d 100644 --- a/webserver/src/test/java/io/kestra/webserver/controllers/api/NamespaceFileControllerTest.java +++ b/webserver/src/test/java/io/kestra/webserver/controllers/api/NamespaceFileControllerTest.java @@ -4,6 +4,7 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.storages.FileAttributes; +import io.kestra.core.storages.NamespaceFile; import io.kestra.core.storages.StorageContext; import io.kestra.core.storages.StorageInterface; import io.kestra.plugin.core.flow.Subflow; @@ -32,6 +33,7 @@ import java.io.InputStream; import java.net.URI; import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; import java.util.Optional; @@ -264,7 +266,7 @@ private void assertForbiddenErrorThrown(Executable executable) { } private URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) { - return URI.create(StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("")); + return NamespaceFile.of(namespace, relativePath).storagePath().toUri(); } @Getter