Skip to content

Commit

Permalink
[Pull-based Ingestion] use ConfigurationUtil for KafkaSourceConfig (#…
Browse files Browse the repository at this point in the history
…17223)


---------

Signed-off-by: Yupeng Fu <[email protected]>
  • Loading branch information
yupeng9 authored Feb 5, 2025
1 parent ab2f5f6 commit 5b07cf1
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.util;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.annotation.PublicApi;

import java.util.Map;

/**
* Utility class for parsing configurations.
*
* @opensearch.api
*/
@PublicApi(since = "3.0.0")
public final class ConfigurationUtils {

private ConfigurationUtils() {}

/**
* Returns and removes the specified optional property from the specified configuration map.
* <p>
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
*/
public static String readOptionalStringProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
return readString(propertyName, value);
}

/**
* Returns and removes the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string an {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName) {
return readStringProperty(configuration, propertyName, null);
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
*/
public static String readStringProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.get(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");
}
return readString(propertyName, value);
}

public static OpenSearchException newConfigurationException(String propertyName, String reason) {
String msg;
if (propertyName == null) {
msg = reason;
} else {
msg = "[" + propertyName + "] " + reason;
}
OpenSearchParseException exception = new OpenSearchParseException(msg);
addMetadataToException(exception, propertyName);
return exception;
}

private static String readString(String propertyName, Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return (String) value;
}
throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
}

private static void addMetadataToException(OpenSearchException exception, String propertyName) {
if (propertyName != null) {
exception.addMetadata("opensearch.property_name", propertyName);
}
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
* If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
*/
public static String readStringOrIntProperty(Map<String, Object> configuration, String propertyName, String defaultValue) {
Object value = configuration.get(propertyName);
if (value == null && defaultValue != null) {
return defaultValue;
} else if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");
}
return readStringOrInt(propertyName, value);
}

private static String readStringOrInt(String propertyName, Object value) {
if (value == null) {
return null;
}
if (value instanceof String) {
return (String) value;
} else if (value instanceof Integer) {
return String.valueOf(value);
}
throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]");
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
*/
public static String readOptionalStringOrIntProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
if (value == null) {
return null;
}
return readStringOrInt(propertyName, value);
}

public static boolean readBooleanProperty(Map<String, Object> configuration, String propertyName, boolean defaultValue) {
Object value = configuration.get(propertyName);
if (value == null) {
return defaultValue;
} else {
return readBoolean(propertyName, value).booleanValue();
}
}

private static Boolean readBoolean(String propertyName, Object value) {
if (value == null) {
return null;
}
if (value instanceof Boolean) {
return (boolean) value;
}
throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]");
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static Integer readIntProperty(Map<String, Object> configuration, String propertyName, Integer defaultValue) {
Object value = configuration.get(propertyName);
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value.toString());
} catch (Exception e) {
throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]");
}
}

/**
* Returns the specified property from the specified configuration map.
* <p>
* If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
* If the property is missing an {@link OpenSearchParseException} is thrown
*/
public static Double readDoubleProperty(Map<String, Object> configuration, String propertyName) {
Object value = configuration.get(propertyName);
if (value == null) {
throw newConfigurationException(propertyName, "required property is missing");
}
try {
return Double.parseDouble(value.toString());
} catch (Exception e) {
throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.util;

import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ConfigurationUtilsTests extends OpenSearchTestCase {
private Map<String, Object> config;

@Before
public void setConfig() {
config = new HashMap<>();
config.put("foo", "bar");
config.put("boolVal", true);
config.put("null", null);
config.put("arr", Arrays.asList("1", "2", "3"));
config.put("ip", "127.0.0.1");
config.put("num", 1);
config.put("double", 1.0);
}

public void testReadStringProperty() {
String val = ConfigurationUtils.readStringProperty(config, "foo");
assertThat(val, equalTo("bar"));
String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none");
assertThat(val1, equalTo("none"));
try {
ConfigurationUtils.readStringProperty(config, "foo1", null);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
}
}

public void testOptionalReadStringProperty() {
String val = ConfigurationUtils.readOptionalStringProperty(config, "foo");
assertThat(val, equalTo("bar"));
String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1");
assertThat(val, equalTo("bar"));
assertThat(val1, equalTo(null));
}

public void testReadStringPropertyInvalidType() {
try {
ConfigurationUtils.readStringProperty(config, "arr");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
}
}

public void testReadBooleanProperty() {
Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false);
assertThat(val, equalTo(true));
}

public void testReadNullBooleanProperty() {
Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false);
assertThat(val, equalTo(false));
}

public void testReadBooleanPropertyInvalidType() {
try {
ConfigurationUtils.readBooleanProperty(config, "arr", true);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]"));
}
}

public void testReadStringOrIntProperty() {
String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null);
String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null);
assertThat(val1, equalTo("bar"));
assertThat(val2, equalTo("1"));
}

public void testOptionalReadStringOrIntProperty() {
String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo");
String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num");
String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1");
assertThat(val1, equalTo("bar"));
assertThat(val2, equalTo("1"));
assertThat(val3, equalTo(null));
}

public void testReadIntProperty() {
int val = ConfigurationUtils.readIntProperty(config, "num", null);
assertThat(val, equalTo(1));
try {
ConfigurationUtils.readIntProperty(config, "foo", 2);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]"));
}
try {
ConfigurationUtils.readIntProperty(config, "foo1", 2);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("required property is missing"));
}
}

public void testReadDoubleProperty() {
double val = ConfigurationUtils.readDoubleProperty(config, "double");
assertThat(val, equalTo(1.0));
try {
ConfigurationUtils.readDoubleProperty(config, "foo");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]"));
}
try {
ConfigurationUtils.readDoubleProperty(config, "foo1");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
}
}

public void testReadStringOrIntPropertyInvalidType() {
try {
ConfigurationUtils.readStringOrIntProperty(config, "arr", null);
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

package org.opensearch.plugin.kafka;

import org.opensearch.core.util.ConfigurationUtils;

import java.util.Map;
import java.util.Objects;

/**
* Class encapsulating the configuration of a Kafka source.
*/
public class KafkaSourceConfig {
private final String PROP_TOPIC = "topic";
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";

private final String topic;
private final String bootstrapServers;

Expand All @@ -23,10 +27,8 @@ public class KafkaSourceConfig {
* @param params the configuration parameters
*/
public KafkaSourceConfig(Map<String, Object> params) {
// TODO: better parsing and validation
this.topic = (String) Objects.requireNonNull(params.get("topic"));
this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers"));
assert this.bootstrapServers != null;
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
}

/**
Expand Down

0 comments on commit 5b07cf1

Please sign in to comment.