Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FederationMetaDataRefreshEngine #34452

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.MetaDataRefreshEngine;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefreshEngine;
import org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
Expand Down Expand Up @@ -102,8 +102,7 @@ public boolean execute(final ShardingSphereDatabase database, final QueryContext
return null != resultSet;
}
if (sqlFederationEngine.enabled()) {
new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, metaData.getProps())
.refreshFederation(queryContext.getSqlStatementContext());
new FederationMetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database).refresh(queryContext.getSqlStatementContext());
return true;
}
if (transactionExecutor.decide(queryContext)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private int doExecuteUpdate(final ShardingSphereDatabase database, final Executi
.newInstance(database, executionContext.getSqlStatementContext().getSqlStatement(), updateCallback);
List<Integer> updateCounts = jdbcExecutor.execute(executionGroupContext, callback);
MetaDataRefreshEngine metaDataRefreshEngine = new MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(), database, props);
if (metaDataRefreshEngine.isNeedRefreshMetaData(executionContext.getSqlStatementContext())) {
if (metaDataRefreshEngine.isNeedRefresh(executionContext.getSqlStatementContext())) {
if (isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
connection.commit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,9 @@
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefresher;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterIndexStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterSchemaStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterTableStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.AlterViewStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateIndexStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateSchemaStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateTableStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.CreateViewStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DDLStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropIndexStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropSchemaStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropTableStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.DropViewStatement;
import org.apache.shardingsphere.sql.parser.statement.core.statement.ddl.RenameTableStatement;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -57,10 +41,6 @@
@RequiredArgsConstructor
public final class MetaDataRefreshEngine {

private static final Collection<Class<? extends DDLStatement>> DDL_STATEMENT_CLASSES = Arrays.asList(CreateTableStatement.class, AlterTableStatement.class, DropTableStatement.class,
CreateViewStatement.class, AlterViewStatement.class, DropViewStatement.class, CreateIndexStatement.class, AlterIndexStatement.class, DropIndexStatement.class, CreateSchemaStatement.class,
AlterSchemaStatement.class, DropSchemaStatement.class, RenameTableStatement.class);

private final MetaDataManagerPersistService metaDataManagerPersistService;

private final ShardingSphereDatabase database;
Expand All @@ -73,9 +53,9 @@ public final class MetaDataRefreshEngine {
* @param sqlStatementContext SQL statement context
* @return is need refresh meta data or not
*/
public boolean isNeedRefreshMetaData(final SQLStatementContext sqlStatementContext) {
public boolean isNeedRefresh(final SQLStatementContext sqlStatementContext) {
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
return DDL_STATEMENT_CLASSES.contains(sqlStatementClass);
return TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass).isPresent();
}

/**
Expand All @@ -87,9 +67,6 @@ public boolean isNeedRefreshMetaData(final SQLStatementContext sqlStatementConte
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void refresh(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
if (!isNeedRefreshMetaData(sqlStatementContext)) {
return;
}
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
Optional<MetaDataRefresher> metaDataRefresher = TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
if (!metaDataRefresher.isPresent()) {
Expand All @@ -106,16 +83,4 @@ private String getSchemaName(final SQLStatementContext sqlStatementContext) {
return ((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName()
.orElseGet(() -> new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDefaultSchemaName(database.getName())).toLowerCase();
}

/**
* Refresh meta data for federation.
*
* @param sqlStatementContext SQL statement context
*/
@SuppressWarnings("unchecked")
public void refreshFederation(final SQLStatementContext sqlStatementContext) {
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
TypedSPILoader.findService(FederationMetaDataRefresher.class, sqlStatementClass).ifPresent(
optional -> optional.refresh(metaDataManagerPersistService, database, getSchemaName(sqlStatementContext), sqlStatementContext.getSqlStatement()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.metadata.refresher.metadata.federation;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.persist.service.MetaDataManagerPersistService;

/**
* Federation meta data refresh engine.
*/
@RequiredArgsConstructor
public final class FederationMetaDataRefreshEngine {

private final MetaDataManagerPersistService metaDataManagerPersistService;

private final ShardingSphereDatabase database;

/**
* Refresh federation meta data.
*
* @param sqlStatementContext SQL statement context
*/
@SuppressWarnings("unchecked")
public void refresh(final SQLStatementContext sqlStatementContext) {
Class<?> sqlStatementClass = sqlStatementContext.getSqlStatement().getClass().getSuperclass();
TypedSPILoader.findService(FederationMetaDataRefresher.class, sqlStatementClass).ifPresent(
optional -> optional.refresh(metaDataManagerPersistService, database, getSchemaName(sqlStatementContext), sqlStatementContext.getSqlStatement()));
}

private String getSchemaName(final SQLStatementContext sqlStatementContext) {
return ((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName()
.orElseGet(() -> new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDefaultSchemaName(database.getName())).toLowerCase();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.MetaDataRefreshEngine;
import org.apache.shardingsphere.mode.metadata.refresher.metadata.federation.FederationMetaDataRefreshEngine;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
Expand Down Expand Up @@ -112,6 +113,8 @@ public final class StandardDatabaseConnector implements DatabaseConnector {

private final MetaDataRefreshEngine metaDataRefreshEngine;

private final FederationMetaDataRefreshEngine federationMetaDataRefreshEngine;

private final Collection<Statement> cachedStatements = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final Collection<ResultSet> cachedResultSets = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand All @@ -135,6 +138,7 @@ public StandardDatabaseConnector(final String driverType, final QueryContext que
proxySQLExecutor = new ProxySQLExecutor(driverType, databaseConnectionManager, this, queryContext);
metaDataRefreshEngine = new MetaDataRefreshEngine(
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), database, contextManager.getMetaDataContexts().getMetaData().getProps());
federationMetaDataRefreshEngine = new FederationMetaDataRefreshEngine(contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), database);
}

private void checkBackendReady(final SQLStatementContext sqlStatementContext) {
Expand Down Expand Up @@ -184,7 +188,7 @@ public ResponseHeader execute() throws SQLException {
return processExecuteFederation(doExecuteFederation());
}
if (proxySQLExecutor.getSqlFederationEngine().enabled()) {
metaDataRefreshEngine.refreshFederation(queryContext.getSqlStatementContext());
federationMetaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
return new UpdateResponseHeader(queryContext.getSqlStatementContext().getSqlStatement());
}
ExecutionContext executionContext = generateExecutionContext();
Expand Down
Loading