Skip to content

Commit

Permalink
[FLINK-36494][table-common] Remove deprecated method Catalog#getTable…
Browse files Browse the repository at this point in the history
…Factory (#25948)
  • Loading branch information
xuyangzhong authored Jan 13, 2025
1 parent a8ec1b5 commit 3a95973
Show file tree
Hide file tree
Showing 11 changed files with 6 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,6 @@ private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
ResolvedCatalogTable catalogTable) {
if (tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)) {
if (!TableFactoryUtil.isLegacyConnectorOptions(
catalog,
tableConfig,
isStreamingMode,
createTableOperation.getTableIdentifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
Expand All @@ -39,12 +38,9 @@
import org.apache.flink.table.legacy.sinks.TableSink;
import org.apache.flink.table.legacy.sources.TableSource;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */
Expand All @@ -70,28 +66,14 @@ public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Con
*/
@SuppressWarnings("unchecked")
public static <T> TableSource<T> findAndCreateTableSource(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
boolean isTemporary) {
TableSourceFactory.Context context =
new TableSourceFactoryContextImpl(
objectIdentifier, catalogTable, configuration, isTemporary);
Optional<TableFactory> factoryOptional =
catalog == null ? Optional.empty() : catalog.getTableFactory();
if (factoryOptional.isPresent()) {
TableFactory factory = factoryOptional.get();
if (factory instanceof TableSourceFactory) {
return ((TableSourceFactory<T>) factory).createTableSource(context);
} else {
throw new ValidationException(
"Cannot query a sink-only table. "
+ "TableFactory provided by catalog must implement TableSourceFactory");
}
} else {
return findAndCreateTableSource(context);
}
return findAndCreateTableSource(context);
}

/** Returns a table sink matching the context. */
Expand All @@ -113,7 +95,6 @@ public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context c
*/
@SuppressWarnings("unchecked")
public static <T> TableSink<T> findAndCreateTableSink(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
CatalogTable catalogTable,
ReadableConfig configuration,
Expand All @@ -126,30 +107,11 @@ public static <T> TableSink<T> findAndCreateTableSink(
configuration,
!isStreamingMode,
isTemporary);
if (catalog == null) {
return findAndCreateTableSink(context);
} else {
return createTableSinkForCatalogTable(catalog, context)
.orElseGet(() -> findAndCreateTableSink(context));
}
}

/**
* Creates a table sink for a {@link CatalogTable} using table factory associated with the
* catalog.
*/
public static Optional<TableSink> createTableSinkForCatalogTable(
Catalog catalog, TableSinkFactory.Context context) {
TableFactory tableFactory = catalog.getTableFactory().orElse(null);
if (tableFactory instanceof TableSinkFactory) {
return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context));
}
return Optional.empty();
return findAndCreateTableSink(context);
}

/** Checks whether the {@link CatalogTable} uses legacy connector sink options. */
public static boolean isLegacyConnectorOptions(
@Nullable Catalog catalog,
ReadableConfig configuration,
boolean isStreamingMode,
ObjectIdentifier objectIdentifier,
Expand All @@ -165,7 +127,6 @@ public static boolean isLegacyConnectorOptions(
// try to create legacy table source using the options,
// some legacy factories may use the 'type' key
TableFactoryUtil.findAndCreateTableSink(
catalog,
objectIdentifier,
catalogTable,
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public TableResultInternal execute(Context ctx) {
}

if (TableFactoryUtil.isLegacyConnectorOptions(
catalogManager.getCatalog(objectIdentifier.getCatalogName()).orElse(null),
ctx.getTableConfig(),
ctx.isStreamingMode(),
tableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.legacy.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -87,20 +86,6 @@ default Optional<Factory> getFactory() {
return Optional.empty();
}

/**
* Get an optional {@link TableFactory} instance that's responsible for generating table-related
* instances stored in this catalog, instances such as source/sink.
*
* @return an optional TableFactory instance
* @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses
* the new table sources and sinks defined in FLIP-95 and a slightly different discovery
* mechanism.
*/
@Deprecated
default Optional<TableFactory> getTableFactory() {
return Optional.empty();
}

/**
* Get an optional {@link FunctionDefinitionFactory} instance that's responsible for
* instantiating function definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
Expand Down Expand Up @@ -68,9 +67,7 @@ public class DeletePushDownUtils {
* can't get the {@link DynamicTableSink}.
*/
public static Optional<DynamicTableSink> getDynamicTableSink(
ContextResolvedTable contextResolvedTable,
LogicalTableModify tableModify,
CatalogManager catalogManager) {
ContextResolvedTable contextResolvedTable, LogicalTableModify tableModify) {
final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());

CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
Expand All @@ -83,9 +80,6 @@ public static Optional<DynamicTableSink> getDynamicTableSink(
// only consider the CatalogTable that doesn't use legacy connector sink option
if (!contextResolvedTable.isAnonymous()
&& !TableFactoryUtil.isLegacyConnectorOptions(
catalogManager
.getCatalog(objectIdentifier.getCatalogName())
.orElse(null),
context.getTableConfig(),
!context.isBatchMode(),
objectIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,7 @@ private Operation convertDelete(SqlDelete sqlDelete) {
catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
// try push down delete
Optional<DynamicTableSink> optionalDynamicTableSink =
DeletePushDownUtils.getDynamicTableSink(
contextResolvedTable, tableModify, catalogManager);
DeletePushDownUtils.getDynamicTableSink(contextResolvedTable, tableModify);
if (optionalDynamicTableSink.isPresent()) {
DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
// if the table sink supports delete push down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ private static boolean isLegacySourceOptions(CatalogSchemaTable schemaTable) {
TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
originTable.getResolvedSchema());
TableFactoryUtil.findAndCreateTableSource(
schemaTable.getContextResolvedTable().getCatalog().orElse(null),
schemaTable.getContextResolvedTable().getIdentifier(),
new ResolvedCatalogTable(
CatalogTable.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ abstract class PlannerBase(
if (
!contextResolvedTable.isAnonymous &&
TableFactoryUtil.isLegacyConnectorOptions(
catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
tableConfig,
isStreamingMode,
objectIdentifier,
Expand All @@ -479,7 +478,6 @@ abstract class PlannerBase(
)
) {
val tableSink = TableFactoryUtil.findAndCreateTableSink(
catalog.orNull,
objectIdentifier,
tableToFind,
getTableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ class LegacyCatalogSourceTable[T](
TableSchemaUtils.removeTimeAttributeFromResolvedSchema(
schemaTable.getContextResolvedTable.getResolvedSchema)
val tableSource = TableFactoryUtil.findAndCreateTableSource(
schemaTable.getContextResolvedTable.getCatalog.orElse(null),
identifier,
new ResolvedCatalogTable(
CatalogTable.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testGetDynamicTableSink() {
tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
Optional<DynamicTableSink> optionalDynamicTableSink =
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify);
// verify we can get the dynamic table sink
assertThat(optionalDynamicTableSink).isPresent();
assertThat(optionalDynamicTableSink.get())
Expand All @@ -115,7 +115,7 @@ public void testGetDynamicTableSink() {
tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
tableModify = getTableModifyFromSql("DELETE FROM t1");
optionalDynamicTableSink =
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify);
// verify it should be empty since it's not an instance of DynamicTableSink but is legacy
// TableSink
assertThat(optionalDynamicTableSink).isEmpty();
Expand Down

This file was deleted.

0 comments on commit 3a95973

Please sign in to comment.