Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MetaDataRefreshEngine.refreshFederation() #34450

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,18 @@ public boolean execute(final ShardingSphereDatabase database, final QueryContext
new ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database, queryContext), new SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
return null != resultSet;
}
MetaDataRefreshEngine metaDataRefreshEngine = getMetaDataRefreshEngine(database);
if (sqlFederationEngine.enabled() && metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
if (sqlFederationEngine.enabled()) {
new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, metaData.getProps())
.refreshFederation(queryContext.getSqlStatementContext());
return true;
}
if (transactionExecutor.decide(queryContext)) {
return transactionExecutor.execute((TCLStatement) queryContext.getSqlStatementContext().getSqlStatement());
}
ExecutionContext executionContext =
new KernelProcessor().generateExecutionContext(queryContext, metaData.getGlobalRuleMetaData(), metaData.getProps());
ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, metaData.getGlobalRuleMetaData(), metaData.getProps());
return executePushDown(database, executionContext, prepareEngine, executeCallback, addCallback, replayCallback);
}

private MetaDataRefreshEngine getMetaDataRefreshEngine(final ShardingSphereDatabase database) {
return new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, metaData.getProps());
}

@SuppressWarnings("rawtypes")
private boolean executePushDown(final ShardingSphereDatabase database, final ExecutionContext executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteCallback executeCallback, final StatementAddCallback addCallback, final StatementReplayCallback replayCallback) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,11 @@ private boolean doExecute(final ShardingSphereDatabase database, final Execution
processEngine.executeSQL(executionGroupContext, executionContext.getQueryContext());
List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
new ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database, executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
if (isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext())) {
if (isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
connection.commit();
}
if (MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getSqlStatementContext())) {
new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, metaData.getProps())
.refresh(executionContext.getQueryContext().getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
}
new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, metaData.getProps())
.refresh(executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
return null != results && !results.isEmpty() && null != results.get(0) && results.get(0);
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,16 @@ private int doExecuteUpdate(final ShardingSphereDatabase database, final Executi
try {
processEngine.executeSQL(executionGroupContext, executionContext.getQueryContext());
JDBCExecutorCallback<Integer> callback = new ExecuteUpdateCallbackFactory(prepareEngine.getType())
.newInstance(database, executionContext.getQueryContext().getSqlStatementContext().getSqlStatement(), updateCallback);
.newInstance(database, executionContext.getSqlStatementContext().getSqlStatement(), updateCallback);
List<Integer> updateCounts = jdbcExecutor.execute(executionGroupContext, callback);
if (MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getQueryContext().getSqlStatementContext())) {
if (isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext())) {
MetaDataRefreshEngine metaDataRefreshEngine = new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, props);
if (metaDataRefreshEngine.isNeedRefreshMetaData(executionContext.getSqlStatementContext())) {
if (isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
connection.commit();
}
new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, props)
.refresh(executionContext.getQueryContext().getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
metaDataRefreshEngine.refresh(executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
}
return isNeedAccumulate(database.getRuleMetaData().getRules(), executionContext.getQueryContext().getSqlStatementContext()) ? accumulate(updateCounts) : updateCounts.get(0);
return isNeedAccumulate(database.getRuleMetaData().getRules(), executionContext.getSqlStatementContext()) ? accumulate(updateCounts) : updateCounts.get(0);
} finally {
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterIndexStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterSchemaStatement;
Expand Down Expand Up @@ -67,6 +67,17 @@ public final class MetaDataRefreshEngine {

private final ConfigurationProperties props;

/**
* Whether to need refresh meta data.
*
* @param sqlStatementContext SQL statement context
* @return is need refresh meta data or not
*/
public boolean isNeedRefreshMetaData(final SQLStatementContext sqlStatementContext) {
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
return DDL_STATEMENT_CLASSES.contains(sqlStatementClass);
}

/**
* Refresh meta data.
*
Expand All @@ -76,29 +87,19 @@ public final class MetaDataRefreshEngine {
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void refresh(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
Class sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
if (!DDL_STATEMENT_CLASSES.contains(sqlStatementClass)) {
if (!isNeedRefreshMetaData(sqlStatementContext)) {
return;
}
Optional<MetaDataRefresher> schemaRefresher = TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
if (schemaRefresher.isPresent()) {
Collection<String> logicDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
String schemaName = sqlStatementContext instanceof TableAvailable ? getSchemaName(sqlStatementContext) : null;
DatabaseType databaseType = routeUnits.stream().map(each -> database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
schemaRefresher.get().refresh(metaDataManagerPersistService, database, logicDataSourceNames, schemaName, databaseType, sqlStatementContext.getSqlStatement(), props);
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
Optional<MetaDataRefresher> metaDataRefresher = TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
if (!metaDataRefresher.isPresent()) {
return;
}
}

/**
* Refresh meta data for federation.
*
* @param sqlStatementContext SQL statement context
*/
@SuppressWarnings("unchecked")
public void refresh(final SQLStatementContext sqlStatementContext) {
getFederationMetaDataRefresher(sqlStatementContext).ifPresent(
optional -> optional.refresh(metaDataManagerPersistService, database, getSchemaName(sqlStatementContext), sqlStatementContext.getSqlStatement()));
Collection<String> logicDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
String schemaName = sqlStatementContext instanceof TableAvailable ? getSchemaName(sqlStatementContext) : null;
DatabaseType databaseType = routeUnits.stream().map(each -> database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
metaDataRefresher.get().refresh(metaDataManagerPersistService, database, logicDataSourceNames, schemaName, databaseType, sqlStatementContext.getSqlStatement(), props);
}

private String getSchemaName(final SQLStatementContext sqlStatementContext) {
Expand All @@ -107,27 +108,14 @@ private String getSchemaName(final SQLStatementContext sqlStatementContext) {
}

/**
* SQL statement is federation or not.
*
* @param sqlStatementContext SQL statement context
* @return is federation or not
*/
public boolean isFederation(final SQLStatementContext sqlStatementContext) {
return getFederationMetaDataRefresher(sqlStatementContext).isPresent();
}

@SuppressWarnings("rawtypes")
private Optional<FederationMetaDataRefresher> getFederationMetaDataRefresher(final SQLStatementContext sqlStatementContext) {
return TypedSPILoader.findService(FederationMetaDataRefresher.class, sqlStatementContext.getSqlStatement().getClass().getSuperclass());
}

/**
* Is refresh meta data required.
* Refresh meta data for federation.
*
* @param sqlStatementContext SQL statement context
* @return is refresh meta data required or not
*/
public static boolean isRefreshMetaDataRequired(final SQLStatementContext sqlStatementContext) {
return DDL_STATEMENT_CLASSES.contains(sqlStatementContext.getSqlStatement().getClass().getSuperclass());
@SuppressWarnings("unchecked")
public void refreshFederation(final SQLStatementContext sqlStatementContext) {
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
TypedSPILoader.findService(FederationMetaDataRefresher.class, sqlStatementClass).ifPresent(
optional -> optional.refresh(metaDataManagerPersistService, database, getSchemaName(sqlStatementContext), sqlStatementContext.getSqlStatement()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.federation;
package org.apache.shardingsphere.mode.metadata.refresher.metadata.federation;

import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.federation.type;
package org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type;

import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaMetaDataPOJO;
import org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.util.TableRefreshUtils;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;
import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.federation.type;
package org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type;

import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaMetaDataPOJO;
import org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.util.TableRefreshUtils;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateViewStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.federation.type;
package org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type;

import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaMetaDataPOJO;
import org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;
import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropViewStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
# limitations under the License.
#

org.apache.shardingsphere.mode.metadata.refresher.federation.type.CreateViewFederationMetaDataRefresher
org.apache.shardingsphere.mode.metadata.refresher.federation.type.AlterViewFederationMetaDataRefresher
org.apache.shardingsphere.mode.metadata.refresher.federation.type.DropViewFederationMetaDataRefresher
org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type.CreateViewFederationMetaDataRefresher
org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type.AlterViewFederationMetaDataRefresher
org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.type.DropViewFederationMetaDataRefresher
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public final class StandardDatabaseConnector implements DatabaseConnector {

private final ProxySQLExecutor proxySQLExecutor;

private final MetaDataRefreshEngine metaDataRefreshEngine;

private final Collection<Statement> cachedStatements = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final Collection<ResultSet> cachedResultSets = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand All @@ -131,6 +133,8 @@ public StandardDatabaseConnector(final String driverType, final QueryContext que
prepareCursorStatementContext((CursorAvailable) sqlStatementContext);
}
proxySQLExecutor = new ProxySQLExecutor(driverType, databaseConnectionManager, this, queryContext);
metaDataRefreshEngine = new MetaDataRefreshEngine(
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), database, contextManager.getMetaDataContexts().getMetaData().getProps());
}

private void checkBackendReady(final SQLStatementContext sqlStatementContext) {
Expand Down Expand Up @@ -179,9 +183,8 @@ public ResponseHeader execute() throws SQLException {
if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext, contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) {
return processExecuteFederation(doExecuteFederation());
}
MetaDataRefreshEngine metaDataRefreshEngine = getMetaDataRefreshEngine();
if (proxySQLExecutor.getSqlFederationEngine().enabled() && metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
if (proxySQLExecutor.getSqlFederationEngine().enabled()) {
metaDataRefreshEngine.refreshFederation(queryContext.getSqlStatementContext());
return new UpdateResponseHeader(queryContext.getSqlStatementContext().getSqlStatement());
}
ExecutionContext executionContext = generateExecutionContext();
Expand Down Expand Up @@ -233,9 +236,7 @@ private ResponseHeader doExecute(final ExecutionContext executionContext) throws
List<ExecuteResult> executeResults = advancedExecutors.isEmpty()
? proxySQLExecutor.execute(executionContext)
: advancedExecutors.iterator().next().execute(executionContext, contextManager, database, this);
if (MetaDataRefreshEngine.isRefreshMetaDataRequired(queryContext.getSqlStatementContext())) {
getMetaDataRefreshEngine().refresh(queryContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
}
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
Object executeResultSample = executeResults.iterator().next();
return executeResultSample instanceof QueryResult
? processExecuteQuery(queryContext.getSqlStatementContext(), executeResults.stream().map(QueryResult.class::cast).collect(Collectors.toList()), (QueryResult) executeResultSample)
Expand Down Expand Up @@ -271,10 +272,6 @@ private ResponseHeader processExecuteFederation(final ResultSet resultSet) throw
return new QueryResponseHeader(queryHeaders);
}

private MetaDataRefreshEngine getMetaDataRefreshEngine() {
return new MetaDataRefreshEngine(contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), database, contextManager.getMetaDataContexts().getMetaData().getProps());
}

private QueryResponseHeader processExecuteQuery(final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults, final QueryResult queryResultSample) throws SQLException {
queryHeaders = createQueryHeaders(sqlStatementContext, queryResultSample);
mergedResult = mergeQuery(sqlStatementContext, queryResults);
Expand Down
Loading