diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java index 7f2403035d97..ff36faaaa1d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ConvertHelpers.java @@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; import java.util.ServiceLoader; +import javax.annotation.concurrent.GuardedBy; import net.bytebuddy.ByteBuddy; import net.bytebuddy.asm.AsmVisitorWrapper; import net.bytebuddy.description.type.TypeDescription; @@ -57,12 +58,15 @@ "rawtypes" }) public class ConvertHelpers { + private static class SchemaInformationProviders { + @GuardedBy("lock") private static final ServiceLoader INSTANCE = ServiceLoader.load(SchemaInformationProvider.class); } private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class); + private static final Object lock = new Object(); /** Return value after converting a schema. */ public static class ConvertedSchemaInformation implements Serializable { @@ -85,57 +89,60 @@ public ConvertedSchemaInformation( public static ConvertedSchemaInformation getConvertedSchemaInformation( Schema inputSchema, TypeDescriptor outputType, SchemaRegistry schemaRegistry) { - ConvertedSchemaInformation schemaInformation = null; // Try to load schema information from loaded providers - for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) { - schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType); - if (schemaInformation != null) { - return schemaInformation; + try { + synchronized (lock) { + for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) { + ConvertedSchemaInformation schemaInformation = + provider.getConvertedSchemaInformation(inputSchema, outputType); + if (schemaInformation != null) { + return schemaInformation; + } + } } + } catch (Exception e) { + LOG.debug("No Schema information from loaded providers found for type {}", outputType, e); } - if (schemaInformation == null) { - // Otherwise, try to find a schema for the output type in the schema registry. - Schema outputSchema = null; - SchemaCoder outputSchemaCoder = null; - try { - outputSchema = schemaRegistry.getSchema(outputType); - outputSchemaCoder = - SchemaCoder.of( - outputSchema, - outputType, - schemaRegistry.getToRowFunction(outputType), - schemaRegistry.getFromRowFunction(outputType)); - } catch (NoSuchSchemaException e) { - LOG.debug("No schema found for type " + outputType, e); - } - FieldType unboxedType = null; - // TODO: Properly handle nullable. - if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) { - // The schema is not convertible directly. Attempt to unbox it and see if the schema matches - // then. - Schema checkedSchema = inputSchema; - if (inputSchema.getFieldCount() == 1) { - unboxedType = inputSchema.getField(0).getType(); - if (unboxedType.getTypeName().isCompositeType() - && !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) { - checkedSchema = unboxedType.getRowSchema(); - } else { - checkedSchema = null; - } - } - if (checkedSchema != null) { - throw new RuntimeException( - "Cannot convert between types that don't have equivalent schemas." - + " input schema: " - + checkedSchema - + " output schema: " - + outputSchema); + // Otherwise, try to find a schema for the output type in the schema registry. + Schema outputSchema = null; + SchemaCoder outputSchemaCoder = null; + try { + outputSchema = schemaRegistry.getSchema(outputType); + outputSchemaCoder = + SchemaCoder.of( + outputSchema, + outputType, + schemaRegistry.getToRowFunction(outputType), + schemaRegistry.getFromRowFunction(outputType)); + } catch (NoSuchSchemaException e) { + LOG.debug("No schema found for type {}", outputType, e); + } + FieldType unboxedType = null; + // TODO: Properly handle nullable. + if (outputSchema == null || !outputSchema.assignableToIgnoreNullable(inputSchema)) { + // The schema is not convertible directly. Attempt to unbox it and see if the schema matches + // then. + Schema checkedSchema = inputSchema; + if (inputSchema.getFieldCount() == 1) { + unboxedType = inputSchema.getField(0).getType(); + if (unboxedType.getTypeName().isCompositeType() + && !outputSchema.assignableToIgnoreNullable(unboxedType.getRowSchema())) { + checkedSchema = unboxedType.getRowSchema(); + } else { + checkedSchema = null; } } - schemaInformation = new ConvertedSchemaInformation(outputSchemaCoder, unboxedType); + if (checkedSchema != null) { + throw new RuntimeException( + "Cannot convert between types that don't have equivalent schemas." + + " input schema: " + + checkedSchema + + " output schema: " + + outputSchema); + } } - return schemaInformation; + return new ConvertedSchemaInformation<>(outputSchemaCoder, unboxedType); } /**