Skip to content

Commit

Permalink
refactor(core): allow access to NamespaceFiles through RunContext
Browse files Browse the repository at this point in the history
Changes:
* add new Namespace and NamespaceFile class
* add new methods Storage#namespace and Storage#namesapce(string)
* add new utility class PathMatcherPredicate
* Deprecate for removal NamespaceFilesService
  • Loading branch information
fhussonnois committed Jun 17, 2024
1 parent 4ff8c69 commit 1cb54f7
Show file tree
Hide file tree
Showing 29 changed files with 1,428 additions and 409 deletions.
84 changes: 54 additions & 30 deletions core/src/main/java/io/kestra/core/runners/LocalWorkingDir.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package io.kestra.core.runners;

import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.PathMatcherPredicate;
import org.apache.commons.io.FileUtils;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.io.InputStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

/**
* Default implementation of the {@link WorkingDir}.
Expand Down Expand Up @@ -128,26 +131,62 @@ public Path createTempFile(final byte[] content, final String extension) throws
**/
@Override
public Path createFile(String filename) throws IOException {
return createFile(filename, null);
return createFile(filename, (InputStream) null);
}

/**
* {@inheritDoc}
**/
@Override
public Path createFile(String filename, byte[] content) throws IOException {
return createFile(filename, content == null ? null : new ByteArrayInputStream(content));
}

/**
* {@inheritDoc}
**/
@Override
public Path createFile(String filename, InputStream content) throws IOException {
if (filename == null || filename.isBlank()) {
throw new IllegalArgumentException("Cannot create a working directory file with a null or empty name");
}
Path newFilePath = this.resolve(Path.of(filename));
Files.createDirectories(newFilePath.getParent());
Path file = Files.createFile(newFilePath);

Files.createFile(newFilePath);

if (content != null) {
Files.write(file, content);
try (content) {
Files.copy(content, newFilePath, REPLACE_EXISTING);
}
}

return file;
return newFilePath;
}

/**
* {@inheritDoc}
**/
@Override
public Path putFile(Path path, InputStream inputStream) throws IOException {
if (path == null) {
throw new IllegalArgumentException("Cannot create a working directory file with a null path");
}
if (inputStream == null) {
throw new IllegalArgumentException("Cannot create a working directory file with an empty inputStream");
}
Path newFilePath = this.resolve(path);
Files.createDirectories(newFilePath.getParent());

if (Files.notExists(newFilePath)) {
Files.createFile(newFilePath);
}

try (inputStream) {
Files.copy(inputStream, newFilePath, REPLACE_EXISTING);
}

return newFilePath;
}

/**
Expand All @@ -158,7 +197,7 @@ public List<Path> findAllFilesMatching(final List<String> patterns) throws IOExc
if (patterns == null || patterns.isEmpty()) {
return Collections.emptyList();
}
MatcherFileVisitor visitor = new MatcherFileVisitor(path(), patterns);
MatcherFileVisitor visitor = new MatcherFileVisitor(PathMatcherPredicate.matches(path(), patterns));
Files.walkFileTree(path(), visitor);
return visitor.getMatchedFiles();
}
Expand All @@ -175,31 +214,24 @@ public void cleanup() throws IOException {

private static class MatcherFileVisitor extends SimpleFileVisitor<Path> {

private static final String SYNTAX_GLOB = "glob:";
private static final String SYNTAX_REGEX = "regex:";

private final List<PathMatcher> matchers;
private final Predicate<Path> predicate;
private final List<Path> matchedFiles = new ArrayList<>();

public MatcherFileVisitor(final Path basePath, final List<String> patterns) {
FileSystem fs = FileSystems.getDefault();
this.matchers = patterns.stream()
.map(pattern -> {
var syntaxAndPattern = isPrefixWithSyntax(pattern) ? pattern : SYNTAX_GLOB + basePath + addLeadingSlash(pattern);
return fs.getPathMatcher(syntaxAndPattern);
})
.toList();
public MatcherFileVisitor(final Predicate<Path> predicate) {
this.predicate = predicate;
}

/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public FileVisitResult visitFile(final Path path, final BasicFileAttributes basicFileAttributes) {
if (!basicFileAttributes.isRegularFile()) {
// make sure we never follow symlink
return FileVisitResult.CONTINUE;
}

if (matchers.stream().anyMatch(pathMatcher -> pathMatcher.matches(path))) {
if (predicate.test(path)) {
matchedFiles.add(path);
}

Expand All @@ -209,13 +241,5 @@ public FileVisitResult visitFile(final Path path, final BasicFileAttributes basi
public List<Path> getMatchedFiles() {
return matchedFiles;
}

private static String addLeadingSlash(final String path) {
return path.startsWith("/") ? path : "/" + path;
}

private static boolean isPrefixWithSyntax(final String pattern) {
return pattern.startsWith(SYNTAX_REGEX) | pattern.startsWith(SYNTAX_GLOB);
}
}
}
117 changes: 40 additions & 77 deletions core/src/main/java/io/kestra/core/runners/NamespaceFilesService.java
Original file line number Diff line number Diff line change
@@ -1,70 +1,53 @@
package io.kestra.core.runners;

import io.kestra.core.models.tasks.NamespaceFiles;
import io.kestra.core.storages.InternalNamespace;
import io.kestra.core.storages.Namespace;
import io.kestra.core.storages.NamespaceFile;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.Rethrow;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static io.kestra.core.utils.Rethrow.throwPredicate;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

@Deprecated
@Singleton
@Slf4j
public class NamespaceFilesService {

private static final Logger log = LoggerFactory.getLogger(NamespaceFilesService.class);

private final StorageInterface storageInterface;

@Inject
private StorageInterface storageInterface;
public NamespaceFilesService(final StorageInterface storageInterface) {
this.storageInterface = storageInterface;
}

@Deprecated
public List<URI> inject(RunContext runContext, String tenantId, String namespace, Path basePath, NamespaceFiles namespaceFiles) throws Exception {
if (!namespaceFiles.getEnabled()) {
return Collections.emptyList();
}

List<URI> list = recursiveList(tenantId, namespace, null);

list = list
.stream()
.filter(throwPredicate(f -> {
var file = f.getPath();

if (namespaceFiles.getExclude() != null) {
boolean b = match(runContext.render(namespaceFiles.getExclude()), file);

if (b) {
return false;
}
}

if (namespaceFiles.getInclude() != null) {
boolean b = match(namespaceFiles.getInclude(), file);

if (!b) {
return false;
}
}

return true;
}))
.collect(Collectors.toList());
List<NamespaceFile> matchedNamespaceFiles = namespaceFor(tenantId, namespace)
.findAllFilesMatching(namespaceFiles.getInclude(), namespaceFiles.getExclude());

copy(tenantId, namespace, basePath, list);

return list;
matchedNamespaceFiles.forEach(Rethrow.throwConsumer(namespaceFile -> {
InputStream content = storageInterface.get(tenantId, namespaceFile.uri());
runContext.workingDir().putFile(namespaceFile.path(), content);
}));
return matchedNamespaceFiles.stream().map(NamespaceFile::path).map(Path::toUri).toList();
}

public URI uri(String namespace, @Nullable URI path) {
Expand All @@ -75,61 +58,41 @@ public URI uri(String namespace, @Nullable URI path) {
}

public List<URI> recursiveList(String tenantId, String namespace, @Nullable URI path) throws IOException {
return this.recursiveList(tenantId, namespace, path, false);
return recursiveList(tenantId, namespace, path, false);
}

public List<URI> recursiveList(String tenantId, String namespace, @Nullable URI path, boolean includeDirectoryEntries) throws IOException {
return storageInterface.allByPrefix(tenantId, URI.create(this.uri(namespace, path) + "/"), includeDirectoryEntries)
// We get rid of Kestra schema as we want to work on a folder-like basis
.stream().map(URI::getPath)
.map(URI::create)
.map(uri -> URI.create("/" + this.uri(namespace, null).relativize(uri)))
return namespaceFor(tenantId, namespace)
.all(path.getPath(), includeDirectoryEntries)
.stream()
.map(NamespaceFile::path)
.map(Path::toUri)
.toList();
}

private InternalNamespace namespaceFor(String tenantId, String namespace) {
return new InternalNamespace(log, tenantId, namespace, storageInterface);
}

public boolean delete(String tenantId, String namespace, URI path) throws IOException {
return storageInterface.delete(tenantId, this.uri(namespace, path));
return namespaceFor(tenantId, namespace).delete(Path.of(path));
}

public URI createFile(String tenantId, String namespace, URI path, InputStream inputStream) throws IOException {
return storageInterface.put(tenantId, this.uri(namespace, path), inputStream);
return namespaceFor(tenantId, namespace).putFile(Path.of(path), inputStream, Namespace.Conflicts.OVERWRITE)
.path()
.toUri();
}

public URI createDirectory(String tenantId, String namespace, URI path) throws IOException {
return storageInterface.createDirectory(tenantId, this.uri(namespace, path));
}

private static boolean match(List<String> patterns, String file) {
return patterns
.stream()
.anyMatch(s -> FileSystems
.getDefault()
.getPathMatcher("glob:" + (s.matches("\\w+[\\s\\S]*") ? "**/" + s : s))
.matches(Paths.get(file))
);
return namespaceFor(tenantId, namespace).createDirectory(Path.of(path.getPath()));
}

public InputStream content(String tenantId, String namespace, URI path) throws IOException {
return storageInterface.get(tenantId, uri(namespace, path));
return namespaceFor(tenantId, namespace).getFileContent(Path.of(path));
}

public static URI toNamespacedStorageUri(String namespace, @Nullable URI relativePath) {
return URI.create("kestra://" + StorageContext.namespaceFilePrefix(namespace) + Optional.ofNullable(relativePath).map(URI::getPath).orElse("/"));
}

private void copy(String tenantId, String namespace, Path basePath, List<URI> files) throws IOException {
files
.forEach(throwConsumer(f -> {
Path destination = Paths.get(basePath.toString(), f.getPath());

if (!destination.getParent().toFile().exists()) {
//noinspection ResultOfMethodCallIgnored
destination.getParent().toFile().mkdirs();
}

try (InputStream inputStream = this.content(tenantId, namespace, f)) {
Files.copy(inputStream, destination, REPLACE_EXISTING);
}
}));
return NamespaceFile.of(namespace, relativePath).uri();
}
}
17 changes: 14 additions & 3 deletions core/src/main/java/io/kestra/core/runners/RunContextFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.plugins.PluginConfigurations;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.InternalStorage;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
Expand All @@ -34,6 +35,9 @@ public class RunContextFactory {
@Inject
protected StorageInterface storageInterface;

@Inject
protected FlowService flowService;

@Inject
protected MetricRegistry metricRegistry;

Expand Down Expand Up @@ -62,7 +66,7 @@ public RunContext of(Flow flow, Execution execution) {
.withLogger(runContextLogger)
// Execution
.withPluginConfiguration(Map.of())
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forExecution(execution), storageInterface, flowService))
.withVariables(newRunVariablesBuilder()
.withFlow(flow)
.withExecution(execution)
Expand All @@ -83,7 +87,7 @@ public RunContext of(Flow flow, Task task, Execution execution, TaskRun taskRun,
.withLogger(runContextLogger)
// Task
.withPluginConfiguration(pluginConfigurations.getConfigurationByPluginTypeOrAliases(task.getType(), task.getClass()))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface))
.withStorage(new InternalStorage(runContextLogger.logger(), StorageContext.forTask(taskRun), storageInterface, flowService))
.withVariables(newRunVariablesBuilder()
.withFlow(flow)
.withTask(task)
Expand Down Expand Up @@ -122,8 +126,15 @@ public RunContext of(final Map<String, Object> variables) {
public URI getContextStorageURI() {
return URI.create("");
}

@Override
public String getTenantId() {
var tenantId = ((Map<String, Object>)variables.getOrDefault("flow", Map.of())).get("tenantId");
return Optional.ofNullable(tenantId).map(Object::toString).orElse(null);
}
},
storageInterface
storageInterface,
flowService
))
.withVariables(variables)
.build();
Expand Down
Loading

0 comments on commit 1cb54f7

Please sign in to comment.