Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] use ConfigurationUtil for KafkaSourceConfig #17223

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.util;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.annotation.PublicApi;

import java.util.Map;

/**
* Utility class for parsing configurations.
*
* @opensearch.api
*/
@PublicApi(since = "3.0.0")
public final class ConfigurationUtils {

private ConfigurationUtils() {}

/**
* Returns and removes the specified optional property from the specified configuration map.
* <p>
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
*/
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
return readString(propertyName, value);
}

/**
* Returns and removes the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string an {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName) {
return readStringProperty(configuration, propertyName, null);
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.get(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");
}
return readString(propertyName, value);
}

public static OpenSearchException newConfigurationException(String propertyName, String reason) {
String msg;
if (propertyName == null) {
msg = reason;

Check warning on line 66 in libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java#L66

Added line #L66 was not covered by tests
} else {
msg = "[" + propertyName + "] " + reason;
}
OpenSearchParseException exception = new OpenSearchParseException(msg);
addMetadataToException(exception, propertyName);
return exception;
}

private static String readString(String propertyName, Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return (String) value;
}
throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
}

private static void addMetadataToException(OpenSearchException exception, String propertyName) {
if (propertyName != null) {
exception.addMetadata("opensearch.property_name", propertyName);
}
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
*/
public static String readStringOrIntProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.get(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;

Check warning on line 100 in libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java#L100

Added line #L100 was not covered by tests
} else if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");

Check warning on line 102 in libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java#L102

Added line #L102 was not covered by tests
}
return readStringOrInt(propertyName, value);
}

private static String readStringOrInt(String propertyName, Object value) {
if (value == null) {
return null;

Check warning on line 109 in libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java#L109

Added line #L109 was not covered by tests
}
if (value instanceof String) {
return (String) value;
} else if (value instanceof Integer) {
return String.valueOf(value);
}
throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]");
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
*/
public static String readOptionalStringOrIntProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
if (value == null) {
return null;
}
return readStringOrInt(propertyName, value);
}

public static boolean readBooleanProperty(Map<String, Object> configuration, String propertyName, boolean defaultValue) {
Object value = configuration.get(propertyName);
if (value == null) {
return defaultValue;
} else {
return readBoolean(propertyName, value).booleanValue();
}
}

private static Boolean readBoolean(String propertyName, Object value) {
if (value == null) {
return null;

Check warning on line 143 in libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java#L143

Added line #L143 was not covered by tests
}
if (value instanceof Boolean) {
return (boolean) value;
}
throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]");
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static Integer readIntProperty(Map<String, Object> configuration, String propertyName, Integer defaultValue) {
Object value = configuration.get(propertyName);
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value.toString());
} catch (Exception e) {
throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]");
}
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static Double readDoubleProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");
}
try {
return Double.parseDouble(value.toString());
} catch (Exception e) {
throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.util;

import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ConfigurationUtilsTests extends OpenSearchTestCase {
private Map<String, Object> config;

@Before
public void setConfig() {
config = new HashMap<>();
config.put("foo", "bar");
config.put("boolVal", true);
config.put("null", null);
config.put("arr", Arrays.asList("1", "2", "3"));
config.put("ip", "127.0.0.1");
config.put("num", 1);
config.put("double", 1.0);
}

public void testReadStringProperty() {
String val = ConfigurationUtils.readStringProperty(config, "foo");
assertThat(val, equalTo("bar"));
String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none");
assertThat(val1, equalTo("none"));
try {
ConfigurationUtils.readStringProperty(config, "foo1", null);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
}
}

public void testOptionalReadStringProperty() {
String val = ConfigurationUtils.readOptionalStringProperty(config, "foo");
assertThat(val, equalTo("bar"));
String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1");
assertThat(val, equalTo("bar"));
assertThat(val1, equalTo(null));
}

public void testReadStringPropertyInvalidType() {
try {
ConfigurationUtils.readStringProperty(config, "arr");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
}
}

public void testReadBooleanProperty() {
Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false);
assertThat(val, equalTo(true));
}

public void testReadNullBooleanProperty() {
Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false);
assertThat(val, equalTo(false));
}

public void testReadBooleanPropertyInvalidType() {
try {
ConfigurationUtils.readBooleanProperty(config, "arr", true);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]"));
}
}

public void testReadStringOrIntProperty() {
String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null);
String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null);
assertThat(val1, equalTo("bar"));
assertThat(val2, equalTo("1"));
}

public void testOptionalReadStringOrIntProperty() {
String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo");
String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num");
String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1");
assertThat(val1, equalTo("bar"));
assertThat(val2, equalTo("1"));
assertThat(val3, equalTo(null));
}

public void testReadIntProperty() {
int val = ConfigurationUtils.readIntProperty(config, "num", null);
assertThat(val, equalTo(1));
try {
ConfigurationUtils.readIntProperty(config, "foo", 2);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]"));
}
try {
ConfigurationUtils.readIntProperty(config, "foo1", 2);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("required property is missing"));
}
}

public void testReadDoubleProperty() {
double val = ConfigurationUtils.readDoubleProperty(config, "double");
assertThat(val, equalTo(1.0));
try {
ConfigurationUtils.readDoubleProperty(config, "foo");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]"));
}
try {
ConfigurationUtils.readDoubleProperty(config, "foo1");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
}
}

public void testReadStringOrIntPropertyInvalidType() {
try {
ConfigurationUtils.readStringOrIntProperty(config, "arr", null);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

package org.opensearch.plugin.kafka;

import org.opensearch.core.util.ConfigurationUtils;

import java.util.Map;
import java.util.Objects;

/**
* Class encapsulating the configuration of a Kafka source.
*/
public class KafkaSourceConfig {
private final String PROP_TOPIC = "topic";
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";

private final String topic;
private final String bootstrapServers;

Expand All @@ -23,10 +27,8 @@ public class KafkaSourceConfig {
* @param params the configuration parameters
*/
public KafkaSourceConfig(Map<String, Object> params) {
// TODO: better parsing and validation
this.topic = (String) Objects.requireNonNull(params.get("topic"));
this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers"));
assert this.bootstrapServers != null;
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
}

/**
Expand Down
Loading