Skip to content

Commit

Permalink
feat(jdbc): clean more eagerly some queues based on configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Feb 12, 2025
1 parent 9d717ca commit 13ac335
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 2 deletions.
5 changes: 5 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ kestra:
initial-delay: 1h
fixed-delay: 1h
retention: 7d
types:
- type : io.kestra.core.models.executions.LogEntry
retention: 1h
- type: io.kestra.core.models.executions.MetricEntry
retention: 1h

plugins:
repositories:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.h2;

import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.kestra.jdbc.runner.JdbcCleanerService;
import jakarta.inject.Singleton;
import org.jooq.Condition;

@Singleton
@H2QueueEnabled
public class H2JdbcCleanerService implements JdbcCleanerService {
@Override
public Condition buildTypeCondition(String type) {
return AbstractJdbcRepository.field("type").eq(type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.mysql;

import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.kestra.jdbc.runner.JdbcCleanerService;
import jakarta.inject.Singleton;
import org.jooq.Condition;

@Singleton
@MysqlQueueEnabled
public class MysqlJdbcCleanerService implements JdbcCleanerService {
@Override
public Condition buildTypeCondition(String type) {
return AbstractJdbcRepository.field("type").eq(type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.kestra.runner.postgres;

import io.kestra.jdbc.runner.JdbcCleanerService;
import jakarta.inject.Singleton;
import org.jooq.Condition;
import org.jooq.impl.DSL;

@Singleton
@PostgresQueueEnabled
public class PostgresJdbcCleanerService implements JdbcCleanerService {
@Override
public Condition buildTypeCondition(String type) {
return DSL.condition("type = CAST(? AS queue_type)", type);
}
}
32 changes: 30 additions & 2 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleaner.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.kestra.jdbc.runner;

import io.kestra.core.utils.ListUtils;
import io.kestra.jdbc.JdbcTableConfig;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcRepository;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Requires;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Inject;
Expand All @@ -17,6 +19,7 @@

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;

@Singleton
@JdbcRunnerEnabled
Expand All @@ -25,20 +28,37 @@
public class JdbcCleaner {
private final JooqDSLContextWrapper dslContextWrapper;
private final Configuration configuration;
protected final Table<Record> queueTable;
private final JdbcCleanerService jdbcCleanerService;
private final Table<Record> queueTable;

@Inject
public JdbcCleaner(@Named("queues") JdbcTableConfig jdbcTableConfig,
JooqDSLContextWrapper dslContextWrapper,
Configuration configuration
Configuration configuration,
JdbcCleanerService jdbcCleanerService
) {
this.dslContextWrapper = dslContextWrapper;
this.configuration = configuration;
this.jdbcCleanerService = jdbcCleanerService;

this.queueTable = DSL.table(jdbcTableConfig.table());
}

public void deleteQueue() {
// first, delete types that are configured more specifically
ListUtils.emptyOnNull(configuration.getTypes()).forEach(type -> {
dslContextWrapper.transaction(configuration -> {
int deleted = DSL
.using(configuration)
.delete(this.queueTable)
.where(AbstractJdbcRepository.field("updated").lessOrEqual(ZonedDateTime.now().minus(type.getRetention()).toOffsetDateTime()))
.and(jdbcCleanerService.buildTypeCondition(type.getType()))
.execute();
log.info("Cleaned {} records from {} for type {}", deleted, this.queueTable.getName(), type.getType());
});
});

// then, delete all other records
dslContextWrapper.transaction(configuration -> {
int deleted = DSL
.using(configuration)
Expand All @@ -61,5 +81,13 @@ public void report() {
@Getter
public static class Configuration {
Duration retention;
List<TypeConfiguration> types;

@Getter
@EachProperty(value = "types", list = true)
public static class TypeConfiguration {
String type;
Duration retention;
}
}
}
13 changes: 13 additions & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcCleanerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.kestra.jdbc.runner;

import org.jooq.Condition;

/**
* This service is used solely by the {@link JdbcCleaner} to handle database-specific queries.
*/
public interface JdbcCleanerService {
/**
* Build the condition for the <code>types</code> column of the <code>queues</code> table.
*/
Condition buildTypeCondition(String type);
}

0 comments on commit 13ac335

Please sign in to comment.