Skip to content

Commit

Permalink
Merge branch 'master' into refactor/cluster-config-mongodb-api-usage
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickmann authored Jan 22, 2025
2 parents 2649eca + 8b91f4f commit 952ef20
Show file tree
Hide file tree
Showing 126 changed files with 2,185 additions and 1,168 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/issue-19058.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "fixed"
message = "Fixing highlighting of message in message table by id."

issues = ["19058"]
pulls = ["21389"]
5 changes: 5 additions & 0 deletions changelog/unreleased/issue-19975.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "fixed"
message = "Catch and report exceptions during grok pattern matching."

issues = ["19975"]
pulls = ["21290"]
5 changes: 5 additions & 0 deletions changelog/unreleased/issue-21185.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "fixed"
message = "Fix displaying very small percentages."

issues = ["21185"]
pulls = ["21368"]
4 changes: 4 additions & 0 deletions changelog/unreleased/pr-21123.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type = "changed"
message = "Adjust Event Priority field choices for consistency with Graylog Security."

pulls = ["21123"]
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-21367.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "fixed"
message = "Adding missing API descriptions on method parameters for Simple Scripting API."

pulls = ["21367"]
issues = ["20821"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,100 +17,81 @@
package org.graylog.plugins.pipelineprocessor.db.mongodb;

import com.google.common.collect.ImmutableSet;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import jakarta.inject.Inject;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.events.PipelineConnectionsChangedEvent;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.utils.MongoUtils;
import org.graylog2.events.ClusterEventBus;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.getRateLimitedLog;

public class MongoDbPipelineStreamConnectionsService implements PipelineStreamConnectionsService {
private static final RateLimitedLog log = getRateLimitedLog(MongoDbPipelineStreamConnectionsService.class);

private static final String COLLECTION = "pipeline_processor_pipelines_streams";

private final JacksonDBCollection<PipelineConnections, String> dbCollection;
private final ClusterEventBus clusterBus;
private final MongoCollection<PipelineConnections> collection;
private final MongoUtils<PipelineConnections> mongoUtils;

@Inject
public MongoDbPipelineStreamConnectionsService(MongoConnection mongoConnection,
MongoJackObjectMapperProvider mapper,
ClusterEventBus clusterBus) {
this.dbCollection = JacksonDBCollection.wrap(
mongoConnection.getDatabase().getCollection(COLLECTION),
PipelineConnections.class,
String.class,
mapper.get());
public MongoDbPipelineStreamConnectionsService(MongoCollections mongoCollections, ClusterEventBus clusterBus) {
this.clusterBus = clusterBus;
dbCollection.createIndex(DBSort.asc("stream_id"), new BasicDBObject("unique", true));
this.collection = mongoCollections.collection(COLLECTION, PipelineConnections.class);
this.mongoUtils = mongoCollections.utils(collection);

collection.createIndex(Indexes.ascending("stream_id"), new IndexOptions().unique(true));
}

@Override
public PipelineConnections save(PipelineConnections connections) {
PipelineConnections existingConnections = dbCollection.findOne(DBQuery.is("stream_id", connections.streamId()));
PipelineConnections existingConnections = collection.find(eq("stream_id", connections.streamId()))
.first();
if (existingConnections == null) {
existingConnections = PipelineConnections.create(null, connections.streamId(), Collections.emptySet());
}

final PipelineConnections toSave = existingConnections.toBuilder()
.pipelineIds(connections.pipelineIds()).build();
final WriteResult<PipelineConnections, String> save = dbCollection.save(toSave);

final PipelineConnections savedConnections = save.getSavedObject();
final PipelineConnections savedConnections = mongoUtils.save(toSave);
clusterBus.post(PipelineConnectionsChangedEvent.create(savedConnections.streamId(), savedConnections.pipelineIds()));

return savedConnections;
}

@Override
public PipelineConnections load(String streamId) throws NotFoundException {
final PipelineConnections oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId));
final PipelineConnections oneById = collection.find(eq("stream_id", streamId)).first();
if (oneById == null) {
throw new NotFoundException("No pipeline connections with for stream " + streamId);
throw new NotFoundException("No pipeline connections for stream " + streamId);
}
return oneById;
}

@Override
public Set<PipelineConnections> loadAll() {
try (DBCursor<PipelineConnections> connections = dbCollection.find()) {
return ImmutableSet.copyOf((Iterable<PipelineConnections>) connections);
} catch (MongoException e) {
log.error("Unable to load pipeline connections", e);
return Collections.emptySet();
}
return ImmutableSet.copyOf(collection.find());
}

@Override
public Set<PipelineConnections> loadByPipelineId(String pipelineId) {
// Thanks, MongoJack!
// https://github.com/mongojack/mongojack/issues/12
final DBObject query = new BasicDBObject("pipeline_ids", new BasicDBObject("$in", Collections.singleton(pipelineId)));
try (DBCursor<PipelineConnections> pipelineConnections = dbCollection.find(query)) {
return ImmutableSet.copyOf((Iterable<PipelineConnections>) pipelineConnections);
} catch (MongoException e) {
log.error("Unable to load pipeline connections for pipeline ID " + pipelineId, e);
return Collections.emptySet();
}
return ImmutableSet.copyOf(collection.find(in("pipeline_ids", pipelineId)));
}

@Override
Expand All @@ -119,16 +100,17 @@ public void delete(String streamId) {
final PipelineConnections connections = load(streamId);
final Set<String> pipelineIds = connections.pipelineIds();

dbCollection.removeById(connections.id());
mongoUtils.deleteById(connections.id());
clusterBus.post(PipelineConnectionsChangedEvent.create(streamId, pipelineIds));
} catch (NotFoundException e) {
log.debug("No connections found for stream " + streamId);
log.debug("No connections found for stream {}", streamId);
}
}

@Override
public Map<String, PipelineConnections> loadByStreamIds(Collection<String> streamIds) {
return dbCollection.find(DBQuery.in("stream_id", streamIds)).toArray().stream()
.collect(Collectors.toMap(PipelineConnections::streamId, conn -> conn));
try (final var stream = MongoUtils.stream(collection.find(in("stream_id", streamIds)))) {
return stream.collect(Collectors.toMap(PipelineConnections::streamId, conn -> conn));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import com.google.common.collect.ForwardingMap;
import io.krakens.grok.api.Grok;
import io.krakens.grok.api.Match;
import jakarta.inject.Inject;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor;
import org.graylog2.grok.GrokPatternRegistry;

import jakarta.inject.Inject;

import java.util.Map;

import static com.google.common.collect.ImmutableList.of;
Expand Down Expand Up @@ -63,8 +62,12 @@ public GrokResult evaluate(FunctionArgs args, EvaluationContext context) {

final Grok grok = grokPatternRegistry.cachedGrokForPattern(pattern, onlyNamedCaptures);

final Match match = grok.match(value);
return new GrokResult(match.captureFlattened());
try {
final Match match = grok.match(value);
return new GrokResult(match.captureFlattened());
} catch (StackOverflowError e) {
throw new IllegalStateException("Stack overflow during grok pattern matching");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ public RegexMatchResult evaluate(FunctionArgs args, EvaluationContext context) {
final List<String> groupNames =
(List<String>) optionalGroupNames.optional(args, context).orElse(Collections.emptyList());

final Matcher matcher = regex.matcher(value);
final boolean matches = matcher.find();
try {
final Matcher matcher = regex.matcher(value);
final boolean matches = matcher.find();

return new RegexMatchResult(matches, matcher.toMatchResult(), groupNames);
return new RegexMatchResult(matches, matcher.toMatchResult(), groupNames);
} catch (StackOverflowError e) {
throw new IllegalStateException("Stack overflow during regex pattern matching");
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog2.database.BuildableMongoEntity;
import org.graylog2.database.MongoEntity;
import org.mongojack.Id;
import org.mongojack.ObjectId;

Expand All @@ -28,7 +30,8 @@

@AutoValue
@JsonAutoDetect
public abstract class PipelineConnections {
public abstract class PipelineConnections implements MongoEntity,
BuildableMongoEntity<PipelineConnections, PipelineConnections.Builder> {

@JsonProperty("id")
@Nullable
Expand Down Expand Up @@ -60,7 +63,7 @@ public static Builder builder() {
public abstract Builder toBuilder();

@AutoValue.Builder
public abstract static class Builder {
public abstract static class Builder implements BuildableMongoEntity.Builder<PipelineConnections, Builder> {
public abstract PipelineConnections build();

public abstract Builder id(String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,92 +16,63 @@
*/
package org.graylog.plugins.pipelineprocessor.rulebuilder.db;

import com.google.common.collect.ImmutableSet;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoException;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import org.bson.types.ObjectId;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;

import com.google.common.collect.ImmutableList;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.Sorts;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.utils.MongoUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

import static org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.getRateLimitedLog;
import static com.mongodb.client.model.Filters.eq;

@Singleton
public class MongoDBRuleFragmentService implements RuleFragmentService {

private static final RateLimitedLog log = getRateLimitedLog(MongoDBRuleFragmentService.class);

public static final String COLLECTION_NAME = "rule_fragments";

private final JacksonDBCollection<RuleFragment, ObjectId> dbCollection;
private final MongoCollection<RuleFragment> collection;
private final MongoUtils<RuleFragment> mongoUtils;

@Inject
public MongoDBRuleFragmentService(
final MongoJackObjectMapperProvider objectMapperProvider,
final MongoConnection mongoConnection
) {
this(JacksonDBCollection.wrap(
mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
RuleFragment.class,
ObjectId.class,
objectMapperProvider.get())
);
}


public MongoDBRuleFragmentService(JacksonDBCollection<RuleFragment, ObjectId> dbCollection) {
this.dbCollection = Objects.requireNonNull(dbCollection);

this.dbCollection.createIndex(new BasicDBObject("name", 1), new BasicDBObject("unique", true));
public MongoDBRuleFragmentService(MongoCollections mongoCollections) {
collection = mongoCollections.collection(COLLECTION_NAME, RuleFragment.class);
mongoUtils = mongoCollections.utils(collection);
collection.createIndex(Indexes.ascending("name"), new IndexOptions().unique(true));
}

@Override
public RuleFragment save(RuleFragment ruleFragment) {
return dbCollection.save(ruleFragment).getSavedObject();
return mongoUtils.save(ruleFragment);
}

@Override
public void delete(String name) {
dbCollection.remove(DBQuery.is("name", name));
collection.deleteOne(eq("name", name));
}

@Override
public void deleteAll() {
dbCollection.remove(DBQuery.empty());
collection.deleteMany(Filters.empty());
}


@Override
public long count(String name) {
return dbCollection.getCount(DBQuery.is("name", name));
return collection.countDocuments(eq("name", name));
}

@Override
public Optional<RuleFragment> get(String name) {
return Optional.ofNullable(dbCollection.findOne(DBQuery.is("name", name)));
return Optional.ofNullable(collection.find(eq("name", name)).first());
}

@Override
public Collection<RuleFragment> all() {
try (DBCursor<RuleFragment> ruleDaos = dbCollection.find().sort(DBSort.asc("title"))) {
return ImmutableSet.copyOf((Iterable<RuleFragment>) ruleDaos);
} catch (MongoException e) {
log.error("Unable to load rule fragments", e);
return Collections.emptySet();
}
return ImmutableList.copyOf(collection.find().sort(Sorts.ascending("title")));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog2.database.BuildableMongoEntity;
import org.graylog2.database.MongoEntity;
import org.mongojack.Id;
import org.mongojack.ObjectId;

Expand All @@ -30,7 +32,7 @@

@AutoValue
@JsonIgnoreProperties(value = {"name"}, allowGetters = true)
public abstract class RuleFragment {
public abstract class RuleFragment implements MongoEntity, BuildableMongoEntity<RuleFragment, RuleFragment.Builder> {

public static final String FIELD_NAME = "name";
public static final String FIELD_FRAGMENT = "fragment";
Expand Down Expand Up @@ -63,14 +65,12 @@ public String getName() {
@JsonProperty(FIELD_DESCRIPTOR)
public abstract FunctionDescriptor descriptor();


public static Builder builder() {
return new AutoValue_RuleFragment.Builder().isCondition(false);
}


@AutoValue.Builder
public abstract static class Builder {
public abstract static class Builder implements BuildableMongoEntity.Builder<RuleFragment, Builder> {

public abstract Builder id(String id);

Expand Down
Loading

0 comments on commit 952ef20

Please sign in to comment.