Skip to content

Commit

Permalink
Add ability to schedule Event Definitions with a cron expression (#19563)
Browse files Browse the repository at this point in the history

* Add ability to schedule Filter/Aggregation Events with cron scheduling
* Refactor CronUtils. Add Time Zone input label
* Fix issue when trying to catch up on missed executions during server downtime
* Resolve conflation of nextTo and nextTime. Add some executionjob tests
* Refactor endpoint for cron validation
* changelog
* Address PR feedback. Add cronstrue module to describe cron expressions on frontend
* Add enterprise pr/issue to changelog
* Add cron schedule info to EventDefinitionDescription
* Refactor cron description to utility function for uniform behavior
* Allow null cron fields. Set timezone properly if field is never touched
  • Loading branch information
kingzacko1 authored Jun 27, 2024
1 parent 914f5c1 commit b6a52c7
Show file tree
Hide file tree
Showing 22 changed files with 654 additions and 64 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-19563.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Added support for scheduling Event Definitions using cron expressions."

issues = ["graylog-plugin-enterprise#6598"]
pulls = ["19563", "graylog-plugin-enterprise#7526"]
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public abstract class AggregationEventProcessorConfigEntity implements EventProc
private static final String FIELD_SEARCH_WITHIN_MS = "search_within_ms";
private static final String FIELD_EXECUTE_EVERY_MS = "execute_every_ms";
private static final String FIELD_EVENT_LIMIT = "event_limit";
private static final String FIELD_USE_CRON_SCHEDULING = "use_cron_scheduling";
private static final String FIELD_CRON_EXPRESSION = "cron_expression";
private static final String FIELD_CRON_TIMEZONE = "cron_timezone";

@JsonProperty(FIELD_QUERY)
public abstract ValueReference query();
Expand All @@ -85,6 +88,15 @@ public abstract class AggregationEventProcessorConfigEntity implements EventProc
@JsonProperty(FIELD_EXECUTE_EVERY_MS)
public abstract long executeEveryMs();

@JsonProperty(FIELD_USE_CRON_SCHEDULING)
public abstract boolean useCronScheduling();

@JsonProperty(FIELD_CRON_EXPRESSION)
public abstract Optional<String> cronExpression();

@JsonProperty(FIELD_CRON_TIMEZONE)
public abstract Optional<String> cronTimezone();

@JsonProperty(FIELD_EVENT_LIMIT)
public abstract int eventLimit();

Expand All @@ -102,6 +114,7 @@ public static Builder create() {
return new AutoValue_AggregationEventProcessorConfigEntity.Builder()
.type(TYPE_NAME)
.filters(Collections.emptyList())
.useCronScheduling(false)
.eventLimit(0);
}

Expand Down Expand Up @@ -129,6 +142,15 @@ public static Builder create() {
@JsonProperty(FIELD_EXECUTE_EVERY_MS)
public abstract Builder executeEveryMs(long executeEveryMs);

@JsonProperty(FIELD_USE_CRON_SCHEDULING)
public abstract Builder useCronScheduling(boolean useCronScheduling);

@JsonProperty(FIELD_CRON_EXPRESSION)
public abstract Builder cronExpression(@Nullable String cronExpression);

@JsonProperty(FIELD_CRON_TIMEZONE)
public abstract Builder cronTimezone(@Nullable String cronTimezone);

@JsonProperty(FIELD_EVENT_LIMIT)
public abstract Builder eventLimit(Integer eventLimit);

Expand Down Expand Up @@ -163,6 +185,9 @@ public EventProcessorConfig toNativeEntity(Map<String, ValueReference> parameter
.conditions(conditions().orElse(null))
.executeEveryMs(executeEveryMs())
.searchWithinMs(searchWithinMs())
.useCronScheduling(useCronScheduling())
.cronExpression(cronExpression().orElse(null))
.cronTimezone(cronTimezone().orElse(null))
.eventLimit(eventLimit())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobDefinitionConfig;
Expand All @@ -36,8 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -116,8 +115,12 @@ public JobTriggerUpdate execute(JobExecutionContext ctx) throws JobExecutionExce
eventProcessorEngine.execute(config.eventDefinitionId(), parameters);

// By using the processingWindowSize and the processingHopSize we can implement hopping and tumbling
// windows. (a tumbling window is simply a hopping window where windowSize and hopSize are the same)
DateTime nextTo = to.plus(config.processingHopSize());
// windows. (a tumbling window is simply a hopping window where windowSize and hopSize are the same).
// If the job uses cron scheduling, we need to instead calculate the nextTo field based on the current to
// field as it is possible to skip contiguous time ranges with cron scheduling.
DateTime nextTo = config.isCron() ?
scheduleStrategies.nextTime(ctx.trigger(), to).orElse(to.plus(config.processingHopSize())) :
to.plus(config.processingHopSize());
DateTime nextFrom = nextTo.minus(config.processingWindowSize());

// If the event processor is catching up on old data (e.g. the server was shut down for a significant time),
Expand All @@ -127,7 +130,7 @@ public JobTriggerUpdate execute(JobExecutionContext ctx) throws JobExecutionExce
// If an event processor was configured with a processingHopSize greater than the processingWindowSize
// we can't use the catchup mode.
final long catchUpSize = configurationProvider.get().eventCatchupWindow();
if (catchUpSize > 0 && catchUpSize > config.processingWindowSize() && to.plus(catchUpSize).isBefore(now) &&
if (!config.isCron() && catchUpSize > 0 && catchUpSize > config.processingWindowSize() && to.plus(catchUpSize).isBefore(now) &&
config.processingHopSize() <= config.processingWindowSize()) {
final long chunkCount = catchUpSize / config.processingWindowSize();

Expand Down Expand Up @@ -209,6 +212,7 @@ public static abstract class Config implements JobDefinitionConfig {
private static final String FIELD_PARAMETERS = "parameters";
private static final String FIELD_PROCESSING_WINDOW_SIZE = "processing_window_size";
private static final String FIELD_PROCESSING_HOP_SIZE = "processing_hop_size";
private static final String FIELD_IS_CRON = "is_cron";

@JsonProperty(FIELD_EVENT_DEFINITION_ID)
public abstract String eventDefinitionId();
Expand All @@ -222,6 +226,9 @@ public static abstract class Config implements JobDefinitionConfig {
@JsonProperty(FIELD_PROCESSING_HOP_SIZE)
public abstract long processingHopSize();

@JsonProperty(FIELD_IS_CRON)
public abstract boolean isCron();

public static Builder builder() {
return Builder.create();
}
Expand All @@ -237,7 +244,7 @@ public boolean hasEqualSchedule(Config other) {
public static abstract class Builder implements JobDefinitionConfig.Builder<Builder> {
@JsonCreator
public static Builder create() {
return new AutoValue_EventProcessorExecutionJob_Config.Builder().type(TYPE_NAME);
return new AutoValue_EventProcessorExecutionJob_Config.Builder().type(TYPE_NAME).isCron(false);
}

@JsonProperty(FIELD_EVENT_DEFINITION_ID)
Expand All @@ -252,6 +259,9 @@ public static Builder create() {
@JsonProperty(FIELD_PROCESSING_HOP_SIZE)
public abstract Builder processingHopSize(long hopSize);

@JsonProperty(FIELD_IS_CRON)
public abstract Builder isCron(boolean isCron);

abstract Config autoBuild();

public Config build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import org.graylog.plugins.views.search.searchfilters.model.UsedSearchFilter;
import org.graylog.plugins.views.search.searchtypes.pivot.HasField;
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
import org.graylog.scheduler.JobSchedule;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog.scheduler.schedule.CronJobSchedule;
import org.graylog.scheduler.schedule.CronUtils;
import org.graylog.scheduler.schedule.IntervalJobSchedule;
import org.graylog2.contentpacks.EntityDescriptorIds;
import org.graylog2.contentpacks.model.ModelId;
Expand Down Expand Up @@ -74,6 +77,9 @@ public abstract class AggregationEventProcessorConfig implements EventProcessorC
private static final String FIELD_CONDITIONS = "conditions";
private static final String FIELD_SEARCH_WITHIN_MS = "search_within_ms";
private static final String FIELD_EXECUTE_EVERY_MS = "execute_every_ms";
private static final String FIELD_USE_CRON_SCHEDULING = "use_cron_scheduling";
private static final String FIELD_CRON_EXPRESSION = "cron_expression";
private static final String FIELD_CRON_TIMEZONE = "cron_timezone";
private static final String FIELD_EVENT_LIMIT = "event_limit";

@JsonProperty(FIELD_QUERY)
Expand Down Expand Up @@ -103,10 +109,20 @@ public abstract class AggregationEventProcessorConfig implements EventProcessorC
@JsonProperty(FIELD_EXECUTE_EVERY_MS)
public abstract long executeEveryMs();

@JsonProperty(FIELD_USE_CRON_SCHEDULING)
public abstract boolean useCronScheduling();

@Nullable
@JsonProperty(FIELD_CRON_EXPRESSION)
public abstract String cronExpression();

@Nullable
@JsonProperty(FIELD_CRON_TIMEZONE)
public abstract String cronTimezone();

@JsonProperty(FIELD_EVENT_LIMIT)
public abstract int eventLimit();


@Override
public Set<String> requiredPermissions() {
// When there are no streams the event processor will search in all streams so we need to require the
Expand All @@ -130,7 +146,20 @@ public Optional<EventProcessorSchedulerConfig> toJobSchedulerConfig(EventDefinit
final DateTime now = clock.nowUTC();

// We need an initial timerange for the first execution of the event processor
final AbsoluteRange timerange = AbsoluteRange.create(now.minus(searchWithinMs()), now);
final AbsoluteRange timerange;
final JobSchedule schedule;
if (useCronScheduling()) {
CronJobSchedule cronJobSchedule = CronJobSchedule.builder()
.timezone(cronTimezone())
.cronExpression(cronExpression())
.build();
DateTime nextTime = cronJobSchedule.calculateNextTime(now, now, clock).orElse(now);
schedule = cronJobSchedule;
timerange = AbsoluteRange.create(nextTime.minus(searchWithinMs()), nextTime);
} else {
schedule = IntervalJobSchedule.builder().interval(executeEveryMs()).unit(TimeUnit.MILLISECONDS).build();
timerange = AbsoluteRange.create(now.minus(searchWithinMs()), now);
}

final EventProcessorExecutionJob.Config jobDefinitionConfig = EventProcessorExecutionJob.Config.builder()
.eventDefinitionId(eventDefinition.id())
Expand All @@ -139,10 +168,7 @@ public Optional<EventProcessorSchedulerConfig> toJobSchedulerConfig(EventDefinit
.parameters(AggregationEventProcessorParameters.builder()
.timerange(timerange)
.build())
.build();
final IntervalJobSchedule schedule = IntervalJobSchedule.builder()
.interval(executeEveryMs())
.unit(TimeUnit.MILLISECONDS)
.isCron(useCronScheduling())
.build();

return Optional.of(EventProcessorSchedulerConfig.create(jobDefinitionConfig, schedule));
Expand All @@ -156,6 +182,7 @@ public static Builder create() {
.queryParameters(ImmutableSet.of())
.filters(Collections.emptyList())
.type(TYPE_NAME)
.useCronScheduling(false)
.eventLimit(0);
}

Expand Down Expand Up @@ -189,6 +216,15 @@ public static Builder create() {
@JsonProperty(FIELD_EVENT_LIMIT)
public abstract Builder eventLimit(Integer eventLimit);

@JsonProperty(FIELD_USE_CRON_SCHEDULING)
public abstract Builder useCronScheduling(boolean useCronScheduling);

@JsonProperty(FIELD_CRON_EXPRESSION)
public abstract Builder cronExpression(String cronExpression);

@JsonProperty(FIELD_CRON_TIMEZONE)
public abstract Builder cronTimezone(String cronTimezone);

public abstract AggregationEventProcessorConfig build();
}

Expand Down Expand Up @@ -228,6 +264,18 @@ public ValidationResult validate() {
}
});

if (useCronScheduling()) {
if (cronExpression() == null || cronExpression().isEmpty()) {
validationResult.addError(FIELD_CRON_EXPRESSION, "Cron expression must not be empty when using cron scheduling");
} else {
try {
CronUtils.validateExpression(cronExpression());
} catch (Exception e) {
validationResult.addError(FIELD_CRON_EXPRESSION, e.getMessage());
}
}
}

return validationResult;
}

Expand Down Expand Up @@ -283,6 +331,9 @@ public EventProcessorConfigEntity toContentPackEntity(EntityDescriptorIds entity
.conditions(conditions().orElse(null))
.executeEveryMs(executeEveryMs())
.searchWithinMs(searchWithinMs())
.useCronScheduling(useCronScheduling())
.cronExpression(cronExpression())
.cronTimezone(cronTimezone())
.eventLimit(eventLimit())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.rest;

/**
 * Request payload carrying a cron expression that should be validated.
 *
 * @param expression the cron expression to validate
 */
public record CronValidationRequest(String expression) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.events.rest;

/**
 * Result of validating a cron expression.
 *
 * @param error       the validation error message, or {@code null} when no error is reported
 * @param description a description of the cron expression, or {@code null} when none is available
 */
public record CronValidationResponse(String error, String description) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.graylog.events.processor.EventResolver;
import org.graylog.grn.GRNTypes;
import org.graylog.plugins.views.startpage.recentActivities.RecentActivityService;
import org.graylog.scheduler.schedule.CronUtils;
import org.graylog.security.UserContext;
import org.graylog2.audit.AuditEventSender;
import org.graylog2.audit.jersey.AuditEvent;
Expand Down Expand Up @@ -469,6 +470,21 @@ public ValidationResult validate(@ApiParam(name = "JSON body", required = true)
return validationResult;
}

/**
 * Validates a cron expression and, if it is valid, returns a description of it.
 * <p>
 * Validation problems are reported in the response body rather than as an HTTP error: the response carries
 * either an error message (and no description) or a description (and no error).
 *
 * @param toValidate the request holding the cron expression to check
 * @return a response with the error message set when the expression is missing, empty or rejected by
 *         {@link CronUtils#validateExpression}, otherwise a response with the expression's description
 */
@POST
@Path("/validate/cron_expression")
@NoAuditEvent("Validation only")
@ApiOperation(value = "Validate a cron expression")
@RequiresPermissions(RestPermissions.EVENT_DEFINITIONS_READ)
public CronValidationResponse validate(@ApiParam(name = "JSON body", required = true)
                                       @Valid @NotNull CronValidationRequest toValidate) {
    final String expression = toValidate.expression();

    // @NotNull only guards the request object itself; a body without an "expression" field still yields a
    // null component. Report that as a validation error instead of handing null/empty input to CronUtils,
    // where anything other than an IllegalArgumentException would escape this method.
    if (expression == null || expression.isEmpty()) {
        return new CronValidationResponse("Cron expression must not be empty", null);
    }

    try {
        CronUtils.validateExpression(expression);
        return new CronValidationResponse(null, CronUtils.describeExpression(expression));
    } catch (IllegalArgumentException e) {
        return new CronValidationResponse(e.getMessage(), null);
    }
}

private void checkEventDefinitionPermissions(EventDefinitionDto dto, String action) {
final Set<String> missingPermissions = dto.requiredPermissions().stream()
.filter(permission -> !isPermitted(permission))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*/
package org.graylog.scheduler;

import jakarta.inject.Inject;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Optional;

/**
Expand All @@ -48,10 +47,20 @@ public JobScheduleStrategies(JobSchedulerClock clock) {
* @return the next time this trigger should fire, empty optional if the trigger should not fire anymore
*/
public Optional<DateTime> nextTime(JobTriggerDto trigger) {
final DateTime lastNextTime = trigger.nextTime();
final DateTime lastExecutionTime = trigger.lock().lastLockTime();
return nextTime(trigger, trigger.nextTime());
}

return trigger.schedule().calculateNextTime(lastExecutionTime, lastNextTime, clock);
/**
 * Determines when a trigger should fire next, using the given date as the reference point.
 * <p>
 * An empty {@link Optional} means the trigger should not be executed anymore.
 *
 * @param trigger the trigger whose schedule and last lock time are used for the calculation
 * @param date    the date handed to the trigger's schedule when calculating the next execution time
 * @return the next time this trigger should fire, or an empty optional if it should not fire anymore
 */
public Optional<DateTime> nextTime(JobTriggerDto trigger, DateTime date) {
    final DateTime lastLockTime = trigger.lock().lastLockTime();
    return trigger.schedule().calculateNextTime(lastLockTime, date, clock);
}

/**
Expand Down
Loading

0 comments on commit b6a52c7

Please sign in to comment.