Skip to content

Commit

Permalink
Fix PostCommit Java Nexmark Dataflow job (#33979)
Browse files Browse the repository at this point in the history
* Synchronize the singleton ServiceLoader to avoid thread races
  • Loading branch information
Amar3tto authored Feb 17, 2025
1 parent 53080f1 commit 7b51e49
Showing 1 changed file with 51 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.ServiceLoader;
import javax.annotation.concurrent.GuardedBy;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.AsmVisitorWrapper;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -57,12 +58,15 @@
"rawtypes"
})
public class ConvertHelpers {

private static class SchemaInformationProviders {
@GuardedBy("lock")
private static final ServiceLoader<SchemaInformationProvider> INSTANCE =
ServiceLoader.load(SchemaInformationProvider.class);
}

private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class);
private static final Object lock = new Object();

/** Return value after converting a schema. */
public static class ConvertedSchemaInformation<T> implements Serializable {
Expand All @@ -85,57 +89,60 @@ public ConvertedSchemaInformation(
public static <T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType, SchemaRegistry schemaRegistry) {

ConvertedSchemaInformation<T> schemaInformation = null;
// Try to load schema information from loaded providers
for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) {
schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType);
if (schemaInformation != null) {
return schemaInformation;
try {
synchronized (lock) {
for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) {
ConvertedSchemaInformation<T> schemaInformation =
provider.getConvertedSchemaInformation(inputSchema, outputType);
if (schemaInformation != null) {
return schemaInformation;
}
}
}
} catch (Exception e) {
LOG.debug("No Schema information from loaded providers found for type {}", outputType, e);
}

if (schemaInformation == null) {
// Otherwise, try to find a schema for the output type in the schema registry.
Schema outputSchema = null;
SchemaCoder<T> outputSchemaCoder = null;
try {
outputSchema = schemaRegistry.getSchema(outputType);
outputSchemaCoder =
SchemaCoder.of(
outputSchema,
outputType,
schemaRegistry.getToRowFunction(outputType),
schemaRegistry.getFromRowFunction(outputType));
} catch (NoSuchSchemaException e) {
LOG.debug("No schema found for type " + outputType, e);
}
FieldType unboxedType = null;
// TODO: Properly handle nullable.
if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) {
// The schema is not convertible directly. Attempt to unbox it and see if the schema matches
// then.
Schema checkedSchema = inputSchema;
if (inputSchema.getFieldCount() == 1) {
unboxedType = inputSchema.getField(0).getType();
if (unboxedType.getTypeName().isCompositeType()
&& !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) {
checkedSchema = unboxedType.getRowSchema();
} else {
checkedSchema = null;
}
}
if (checkedSchema != null) {
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ checkedSchema
+ " output schema: "
+ outputSchema);
// Otherwise, try to find a schema for the output type in the schema registry.
Schema outputSchema = null;
SchemaCoder<T> outputSchemaCoder = null;
try {
outputSchema = schemaRegistry.getSchema(outputType);
outputSchemaCoder =
SchemaCoder.of(
outputSchema,
outputType,
schemaRegistry.getToRowFunction(outputType),
schemaRegistry.getFromRowFunction(outputType));
} catch (NoSuchSchemaException e) {
LOG.debug("No schema found for type {}", outputType, e);
}
FieldType unboxedType = null;
// TODO: Properly handle nullable.
if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) {
// The schema is not convertible directly. Attempt to unbox it and see if the schema matches
// then.
Schema checkedSchema = inputSchema;
if (inputSchema.getFieldCount() == 1) {
unboxedType = inputSchema.getField(0).getType();
if (unboxedType.getTypeName().isCompositeType()
&& !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) {
checkedSchema = unboxedType.getRowSchema();
} else {
checkedSchema = null;
}
}
schemaInformation = new ConvertedSchemaInformation<T>(outputSchemaCoder, unboxedType);
if (checkedSchema != null) {
throw new RuntimeException(
"Cannot convert between types that don't have equivalent schemas."
+ " input schema: "
+ checkedSchema
+ " output schema: "
+ outputSchema);
}
}
return schemaInformation;
return new ConvertedSchemaInformation<>(outputSchemaCoder, unboxedType);
}

/**
Expand Down

0 comments on commit 7b51e49

Please sign in to comment.