From c4aa84532311606536a6fc2745e1371b5ce6ab57 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Wed, 1 Jan 2025 22:21:07 +0800 Subject: [PATCH] Use MetaDataChangedHandler to instead of MetaDataChangedSubscriber --- ...ShardingSphereStatisticsRefreshEngine.java | 5 +- .../database/MetaDataChangedHandler.java | 145 ++++-------------- .../database/SchemaChangedHandler.java | 58 +++++++ .../database/StorageNodeChangedHandler.java | 63 ++++++++ .../database/StorageUnitChangedHandler.java | 76 +++++++++ .../handler/database/TableChangedHandler.java | 70 +++++++++ .../handler/database/ViewChangedHandler.java | 70 +++++++++ 7 files changed, 374 insertions(+), 113 deletions(-) create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/SchemaChangedHandler.java create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageNodeChangedHandler.java create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageUnitChangedHandler.java create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/TableChangedHandler.java create mode 100644 mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/ViewChangedHandler.java diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java index 073be7e5a019b..bfad2f96fc728 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/ShardingSphereStatisticsRefreshEngine.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; +import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.lock.LockContext; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; @@ -68,7 +69,9 @@ public ShardingSphereStatisticsRefreshEngine(final ContextManager contextManager * Async refresh. */ public void asyncRefresh() { - EXECUTOR_SERVICE.execute(this::refresh); + if (InstanceType.PROXY == contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()) { + EXECUTOR_SERVICE.execute(this::refresh); + } } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/MetaDataChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/MetaDataChangedHandler.java index 26fe2103cb2b9..45b3ae2c54a66 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/MetaDataChangedHandler.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/MetaDataChangedHandler.java @@ -17,12 +17,6 @@ package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; -import com.google.common.base.Preconditions; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; -import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView; import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; import org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode; import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode; @@ -30,18 +24,31 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; -import java.util.Collections; import java.util.Optional; /** * Meta data changed handler. */ -@RequiredArgsConstructor public final class MetaDataChangedHandler { - private final ContextManager contextManager; + private final SchemaChangedHandler schemaChangedHandler; + + private final TableChangedHandler tableChangedHandler; + + private final ViewChangedHandler viewChangedHandler; + + private final StorageUnitChangedHandler storageUnitChangedHandler; + + private final StorageNodeChangedHandler storageNodeChangedHandler; + + public MetaDataChangedHandler(final ContextManager contextManager) { + schemaChangedHandler = new SchemaChangedHandler(contextManager); + tableChangedHandler = new TableChangedHandler(contextManager); + viewChangedHandler = new ViewChangedHandler(contextManager); + storageUnitChangedHandler = new StorageUnitChangedHandler(contextManager); + storageNodeChangedHandler = new StorageNodeChangedHandler(contextManager); + } /** * Handle meta data changed. @@ -54,7 +61,7 @@ public boolean handle(final String databaseName, final DataChangedEvent event) { String eventKey = event.getKey(); Optional schemaName = DatabaseMetaDataNode.getSchemaName(eventKey); if (schemaName.isPresent()) { - handleSchemaChanged(databaseName, schemaName.get(), event.getType()); + handleSchemaChanged(databaseName, schemaName.get(), event); return true; } schemaName = DatabaseMetaDataNode.getSchemaNameByTableNode(eventKey); @@ -73,85 +80,38 @@ public boolean handle(final String databaseName, final DataChangedEvent event) { return false; } - private void handleSchemaChanged(final String databaseName, final String schemaName, final Type type) { - switch (type) { - case ADDED: - case UPDATED: - handleSchemaCreated(databaseName, schemaName); - return; - case DELETED: - handleSchemaDropped(databaseName, schemaName); - return; - default: + private void handleSchemaChanged(final String databaseName, final String schemaName, final DataChangedEvent event) { + if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) { + schemaChangedHandler.handleCreated(databaseName, schemaName); + } else if (Type.DELETED == event.getType()) { + schemaChangedHandler.handleDropped(databaseName, schemaName); } } - private void handleSchemaCreated(final String databaseName, final String schemaName) { - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().addSchema(databaseName, schemaName); - refreshStatisticsData(); - } - - private void handleSchemaDropped(final String databaseName, final String schemaName) { - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().dropSchema(databaseName, schemaName); - refreshStatisticsData(); - } - private boolean isTableMetaDataChanged(final String key) { return TableMetaDataNode.isTableActiveVersionNode(key) || TableMetaDataNode.isTableNode(key); } private void handleTableChanged(final String databaseName, final String schemaName, final DataChangedEvent event) { if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && TableMetaDataNode.isTableActiveVersionNode(event.getKey())) { - handleTableCreatedOrAltered(databaseName, schemaName, event); + tableChangedHandler.handleCreatedOrAltered(databaseName, schemaName, event); } else if (Type.DELETED == event.getType() && TableMetaDataNode.isTableNode(event.getKey())) { - handleTableDropped(databaseName, schemaName, event); + tableChangedHandler.handleDropped(databaseName, schemaName, event); } } - private void handleTableCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { - String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); - Preconditions.checkArgument(event.getValue().equals( - contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), - "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); - ShardingSphereTable table = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getTable().load(databaseName, schemaName, tableName); - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, table, null); - refreshStatisticsData(); - } - - private void handleTableDropped(final String databaseName, final String schemaName, final DataChangedEvent event) { - String tableName = TableMetaDataNode.getTableName(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, tableName, null); - refreshStatisticsData(); - } - private boolean isViewMetaDataChanged(final String key) { return ViewMetaDataNode.isViewActiveVersionNode(key) || ViewMetaDataNode.isViewNode(key); } private void handleViewChanged(final String databaseName, final String schemaName, final DataChangedEvent event) { if ((Type.ADDED == event.getType() || Type.UPDATED == event.getType()) && ViewMetaDataNode.isViewActiveVersionNode(event.getKey())) { - handleViewCreatedOrAltered(databaseName, schemaName, event); + viewChangedHandler.handleCreatedOrAltered(databaseName, schemaName, event); } else if (Type.DELETED == event.getType() && ViewMetaDataNode.isViewNode(event.getKey())) { - handleViewDropped(databaseName, schemaName, event); + viewChangedHandler.handleDropped(databaseName, schemaName, event); } } - private void handleViewCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { - String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); - Preconditions.checkArgument(event.getValue().equals( - contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), - "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); - ShardingSphereView view = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getView().load(databaseName, schemaName, viewName); - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, null, view); - refreshStatisticsData(); - } - - private void handleViewDropped(final String databaseName, final String schemaName, final DataChangedEvent event) { - String viewName = ViewMetaDataNode.getViewName(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); - contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, null, viewName); - refreshStatisticsData(); - } - private void handleDataSourceChanged(final String databaseName, final DataChangedEvent event) { if (DataSourceMetaDataNode.isDataSourceUnitActiveVersionNode(event.getKey()) || DataSourceMetaDataNode.isDataSourceUnitNode(event.getKey())) { handleStorageUnitChanged(databaseName, event); @@ -164,70 +124,31 @@ private void handleStorageUnitChanged(final String databaseName, final DataChang Optional dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitActiveVersionNode(event.getKey()); if (dataSourceUnitName.isPresent()) { if (Type.ADDED == event.getType()) { - handleStorageUnitRegistered(databaseName, dataSourceUnitName.get(), event); + storageUnitChangedHandler.handleRegistered(databaseName, dataSourceUnitName.get(), event); } else if (Type.UPDATED == event.getType()) { - handleStorageUnitAltered(databaseName, dataSourceUnitName.get(), event); + storageUnitChangedHandler.handleAltered(databaseName, dataSourceUnitName.get(), event); } return; } dataSourceUnitName = DataSourceMetaDataNode.getDataSourceNameByDataSourceUnitNode(event.getKey()); if (Type.DELETED == event.getType() && dataSourceUnitName.isPresent()) { - handleStorageUnitUnregistered(databaseName, dataSourceUnitName.get()); + storageUnitChangedHandler.handleUnregistered(databaseName, dataSourceUnitName.get()); } } - private void handleStorageUnitRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - Preconditions.checkArgument(event.getValue().equals( - contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), - "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); - DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName); - contextManager.getMetaDataContextManager().getStorageUnitManager().registerStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); - } - - private void handleStorageUnitAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { - Preconditions.checkArgument(event.getValue().equals( - contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), - "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); - DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName); - contextManager.getMetaDataContextManager().getStorageUnitManager().alterStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); - } - - private void handleStorageUnitUnregistered(final String databaseName, final String dataSourceUnitName) { - Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName), "No database '%s' exists.", databaseName); - contextManager.getMetaDataContextManager().getStorageUnitManager().unregisterStorageUnit(databaseName, dataSourceUnitName); - } - private void handleStorageNodeChanged(final String databaseName, final DataChangedEvent event) { Optional dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeActiveVersionNode(event.getKey()); if (dataSourceNodeName.isPresent()) { if (Type.ADDED == event.getType()) { - handleStorageNodeRegistered(databaseName, dataSourceNodeName.get(), event); + storageNodeChangedHandler.handleRegistered(databaseName, dataSourceNodeName.get(), event); } else if (Type.UPDATED == event.getType()) { - handleStorageNodeAltered(databaseName, dataSourceNodeName.get(), event); + storageNodeChangedHandler.handleAltered(databaseName, dataSourceNodeName.get(), event); } return; } dataSourceNodeName = DataSourceMetaDataNode.getDataSourceNameByDataSourceNodeNode(event.getKey()); if (Type.DELETED == event.getType() && dataSourceNodeName.isPresent()) { - handleStorageNodeUnregistered(databaseName, dataSourceNodeName.get()); - } - } - - private void handleStorageNodeRegistered(final String databaseName, final String dataSourceNodeName, final DataChangedEvent event) { - // TODO - } - - private void handleStorageNodeAltered(final String databaseName, final String dataSourceNodeName, final DataChangedEvent event) { - // TODO - } - - private void handleStorageNodeUnregistered(final String databaseName, final String dataSourceNodeName) { - // TODO - } - - private void refreshStatisticsData() { - if (InstanceType.PROXY == contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getType()) { - new ShardingSphereStatisticsRefreshEngine(contextManager).asyncRefresh(); + storageNodeChangedHandler.handleUnregistered(databaseName, dataSourceNodeName.get()); } } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/SchemaChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/SchemaChangedHandler.java new file mode 100644 index 0000000000000..da82f3d46661a --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/SchemaChangedHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; + +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; + +/** + * Schema changed handler. + */ +public final class SchemaChangedHandler { + + private final ContextManager contextManager; + + private final ShardingSphereStatisticsRefreshEngine statisticsRefreshEngine; + + public SchemaChangedHandler(final ContextManager contextManager) { + this.contextManager = contextManager; + statisticsRefreshEngine = new ShardingSphereStatisticsRefreshEngine(contextManager); + } + + /** + * Handle schema created. + * + * @param databaseName database name + * @param schemaName schema name + */ + public void handleCreated(final String databaseName, final String schemaName) { + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().addSchema(databaseName, schemaName); + statisticsRefreshEngine.asyncRefresh(); + } + + /** + * Handle schema dropped. + * + * @param databaseName database name + * @param schemaName schema name + */ + public void handleDropped(final String databaseName, final String schemaName) { + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().dropSchema(databaseName, schemaName); + statisticsRefreshEngine.asyncRefresh(); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageNodeChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageNodeChangedHandler.java new file mode 100644 index 0000000000000..081180797376c --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageNodeChangedHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; + +/** + * Storage node changed handler. + */ +@RequiredArgsConstructor +public final class StorageNodeChangedHandler { + + private final ContextManager contextManager; + + /** + * Handle storage node registered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + * @param event data changed event + */ + public void handleRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { + // TODO + } + + /** + * Handle storage node altered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + * @param event data changed event + */ + public void handleAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { + // TODO + } + + /** + * Handle storage node unregistered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + */ + public void handleUnregistered(final String databaseName, final String dataSourceUnitName) { + // TODO + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageUnitChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageUnitChangedHandler.java new file mode 100644 index 0000000000000..db18ddeca3bee --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/StorageUnitChangedHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; + +import com.google.common.base.Preconditions; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; + +import java.util.Collections; + +/** + * Storage unit changed handler. + */ +@RequiredArgsConstructor +public final class StorageUnitChangedHandler { + + private final ContextManager contextManager; + + /** + * Handle storage unit registered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + * @param event data changed event + */ + public void handleRegistered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { + Preconditions.checkArgument(event.getValue().equals( + contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), + "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); + DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName); + contextManager.getMetaDataContextManager().getStorageUnitManager().registerStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); + } + + /** + * Handle storage unit altered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + * @param event data changed event + */ + public void handleAltered(final String databaseName, final String dataSourceUnitName, final DataChangedEvent event) { + Preconditions.checkArgument(event.getValue().equals( + contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), + "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); + DataSourcePoolProperties dataSourcePoolProps = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDataSourceUnitService().load(databaseName, dataSourceUnitName); + contextManager.getMetaDataContextManager().getStorageUnitManager().alterStorageUnit(databaseName, Collections.singletonMap(dataSourceUnitName, dataSourcePoolProps)); + } + + /** + * Handle storage unit unregistered. + * + * @param databaseName database name + * @param dataSourceUnitName data source unit name + */ + public void handleUnregistered(final String databaseName, final String dataSourceUnitName) { + Preconditions.checkState(contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName), "No database '%s' exists.", databaseName); + contextManager.getMetaDataContextManager().getStorageUnitManager().unregisterStorageUnit(databaseName, dataSourceUnitName); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/TableChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/TableChangedHandler.java new file mode 100644 index 0000000000000..d92d3befaf6bb --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/TableChangedHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; + +import com.google.common.base.Preconditions; +import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; +import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; + +/** + * Table changed handler. + */ +public final class TableChangedHandler { + + private final ContextManager contextManager; + + private final ShardingSphereStatisticsRefreshEngine statisticsRefreshEngine; + + public TableChangedHandler(final ContextManager contextManager) { + this.contextManager = contextManager; + statisticsRefreshEngine = new ShardingSphereStatisticsRefreshEngine(contextManager); + } + + /** + * Handle table created or altered. + * + * @param databaseName database name + * @param schemaName schema name + * @param event data changed event + */ + public void handleCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { + String tableName = TableMetaDataNode.getTableNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); + Preconditions.checkArgument(event.getValue().equals( + contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), + "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); + ShardingSphereTable table = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getTable().load(databaseName, schemaName, tableName); + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, table, null); + statisticsRefreshEngine.asyncRefresh(); + } + + /** + * Handle table altered. + * + * @param databaseName database name + * @param schemaName schema name + * @param event data changed event + */ + public void handleDropped(final String databaseName, final String schemaName, final DataChangedEvent event) { + String tableName = TableMetaDataNode.getTableName(event.getKey()).orElseThrow(() -> new IllegalStateException("Table name not found.")); + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, tableName, null); + statisticsRefreshEngine.asyncRefresh(); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/ViewChangedHandler.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/ViewChangedHandler.java new file mode 100644 index 0000000000000..d7dbe67ecd1f8 --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/handler/database/ViewChangedHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.handler.database; + +import com.google.common.base.Preconditions; +import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView; +import org.apache.shardingsphere.metadata.persist.node.metadata.ViewMetaDataNode; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine; + +/** + * View changed handler. + */ +public final class ViewChangedHandler { + + private final ContextManager contextManager; + + private final ShardingSphereStatisticsRefreshEngine statisticsRefreshEngine; + + public ViewChangedHandler(final ContextManager contextManager) { + this.contextManager = contextManager; + statisticsRefreshEngine = new ShardingSphereStatisticsRefreshEngine(contextManager); + } + + /** + * Handle view created or altered. + * + * @param databaseName database name + * @param schemaName schema name + * @param event data changed event + */ + public void handleCreatedOrAltered(final String databaseName, final String schemaName, final DataChangedEvent event) { + String viewName = ViewMetaDataNode.getViewNameByActiveVersionNode(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); + Preconditions.checkArgument(event.getValue().equals( + contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService().getActiveVersionByFullPath(event.getKey())), + "Invalid active version: %s of key: %s", event.getValue(), event.getKey()); + ShardingSphereView view = contextManager.getPersistServiceFacade().getMetaDataPersistService().getDatabaseMetaDataFacade().getView().load(databaseName, schemaName, viewName); + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, null, view); + new ShardingSphereStatisticsRefreshEngine(contextManager).asyncRefresh(); + } + + /** + * Handle view dropped. + * + * @param databaseName database name + * @param schemaName schema name + * @param event data changed event + */ + public void handleDropped(final String databaseName, final String schemaName, final DataChangedEvent event) { + String viewName = ViewMetaDataNode.getViewName(event.getKey()).orElseThrow(() -> new IllegalStateException("View name not found.")); + contextManager.getMetaDataContextManager().getSchemaMetaDataManager().alterSchema(databaseName, schemaName, null, viewName); + new ShardingSphereStatisticsRefreshEngine(contextManager).asyncRefresh(); + } +}