Skip to content

Commit

Permalink
Plugin metrics injection (#5372)
Browse files Browse the repository at this point in the history
* injectable plugin metrics

Signed-off-by: Santhosh Gandhe <[email protected]>

* removed an unused parameter

Signed-off-by: Santhosh Gandhe <[email protected]>

* fixing a flaky test

Signed-off-by: Santhosh Gandhe <[email protected]>

---------

Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 authored Jan 31, 2025
1 parent 414a7c9 commit 5b16480
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -14,18 +15,19 @@
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.core.validation.LoggingPluginErrorsHandler;
import org.opensearch.dataprepper.core.validation.PluginErrorCollector;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
import org.opensearch.dataprepper.plugins.configtest.TestComponentWithConfigInject;
import org.opensearch.dataprepper.plugins.configtest.TestDISourceWithConfig;
import org.opensearch.dataprepper.plugins.test.TestComponent;
import org.opensearch.dataprepper.plugins.test.TestDISource;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.PluginErrorsHandler;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.Collections;
Expand Down Expand Up @@ -129,7 +131,7 @@ void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_initialized(
}

@Test
void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_and_config_injected() {
void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_with_config_and_plugin_metrics_injected() {

final String requiredStringValue = UUID.randomUUID().toString();
final String optionalStringValue = UUID.randomUUID().toString();
Expand All @@ -152,6 +154,9 @@ void loadPlugin_should_return_a_new_plugin_instance_with_DI_context_and_config_i
assertThat(pluginConfig.getRequiredString(), equalTo(requiredStringValue));
assertThat(pluginConfig.getOptionalString(), equalTo(optionalStringValue));
assertThat(plugin.getTestComponent().getIdentifier(), equalTo("test-component-with-plugin-config-injected"));
PluginMetrics pluginMetrics = plugin.getTestComponent().getPluginMetrics();
assertInstanceOf(PluginMetrics.class, pluginMetrics);
assertInstanceOf(Counter.class, pluginMetrics.counter("testCounter"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ public class DefaultPluginFactory implements PluginFactory {
this.pluginBeanFactoryProvider = Objects.requireNonNull(pluginBeanFactoryProvider);
this.pluginConfigurationObservableFactory = pluginConfigurationObservableFactory;

if(pluginProviders.isEmpty()) {
if (pluginProviders.isEmpty()) {
throw new RuntimeException("Data Prepper requires at least one PluginProvider. " +
"Your Data Prepper configuration may be missing the org.opensearch.dataprepper.plugin.PluginProvider file.");
}
}

@Override
public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object ... args) {
public <T> T loadPlugin(final Class<T> baseClass, final PluginSetting pluginSetting, final Object... args) {
final String pluginName = pluginSetting.getName();
final Class<? extends T> pluginClass = getPluginClass(baseClass, pluginName);

Expand Down Expand Up @@ -100,7 +100,7 @@ public <T> List<T> loadPlugins(

final Integer numberOfInstances = numberOfInstancesFunction.apply(pluginClass);

if(numberOfInstances == null || numberOfInstances < 0)
if (numberOfInstances == null || numberOfInstances < 0)
throw new IllegalArgumentException("The numberOfInstances must be provided as a non-negative integer.");

final ComponentPluginArgumentsContext constructionContext = getConstructionContext(pluginSetting, pluginClass, null);
Expand All @@ -121,7 +121,7 @@ private <T> ComponentPluginArgumentsContext getConstructionContext(final PluginS
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginConfigurationType, pluginSetting);

Class[] markersToScan = pluginAnnotation.packagesToScan();
BeanFactory beanFactory = pluginBeanFactoryProvider.createPluginSpecificContext(markersToScan, configuration);
BeanFactory beanFactory = pluginBeanFactoryProvider.createPluginSpecificContext(markersToScan, configuration, pluginSetting);

return new ComponentPluginArgumentsContext.Builder()
.withPluginSetting(pluginSetting)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
Expand All @@ -20,8 +21,8 @@
/**
* @since 1.3
* <p>
* Used to create new instances of ApplicationContext that can be used to provide a per plugin instance isolated ApplicationContext
* scope. CoreApplicationContext is unavailable to sharedPluginApplicationContext and its children.
* Used to create new instances of ApplicationContext that can be used to provide a per plugin instance isolated ApplicationContext
* scope. CoreApplicationContext is unavailable to sharedPluginApplicationContext and its children.
* </p>
* <p>pluginIsolatedApplicationContext inherits from {@link PluginBeanFactoryProvider#sharedPluginApplicationContext}</p>
* <p>{@link PluginBeanFactoryProvider#sharedPluginApplicationContext} inherits from <i>publicContext</i></p>
Expand Down Expand Up @@ -53,20 +54,23 @@ GenericApplicationContext getCoreApplicationContext() {
}

/**
* @return BeanFactory A BeanFactory that inherits from {@link PluginBeanFactoryProvider#sharedPluginApplicationContext}
* @since 1.3
* Creates a new isolated application context that inherits from
* {@link PluginBeanFactoryProvider#sharedPluginApplicationContext} then returns new context's BeanFactory.
* {@link PluginBeanFactoryProvider#sharedPluginApplicationContext} should not be directly accessible to plugins.
* instead, a new isolated {@link ApplicationContext} should be created.
* @return BeanFactory A BeanFactory that inherits from {@link PluginBeanFactoryProvider#sharedPluginApplicationContext}
*/
public BeanFactory createPluginSpecificContext(Class[] markersToScan, Object configuration) {
public BeanFactory createPluginSpecificContext(Class[] markersToScan, Object configuration, final PluginSetting pluginSetting) {
AnnotationConfigApplicationContext isolatedPluginApplicationContext = new AnnotationConfigApplicationContext();
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) isolatedPluginApplicationContext.getBeanFactory();
if(markersToScan !=null && markersToScan.length>0) {
if(configuration !=null && !(configuration instanceof PluginSetting)) {
if (markersToScan != null && markersToScan.length > 0) {
if (configuration != null && !(configuration instanceof PluginSetting)) {
beanFactory.registerSingleton(configuration.getClass().getName(), configuration);
}
if (pluginSetting != null) {
beanFactory.registerSingleton(PluginMetrics.class.getName(), PluginMetrics.fromPluginSetting(pluginSetting));
}
// If packages to scan is provided in this plugin annotation, which indicates
// that this plugin is interested in using Dependency Injection isolated for its module
Arrays.stream(markersToScan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void loadPlugin_should_create_a_new_instance_of_the_plugin_with_di_initialized()
equalTo(expectedInstance));
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{TestDISource.class}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{TestDISource.class}, convertedConfiguration, pluginSetting);
}

@Test
Expand All @@ -233,7 +233,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found() {
equalTo(expectedInstance));
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
}

@Test
Expand Down Expand Up @@ -283,7 +283,7 @@ void loadPlugins_should_return_an_empty_list_when_the_number_of_instances_is_0()
assertThat(plugins, notNullValue());
assertThat(plugins.size(), equalTo(0));

verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, null);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, null, pluginSetting);
verifyNoInteractions(pluginCreator);
}

Expand All @@ -299,7 +299,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_
final List<?> plugins = createObjectUnderTest().loadPlugins(
baseClass, pluginSetting, c -> 1);

verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
Expand All @@ -309,7 +309,7 @@ void loadPlugins_should_return_a_single_instance_when_the_the_numberOfInstances_
final Object[] pipelineDescriptionObj = actualPluginArgumentsContext.createArguments(classes.toArray(new Class[1]));
assertThat(pipelineDescriptionObj.length, equalTo(1));
assertThat(pipelineDescriptionObj[0], instanceOf(PipelineDescription.class));
final PipelineDescription actualPipelineDescription = (PipelineDescription)pipelineDescriptionObj[0];
final PipelineDescription actualPipelineDescription = (PipelineDescription) pipelineDescriptionObj[0];
assertThat(actualPipelineDescription.getPipelineName(), is(pipelineName));
assertThat(plugins, notNullValue());
assertThat(plugins.size(), equalTo(1));
Expand All @@ -328,7 +328,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number

final Object plugin = createObjectUnderTest().loadPlugin(baseClass, pluginSetting, object);

verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
verify(pluginConfigurationObservableFactory).createDefaultPluginConfigObservable(eq(pluginConfigurationConverter),
eq(PluginSetting.class), eq(pluginSetting));
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
Expand All @@ -338,7 +338,7 @@ void loadPlugin_with_varargs_should_return_a_single_instance_when_the_the_number
final Object[] pipelineDescriptionObj = actualPluginArgumentsContext.createArguments(classes.toArray(new Class[1]));
assertThat(pipelineDescriptionObj.length, equalTo(1));
assertThat(pipelineDescriptionObj[0], instanceOf(PipelineDescription.class));
final PipelineDescription actualPipelineDescription = (PipelineDescription)pipelineDescriptionObj[0];
final PipelineDescription actualPipelineDescription = (PipelineDescription) pipelineDescriptionObj[0];
assertThat(actualPipelineDescription.getPipelineName(), is(pipelineName));
assertThat(plugin, notNullValue());
assertThat(plugin, equalTo(expectedInstance));
Expand Down Expand Up @@ -380,7 +380,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() {
final List<?> plugins = createObjectUnderTest().loadPlugins(
baseClass, pluginSetting, c -> 3);

verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
verify(pluginCreator, times(3)).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName));
final List<ComponentPluginArgumentsContext> actualPluginArgumentsContextList = pluginArgumentsContextArgCapture.getAllValues();
Expand All @@ -390,7 +390,7 @@ void loadPlugins_should_return_an_instance_for_the_total_count() {
final Object[] pipelineDescriptionObj = pluginArgumentsContext.createArguments(classes.toArray(new Class[1]));
assertThat(pipelineDescriptionObj.length, equalTo(1));
assertThat(pipelineDescriptionObj[0], instanceOf(PipelineDescription.class));
final PipelineDescription actualPipelineDescription = (PipelineDescription)pipelineDescriptionObj[0];
final PipelineDescription actualPipelineDescription = (PipelineDescription) pipelineDescriptionObj[0];
assertThat(actualPipelineDescription.getPipelineName(), is(pipelineName));
});
assertThat(plugins, notNullValue());
Expand All @@ -416,7 +416,7 @@ void loadPlugins_should_return_a_single_instance_with_values_from_ApplicationCon
final List<?> plugins = createObjectUnderTest().loadPlugins(
baseClass, pluginSetting, c -> 1);

verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
final ArgumentCaptor<ComponentPluginArgumentsContext> pluginArgumentsContextArgCapture = ArgumentCaptor.forClass(ComponentPluginArgumentsContext.class);
verify(pluginCreator).newPluginInstance(eq(expectedPluginClass), pluginArgumentsContextArgCapture.capture(), eq(pluginName));
final ComponentPluginArgumentsContext actualPluginArgumentsContext = pluginArgumentsContextArgCapture.getValue();
Expand All @@ -425,7 +425,7 @@ void loadPlugins_should_return_a_single_instance_with_values_from_ApplicationCon
assertThat(pipelineDescriptionObj.length, equalTo(2));
assertThat(pipelineDescriptionObj[0], instanceOf(PipelineDescription.class));
assertThat(pipelineDescriptionObj[1], sameInstance(suppliedAdditionalArgument));
final PipelineDescription actualPipelineDescription = (PipelineDescription)pipelineDescriptionObj[0];
final PipelineDescription actualPipelineDescription = (PipelineDescription) pipelineDescriptionObj[0];
assertThat(actualPipelineDescription.getPipelineName(), is(pipelineName));
assertThat(plugins, notNullValue());
assertThat(plugins.size(), equalTo(1));
Expand Down Expand Up @@ -458,7 +458,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_corr

assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting), equalTo(expectedInstance));
MatcherAssert.assertThat(expectedInstance.getClass().getAnnotation(DataPrepperPlugin.class).deprecatedName(), equalTo(TEST_SINK_DEPRECATED_NAME));
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
}
}

Expand Down Expand Up @@ -487,7 +487,7 @@ void loadPlugin_should_create_a_new_instance_of_the_first_plugin_found_with_corr

assertThat(createObjectUnderTest().loadPlugin(baseClass, pluginSetting), equalTo(expectedInstance));
MatcherAssert.assertThat(expectedInstance.getClass().getAnnotation(DataPrepperPlugin.class).alternateNames(), equalTo(new String[]{TEST_SINK_ALTERNATE_NAME}));
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration);
verify(beanFactoryProvider).createPluginSpecificContext(new Class[]{}, convertedConfiguration, pluginSetting);
}
}
}
Loading

0 comments on commit 5b16480

Please sign in to comment.