Skip to content

Commit

Permalink
Revert "Expand config variables for Java plugins"
Browse files Browse the repository at this point in the history
This reverts commit dae3eec.

Fixes #11056
  • Loading branch information
danhermann committed Aug 19, 2019
1 parent 8371733 commit 920dc6f
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 284 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
import org.logstash.common.EnvironmentVariableProvider;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
Expand All @@ -23,15 +22,11 @@
import org.logstash.config.ir.graph.Vertex;
import org.logstash.config.ir.imperative.PluginStatement;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.secret.store.SecretStore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -78,27 +73,13 @@ public final class CompiledPipeline {
*/
private final RubyIntegration.PluginFactory pluginFactory;

public CompiledPipeline(
final PipelineIR pipelineIR,
final RubyIntegration.PluginFactory pluginFactory) {
this(pipelineIR, pluginFactory, null);
}

public CompiledPipeline(
final PipelineIR pipelineIR,
final RubyIntegration.PluginFactory pluginFactory,
final SecretStore secretStore) {
public CompiledPipeline(final PipelineIR pipelineIR,
final RubyIntegration.PluginFactory pluginFactory) {
this.pipelineIR = pipelineIR;
this.pluginFactory = pluginFactory;
try (ConfigVariableExpander cve = new ConfigVariableExpander(
secretStore,
EnvironmentVariableProvider.defaultProvider())) {
inputs = setupInputs(cve);
filters = setupFilters(cve);
outputs = setupOutputs(cve);
} catch (Exception e) {
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
}
inputs = setupInputs();
filters = setupFilters();
outputs = setupOutputs();
}

public Collection<AbstractOutputDelegatorExt> outputs() {
Expand All @@ -125,15 +106,15 @@ public Dataset buildExecution() {
/**
* Sets up all outputs learned from {@link PipelineIR}.
*/
private Map<String, AbstractOutputDelegatorExt> setupOutputs(ConfigVariableExpander cve) {
private Map<String, AbstractOutputDelegatorExt> setupOutputs() {
final Collection<PluginVertex> outs = pipelineIR.getOutputPluginVertices();
final Map<String, AbstractOutputDelegatorExt> res = new HashMap<>(outs.size());
outs.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
res.put(v.getId(), pluginFactory.buildOutput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
));
});
return res;
Expand All @@ -142,7 +123,7 @@ private Map<String, AbstractOutputDelegatorExt> setupOutputs(ConfigVariableExpan
/**
* Sets up all Ruby filters learnt from {@link PipelineIR}.
*/
private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpander cve) {
private Map<String, AbstractFilterDelegatorExt> setupFilters() {
final Collection<PluginVertex> filterPlugins = pipelineIR.getFilterPluginVertices();
final Map<String, AbstractFilterDelegatorExt> res = new HashMap<>(filterPlugins.size(), 1.0F);

Expand All @@ -151,7 +132,7 @@ private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpan
final SourceWithMetadata source = vertex.getSourceWithMetadata();
res.put(vertex.getId(), pluginFactory.buildFilter(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
));
}
return res;
Expand All @@ -160,15 +141,15 @@ private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpan
/**
* Sets up all Ruby inputs learnt from {@link PipelineIR}.
*/
private Collection<IRubyObject> setupInputs(ConfigVariableExpander cve) {
private Collection<IRubyObject> setupInputs() {
final Collection<PluginVertex> vertices = pipelineIR.getInputPluginVertices();
final Collection<IRubyObject> nodes = new HashSet<>(vertices.size());
vertices.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();
IRubyObject o = pluginFactory.buildInput(
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve));
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def));
nodes.add(o);
});
return nodes;
Expand Down Expand Up @@ -209,47 +190,23 @@ private RubyHash convertArgs(final PluginDefinition def) {
* @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory}
* methods that create Java plugins
*/
private Map<String, Object> convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) {
Map<String, Object> args = expandConfigVariables(cve, def.getArguments());
for (final Map.Entry<String, Object> entry : args.entrySet()) {
private Map<String, Object> convertJavaArgs(final PluginDefinition def) {
for (final Map.Entry<String, Object> entry : def.getArguments().entrySet()) {
final Object value = entry.getValue();
final String key = entry.getKey();
final IRubyObject toput;
if (value instanceof PluginStatement) {
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
Map<String, Object> codecArgs = expandConfigVariables(cve, codec.getArguments());
toput = pluginFactory.buildCodec(
RubyUtil.RUBY.newString(codec.getName()),
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
codecArgs
codec.getArguments()
);
Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
args.put(key, javaCodec);
}
}
return args;
}

@SuppressWarnings({"rawtypes", "unchecked"})
private Map<String, Object> expandConfigVariables(ConfigVariableExpander cve, Map<String, Object> configArgs) {
Map<String, Object> expandedConfig = new HashMap<>();
for (Map.Entry<String, Object> e : configArgs.entrySet()) {
if (e.getValue() instanceof List) {
List list = (List) e.getValue();
List<Object> expandedObjects = new ArrayList<>();
for (Object o : list) {
expandedObjects.add(cve.expand(o));
}
expandedConfig.put(e.getKey(), expandedObjects);
} else if (e.getValue() instanceof Map) {
expandedConfig.put(e.getKey(), expandConfigVariables(cve, (Map<String, Object>) e.getValue()));
} else if (e.getValue() instanceof String) {
expandedConfig.put(e.getKey(), cve.expand(e.getValue()));
} else {
expandedConfig.put(e.getKey(), e.getValue());
def.getArguments().put(key, javaCodec);
}
}
return expandedConfig;
return def.getArguments();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.secret.store.SecretStore;
import org.logstash.secret.store.SecretStoreExt;

@JRubyClass(name = "AbstractPipeline")
public class AbstractPipelineExt extends RubyBasicObject {
Expand Down Expand Up @@ -368,22 +366,6 @@ protected final IRubyObject getSetting(final ThreadContext context, final String
return settings.callMethod(context, "get_value", context.runtime.newString(name));
}

protected final boolean hasSetting(final ThreadContext context, final String name) {
return settings.callMethod(context, "registered?", context.runtime.newString(name)) == context.tru;
}

protected SecretStore getSecretStore(final ThreadContext context) {
String keystoreFile = hasSetting(context, "keystore.file")
? getSetting(context, "keystore.file").asJavaString()
: null;
String keystoreClassname = hasSetting(context, "keystore.classname")
? getSetting(context, "keystore.classname").asJavaString()
: null;
return (keystoreFile != null && keystoreClassname != null)
? SecretStoreExt.getIfExists(keystoreFile, keystoreClassname)
: null;
}

private AbstractNamespacedMetricExt getDlqMetric(final ThreadContext context) {
if (dlqMetric == null) {
dlqMetric = metric.namespace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public JavaBasePipelineExt initialize(final ThreadContext context, final IRubyOb
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
).initialize(context, args[3], this, dlqWriter(context)),
RubyUtil.FILTER_DELEGATOR_CLASS
),
getSecretStore(context)
)
);
inputs = RubyArray.newArray(context.runtime, lirExecution.inputs());
filters = RubyArray.newArray(context.runtime, lirExecution.filters());
Expand Down

This file was deleted.

Loading

0 comments on commit 920dc6f

Please sign in to comment.