Skip to content

Commit

Permalink
Add MySQL DataType handler to transform datatype value (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#5165)

* Add MySQL data type handler for transformation

Signed-off-by: Dinu John <[email protected]>

* Add unit test for MySQL DataType handler

Signed-off-by: Dinu John <[email protected]>

* Add Set String values and Enum String values to TableMetadata

Signed-off-by: Dinu John <[email protected]>

* Update interface

Signed-off-by: Dinu John <[email protected]>

* Update TableMetadata constructor

Signed-off-by: Dinu John <[email protected]>

* Use Number datatype for NumericTypeHandler to retain Numeric type

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored Nov 4, 2024
1 parent 057ae8e commit e49c997
Show file tree
Hide file tree
Showing 16 changed files with 776 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;

import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;

/**
* Interface for handling MySQL data type conversions.
* Implementations of this interface are responsible for converting MySQL column values
* to appropriate string representations based on their data types.
*/
public interface DataTypeHandler {
/**
* Handles the conversion of a MySQL column value to its string representation.
*
* @param columnType The MySQL data type of the column being processed
* @param columnName The name of the column being processed
* @param value The value to be converted, can be null
* @param metadata Additional metadata about the table structure and properties
* @return A string representation of the converted value
*/
Object handle(MySQLDataType columnType, String columnName, Object value, TableMetadata metadata);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;

import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.BinaryTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.JsonTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.NumericTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.SpatialTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.StringTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.datatype.impl.TemporalTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;

import java.util.Map;

public class DataTypeHelper {
private static final Map<MySQLDataType.DataCategory, DataTypeHandler> typeHandlers = Map.of(
MySQLDataType.DataCategory.NUMERIC, new NumericTypeHandler(),
MySQLDataType.DataCategory.STRING, new StringTypeHandler(),
MySQLDataType.DataCategory.TEMPORAL, new TemporalTypeHandler(),
MySQLDataType.DataCategory.BINARY, new BinaryTypeHandler(),
MySQLDataType.DataCategory.JSON, new JsonTypeHandler(),
MySQLDataType.DataCategory.SPATIAL, new SpatialTypeHandler()
);

public static Object getDataByColumnType(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
if (value == null) {
return null;
}

return typeHandlers.get(columnType.getCategory()).handle(columnType, columnName, value, metadata);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype;

import java.util.HashMap;
import java.util.Map;

public enum MySQLDataType {
// Numeric types
TINYINT("tinyint", DataCategory.NUMERIC, DataSubCategory.SIGNED),
TINYINT_UNSIGNED("tinyint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED),
SMALLINT("smallint", DataCategory.NUMERIC, DataSubCategory.SIGNED),
SMALLINT_UNSIGNED("smallint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED),
MEDIUMINT("mediumint", DataCategory.NUMERIC, DataSubCategory.SIGNED),
MEDIUMINT_UNSIGNED("mediumint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED),
INT("int", DataCategory.NUMERIC, DataSubCategory.SIGNED),
INT_UNSIGNED("int unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED),
BIGINT("bigint", DataCategory.NUMERIC, DataSubCategory.SIGNED),
BIGINT_UNSIGNED("bigint unsigned", DataCategory.NUMERIC, DataSubCategory.UNSIGNED),
DECIMAL("decimal", DataCategory.NUMERIC, DataSubCategory.SIGNED),
FLOAT("float", DataCategory.NUMERIC, DataSubCategory.SIGNED),
DOUBLE("double", DataCategory.NUMERIC, DataSubCategory.SIGNED),
BIT("bit", DataCategory.NUMERIC, DataSubCategory.BIT),

// String types
CHAR("char", DataCategory.STRING, DataSubCategory.CHAR),
VARCHAR("varchar", DataCategory.STRING, DataSubCategory.CHAR),
TINYTEXT("tinytext", DataCategory.STRING, DataSubCategory.BYTES),
TEXT("text", DataCategory.STRING, DataSubCategory.BYTES),
MEDIUMTEXT("mediumtext", DataCategory.STRING, DataSubCategory.BYTES),
LONGTEXT("longtext", DataCategory.STRING, DataSubCategory.BYTES),
ENUM("enum", DataCategory.STRING, DataSubCategory.ENUM),
SET("set", DataCategory.STRING, DataSubCategory.SET),

// Date and time types
DATE("date", DataCategory.TEMPORAL),
TIME("time", DataCategory.TEMPORAL),
DATETIME("datetime", DataCategory.TEMPORAL),
TIMESTAMP("timestamp", DataCategory.TEMPORAL),
YEAR("year", DataCategory.TEMPORAL),

// Binary types
BINARY("binary", DataCategory.BINARY),
VARBINARY("varbinary", DataCategory.BINARY),
TINYBLOB("tinyblob", DataCategory.BINARY),
BLOB("blob", DataCategory.BINARY),
MEDIUMBLOB("mediumblob", DataCategory.BINARY),
LONGBLOB("longblob", DataCategory.BINARY),

// Special types
JSON("json", DataCategory.JSON),
GEOMETRY("geometry", DataCategory.SPATIAL);

private static final Map<String, MySQLDataType> TYPE_MAP;

static {
TYPE_MAP = new HashMap<>(values().length);
for (MySQLDataType dataType : values()) {
TYPE_MAP.put(dataType.dataType, dataType);
}
}

private final String dataType;
private final DataCategory category;
private final DataSubCategory subCategory;

MySQLDataType(String dataType, DataCategory category) {
this.dataType = dataType;
this.category = category;
this.subCategory = null;
}

MySQLDataType(String dataType, DataCategory category, DataSubCategory subCategory) {
this.dataType = dataType;
this.category = category;
this.subCategory = subCategory;
}

public String getDataType() {
return dataType;
}

public DataCategory getCategory() {
return category;
}

public DataSubCategory getSubCategory() {
return subCategory;
}

public static MySQLDataType byDataType(final String dataType) {
final MySQLDataType type = TYPE_MAP.get(dataType.toLowerCase());
if (type == null) {
throw new IllegalArgumentException("Unsupported MySQL data type: " + dataType);
}
return type;
}

public enum DataCategory {
NUMERIC,
STRING,
TEMPORAL,
BINARY,
JSON,
SPATIAL
}

public enum DataSubCategory {
BIT,
SIGNED,
UNSIGNED,
CHAR,
BYTES,
TEMPORAL,
BINARY,
JSON,
SPATIAL,
ENUM,
SET
}

public boolean isNumeric() {
return category == DataCategory.NUMERIC;
}

public boolean isUnsigned() {
return category == DataCategory.NUMERIC && subCategory == DataSubCategory.UNSIGNED;
}

public boolean isString() {
return category == DataCategory.STRING;
}

public boolean isStringBytes() {
return category == DataCategory.STRING && subCategory == DataSubCategory.BYTES;
}

public boolean isStringSet() {
return category == DataCategory.STRING && subCategory == DataSubCategory.SET;
}

public boolean isStringEnum() {
return category == DataCategory.STRING && subCategory == DataSubCategory.ENUM;
}

public boolean isTemporal() {
return category == DataCategory.TEMPORAL;
}

public boolean isBinary() {
return category == DataCategory.BINARY;
}

public boolean isJson() {
return category == DataCategory.JSON;
}

public boolean isSpatial() {
return category == DataCategory.SPATIAL;
}

public long getUnsignedMask() {
switch (this) {
case TINYINT_UNSIGNED:
return 0xFFL;
case SMALLINT_UNSIGNED:
return 0xFFFFL;
case MEDIUMINT_UNSIGNED:
return 0xFFFFFFL;
case INT_UNSIGNED:
return 0xFFFFFFFFL;
case BIGINT_UNSIGNED:
return 0xFFFFFFFFFFFFFFFFL;
default:
throw new UnsupportedOperationException("No mask for non-unsigned type: " + this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.Base64;

public class BinaryTypeHandler implements DataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
return Base64.getEncoder().encodeToString((byte[]) value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary;
import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.io.IOException;

public class JsonTypeHandler implements DataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
return convertToJson((byte[]) value);
}

private String convertToJson(final byte[] jsonBytes) {
try {
return JsonBinary.parseAsString(jsonBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.math.BigInteger;

public class NumericTypeHandler implements DataTypeHandler {

@Override
public Number handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
if (value == null) {
return null;
}

if (!columnType.isNumeric()) {
throw new IllegalArgumentException("ColumnType is not numeric: " + columnType);
}

if (!(value instanceof Number)) {
throw new IllegalArgumentException("Value is not a number: " + value);
}

return handleNumericType(columnType, (Number) value);
}

private Number handleNumericType(final MySQLDataType columnType, final Number value) {
if (columnType.isUnsigned()) {
if (columnType == MySQLDataType.BIGINT_UNSIGNED) {
return handleUnsignedDouble(value);
} else {
return handleUnsignedNumber(value, columnType.getUnsignedMask());
}
}
return value;
}

private Number handleUnsignedNumber(final Number value, final long mask) {
final long longVal = value.longValue();
return longVal < 0 ? longVal & mask : longVal;
}

private Number handleUnsignedDouble(final Number value) {
long longVal = value.longValue();
if (longVal < 0) {
return BigInteger.valueOf(longVal & Long.MAX_VALUE)
.add(BigInteger.valueOf(Long.MAX_VALUE))
.add(BigInteger.ONE);
}
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

public class SpatialTypeHandler implements DataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
// TODO: Implement the transformation
return new String((byte[]) value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.opensearch.dataprepper.plugins.source.rds.datatype.impl;

import org.opensearch.dataprepper.plugins.source.rds.datatype.DataTypeHandler;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.datatype.MySQLDataType;

import java.util.ArrayList;
import java.util.List;

public class StringTypeHandler implements DataTypeHandler {

@Override
public String handle(final MySQLDataType columnType, final String columnName, final Object value,
final TableMetadata metadata) {
if (columnType.isStringBytes()) {
return new String((byte[]) value);
} else if (columnType.isStringEnum() && value instanceof Integer) {
return getEnumValue((int) value, metadata.getEnumStrValues().get(columnName));
} else if (columnType.isStringSet() && value instanceof Long) {
return getSetValues((long) value, metadata.getSetStrValues().get(columnName)).toString();
} else {
return value.toString();
}
}

private List<String> getSetValues(final long numericValue, final String[] setStrValues) {
final List<String> setValues = new ArrayList<>();
for (int i = 0; i < setStrValues.length; i++) {
if ((numericValue & (1L << i)) != 0) {
setValues.add(setStrValues[i].trim());
}
}

return setValues;
}

private String getEnumValue(final int numericValue, final String[] enumStrValues) {
return enumStrValues[numericValue - 1];
}
}
Loading

0 comments on commit e49c997

Please sign in to comment.