Refactor ClusterConfigServiceImpl
thll committed Jan 9, 2025
1 parent b1e8a57 commit 2649eca
Showing 3 changed files with 44 additions and 65 deletions.
ClusterConfig.java
@@ -19,19 +19,19 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import jakarta.validation.constraints.NotEmpty;
import org.graylog.autovalue.WithBeanGetter;
import org.graylog2.database.MongoEntity;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mongojack.Id;
import org.mongojack.ObjectId;

import javax.annotation.Nullable;

import jakarta.validation.constraints.NotEmpty;

@AutoValue
@WithBeanGetter
public abstract class ClusterConfig {
public abstract class ClusterConfig implements MongoEntity {
@Id
@ObjectId
@Nullable
ClusterConfigServiceImpl.java
@@ -20,30 +20,28 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.mongodb.DBCollection;
import com.google.common.primitives.Ints;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReplaceOptions;
import jakarta.inject.Inject;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.MongoConnection;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.RestrictedChainingClassLoader;
import org.graylog2.security.SafeClasses;
import org.graylog2.security.UnsafeClassLoadingAttemptException;
import org.graylog2.shared.plugins.ChainingClassLoader;
import org.graylog2.shared.utilities.AutoValueUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Set;

import static com.google.common.base.MoreObjects.firstNonNull;
@@ -53,7 +51,7 @@ public class ClusterConfigServiceImpl implements ClusterConfigService {
@VisibleForTesting
static final String COLLECTION_NAME = "cluster_config";
private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigServiceImpl.class);
private final JacksonDBCollection<ClusterConfig, String> dbCollection;
private final MongoCollection<ClusterConfig> collection;
private final NodeId nodeId;
private final ObjectMapper objectMapper;
private final RestrictedChainingClassLoader chainingClassLoader;
@@ -65,39 +63,21 @@ public ClusterConfigServiceImpl(final MongoJackObjectMapperProvider mapperProvider,
final NodeId nodeId,
final RestrictedChainingClassLoader chainingClassLoader,
final ClusterEventBus clusterEventBus) {
this(JacksonDBCollection.wrap(prepareCollection(mongoConnection), ClusterConfig.class, String.class, mapperProvider.get()),
nodeId, mapperProvider.get(), chainingClassLoader, clusterEventBus);
}

@Deprecated
public ClusterConfigServiceImpl(final MongoJackObjectMapperProvider mapperProvider,
final MongoConnection mongoConnection,
final NodeId nodeId,
final ChainingClassLoader chainingClassLoader,
final ClusterEventBus clusterEventBus) {
this(JacksonDBCollection.wrap(prepareCollection(mongoConnection), ClusterConfig.class, String.class, mapperProvider.get()),
nodeId, mapperProvider.get(), new RestrictedChainingClassLoader(chainingClassLoader, SafeClasses.allGraylogInternal()), clusterEventBus);
}

private ClusterConfigServiceImpl(final JacksonDBCollection<ClusterConfig, String> dbCollection,
final NodeId nodeId,
final ObjectMapper objectMapper,
final RestrictedChainingClassLoader chainingClassLoader,
final EventBus clusterEventBus) {
this.nodeId = checkNotNull(nodeId);
this.dbCollection = checkNotNull(dbCollection);
this.objectMapper = checkNotNull(objectMapper);
this.collection = prepareCollection(mongoConnection, mapperProvider);
this.objectMapper = checkNotNull(mapperProvider.get());
this.chainingClassLoader = chainingClassLoader;
this.clusterEventBus = checkNotNull(clusterEventBus);
}

@VisibleForTesting
static DBCollection prepareCollection(final MongoConnection mongoConnection) {
DBCollection coll = mongoConnection.getDatabase().getCollection(COLLECTION_NAME);
coll.createIndex(DBSort.asc("type"), "unique_type", true);
coll.setWriteConcern(WriteConcern.JOURNALED);

return coll;
static MongoCollection<ClusterConfig> prepareCollection(final MongoConnection mongoConnection,
MongoJackObjectMapperProvider mapperProvider) {
final MongoCollection<ClusterConfig> collection =
new MongoCollections(mapperProvider, mongoConnection).collection(COLLECTION_NAME, ClusterConfig.class)
.withWriteConcern(WriteConcern.JOURNALED);
collection.createIndex(Indexes.ascending("type"), new IndexOptions().name("unique_type").unique(true));
return collection;
}
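
For context on the driver calls the refactored prepareCollection() relies on, here is a minimal stand-alone sketch of the same setup. The connection string, database name, and the plain Document element type are assumptions for illustration only; the commit itself obtains a Jackson-mapped MongoCollection<ClusterConfig> through Graylog's MongoCollections helper.

// Minimal sketch; connection string and database name are assumed, and Document
// stands in for the Jackson-mapped ClusterConfig type used in the commit.
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class PrepareCollectionSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client.getDatabase("graylog")
                    .getCollection("cluster_config")
                    // journaled writes, matching withWriteConcern(WriteConcern.JOURNALED) above
                    .withWriteConcern(WriteConcern.JOURNALED);

            // unique ascending index on "type", matching the createIndex call above
            collection.createIndex(Indexes.ascending("type"),
                    new IndexOptions().name("unique_type").unique(true));
        }
    }
}

Creating an index that already exists with an identical definition is a no-op, so prepareCollection() remains safe to run on every startup against an existing collection.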

@Override
@@ -128,7 +108,7 @@ public <T> T get(String key, Class<T> type) {
}

private ClusterConfig findClusterConfig(String key) {
return dbCollection.findOne(DBQuery.is("type", key));
return collection.find(Filters.eq("type", key)).first();
}

@Override
@@ -166,7 +146,7 @@ public <T> void write(String key, T payload) {

ClusterConfig clusterConfig = ClusterConfig.create(key, payload, nodeId.getNodeId());

dbCollection.update(DBQuery.is("type", key), clusterConfig, true, false, WriteConcern.JOURNALED);
collection.replaceOne(Filters.eq("type", key), clusterConfig, new ReplaceOptions().upsert(true));

ClusterConfigChangedEvent event = ClusterConfigChangedEvent.create(
DateTime.now(DateTimeZone.UTC), nodeId.getNodeId(), key);
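
The legacy mongojack call dbCollection.update(query, object, upsert, multi, writeConcern) maps onto the driver's replaceOne with ReplaceOptions().upsert(true); the journaled write concern is no longer passed per call because it is set on the collection in prepareCollection(). A hedged sketch of that upsert-by-type write, again with a plain Document stand-in and an assumed "payload" field name:

// Sketch only: Document stands in for ClusterConfig, and the "payload" field name is assumed.
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

public class WriteSketch {
    static void upsertConfig(MongoCollection<Document> collection, String key, Document payload) {
        collection.replaceOne(
                Filters.eq("type", key),                               // at most one document per config type
                new Document("type", key).append("payload", payload),
                new ReplaceOptions().upsert(true));                    // insert when no document of that type exists yet
    }
}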
@@ -176,27 +156,24 @@ public <T> void write(String key, T payload) {
@Override
public <T> int remove(Class<T> type) {
final String canonicalName = type.getCanonicalName();
final WriteResult<ClusterConfig, String> result = dbCollection.remove(DBQuery.is("type", canonicalName));
return result.getN();
return Ints.saturatedCast(collection.deleteMany(Filters.eq("type", canonicalName)).getDeletedCount());
}

@Override
public Set<Class<?>> list() {
final ImmutableSet.Builder<Class<?>> classes = ImmutableSet.builder();

try (DBCursor<ClusterConfig> clusterConfigs = dbCollection.find()) {
for (ClusterConfig clusterConfig : clusterConfigs) {
final String type = clusterConfig.type();
try {
final Class<?> cls = chainingClassLoader.loadClassSafely(type);
classes.add(cls);
} catch (ClassNotFoundException e) {
LOG.debug("Couldn't find configuration class \"{}\"", type, e);
} catch (UnsafeClassLoadingAttemptException e) {
LOG.warn("Couldn't load class <{}>.", type, e);
}
collection.find().forEach(clusterConfig -> {
final String type = clusterConfig.type();
try {
final Class<?> cls = chainingClassLoader.loadClassSafely(type);
classes.add(cls);
} catch (ClassNotFoundException e) {
LOG.debug("Couldn't find configuration class \"{}\"", type, e);
} catch (UnsafeClassLoadingAttemptException e) {
LOG.warn("Couldn't load class <{}>.", type, e);
}
}
});

return classes.build();
}
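
The read, delete, and list paths move in the same direction: DBQuery.is becomes Filters.eq, findOne becomes find(...).first(), remove becomes deleteMany (whose long count is narrowed back to the int return type via Ints.saturatedCast), and the explicit DBCursor loop becomes a forEach over the iterable, with cursor handling left to the driver. A condensed sketch of those calls, once more against a Document collection for illustration:

// Sketch of the refactored read/delete/list calls; Document stands in for ClusterConfig.
import com.google.common.primitives.Ints;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.bson.Document;

public class ReadDeleteListSketch {
    static Document findByType(MongoCollection<Document> collection, String key) {
        return collection.find(Filters.eq("type", key)).first();      // null when nothing matches
    }

    static int removeByType(MongoCollection<Document> collection, String canonicalName) {
        // deleteMany reports a long; saturatedCast preserves the int-returning API contract
        return Ints.saturatedCast(
                collection.deleteMany(Filters.eq("type", canonicalName)).getDeletedCount());
    }

    static void listTypes(MongoCollection<Document> collection) {
        collection.find().forEach(doc -> System.out.println(doc.getString("type")));
    }
}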
ClusterConfigServiceImplTest.java
@@ -23,6 +23,7 @@
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import org.graylog.testing.mongodb.MongoDBInstance;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
@@ -67,17 +68,18 @@ public class ClusterConfigServiceImplTest {
private ClusterEventBus clusterEventBus;
private MongoConnection mongoConnection;
private ClusterConfigService clusterConfigService;
private MongoJackObjectMapperProvider mapperProvider;

@Before
public void setUpService() throws Exception {
DateTimeUtils.setCurrentMillisFixed(TIME.getMillis());

this.mongoConnection = mongodb.mongoConnection();

MongoJackObjectMapperProvider provider = new MongoJackObjectMapperProvider(objectMapper);
this.mapperProvider = new MongoJackObjectMapperProvider(objectMapper);

this.clusterConfigService = new ClusterConfigServiceImpl(
provider,
mapperProvider,
mongodb.mongoConnection(),
nodeId,
new RestrictedChainingClassLoader(new ChainingClassLoader(getClass().getClassLoader()),
@@ -325,9 +327,9 @@ public void prepareCollectionCreatesIndexesOnExistingCollection() throws Exception {
assertThat(original.getName()).isEqualTo(COLLECTION_NAME);
assertThat(original.getIndexInfo()).hasSize(1);

DBCollection collection = ClusterConfigServiceImpl.prepareCollection(mongoConnection);
assertThat(collection.getName()).isEqualTo(COLLECTION_NAME);
assertThat(collection.getIndexInfo()).hasSize(2);
MongoCollection<ClusterConfig> collection = ClusterConfigServiceImpl.prepareCollection(mongoConnection, mapperProvider);
assertThat(collection.getNamespace().getCollectionName()).isEqualTo(COLLECTION_NAME);
assertThat(collection.listIndexes()).hasSize(2);
assertThat(collection.getWriteConcern()).isEqualTo(WriteConcern.JOURNALED);
}

@@ -337,10 +339,10 @@ public void prepareCollectionCreatesCollectionIfItDoesNotExist() throws Exception {
final DB database = mongoConnection.getDatabase();
database.getCollection(COLLECTION_NAME).drop();
assertThat(database.collectionExists(COLLECTION_NAME)).isFalse();
DBCollection collection = ClusterConfigServiceImpl.prepareCollection(mongoConnection);
MongoCollection<ClusterConfig> collection = ClusterConfigServiceImpl.prepareCollection(mongoConnection, mapperProvider);

assertThat(collection.getName()).isEqualTo(COLLECTION_NAME);
assertThat(collection.getIndexInfo()).hasSize(2);
assertThat(collection.getNamespace().getCollectionName()).isEqualTo(COLLECTION_NAME);
assertThat(collection.listIndexes()).hasSize(2);
assertThat(collection.getWriteConcern()).isEqualTo(WriteConcern.JOURNALED);
}

