diff --git a/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java b/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java index 798285806f5..3c237f7b545 100644 --- a/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java +++ b/core/src/main/java/org/apache/gravitino/authorization/AccessControlManager.java @@ -24,6 +24,7 @@ import org.apache.gravitino.Configs; import org.apache.gravitino.EntityStore; import org.apache.gravitino.MetadataObject; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.exceptions.GroupAlreadyExistsException; import org.apache.gravitino.exceptions.IllegalRoleException; import org.apache.gravitino.exceptions.NoSuchGroupException; @@ -33,7 +34,10 @@ import org.apache.gravitino.exceptions.NoSuchUserException; import org.apache.gravitino.exceptions.RoleAlreadyExistsException; import org.apache.gravitino.exceptions.UserAlreadyExistsException; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.MetadataObjectUtil; /** * AccessControlManager is used for manage users, roles, grant information, this class is an @@ -56,44 +60,68 @@ public AccessControlManager(EntityStore store, IdGenerator idGenerator, Config c @Override public User addUser(String metalake, String user) throws UserAlreadyExistsException, NoSuchMetalakeException { - return userGroupManager.addUser(metalake, user); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), + LockType.WRITE, + () -> userGroupManager.addUser(metalake, user)); } @Override public boolean removeUser(String metalake, String user) throws NoSuchMetalakeException { - return userGroupManager.removeUser(metalake, user); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), + LockType.WRITE, + () -> userGroupManager.removeUser(metalake, user)); } @Override public User getUser(String metalake, String user) throws NoSuchUserException, NoSuchMetalakeException { - return userGroupManager.getUser(metalake, user); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofGroup(metalake, user), + LockType.READ, + () -> userGroupManager.getUser(metalake, user)); } @Override public String[] listUserNames(String metalake) throws NoSuchMetalakeException { - return userGroupManager.listUserNames(metalake); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofUserNamespace(metalake).levels()), + LockType.READ, + () -> userGroupManager.listUserNames(metalake)); } @Override public User[] listUsers(String metalake) throws NoSuchMetalakeException { - return userGroupManager.listUsers(metalake); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofUserNamespace(metalake).levels()), + LockType.READ, + () -> userGroupManager.listUsers(metalake)); } public Group addGroup(String metalake, String group) throws GroupAlreadyExistsException, NoSuchMetalakeException { - return userGroupManager.addGroup(metalake, group); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), + LockType.WRITE, + () -> userGroupManager.addGroup(metalake, group)); } @Override public boolean removeGroup(String metalake, String group) throws NoSuchMetalakeException { - return userGroupManager.removeGroup(metalake, group); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), + LockType.WRITE, + () -> userGroupManager.removeGroup(metalake, group)); } @Override public Group getGroup(String metalake, String group) throws NoSuchGroupException, NoSuchMetalakeException { - return userGroupManager.getGroup(metalake, group); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofGroup(metalake, group), + LockType.READ, + () -> userGroupManager.getGroup(metalake, group)); } @Override @@ -109,25 +137,53 @@ public String[] listGroupNames(String metalake) throws NoSuchMetalakeException { @Override public User grantRolesToUser(String metalake, List roles, String user) throws NoSuchUserException, IllegalRoleException, NoSuchMetalakeException { - return permissionManager.grantRolesToUser(metalake, roles, user); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofUser(metalake, user), + LockType.WRITE, + () -> + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.READ, + () -> permissionManager.grantRolesToUser(metalake, roles, user))); } @Override public Group grantRolesToGroup(String metalake, List roles, String group) throws NoSuchGroupException, IllegalRoleException, NoSuchMetalakeException { - return permissionManager.grantRolesToGroup(metalake, roles, group); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofGroup(metalake, group), + LockType.WRITE, + () -> + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.READ, + () -> permissionManager.grantRolesToGroup(metalake, roles, group))); } @Override public Group revokeRolesFromGroup(String metalake, List roles, String group) throws NoSuchGroupException, IllegalRoleException, NoSuchMetalakeException { - return permissionManager.revokeRolesFromGroup(metalake, roles, group); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofGroup(metalake, group), + LockType.WRITE, + () -> + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.READ, + () -> permissionManager.revokeRolesFromGroup(metalake, roles, group))); } @Override public User revokeRolesFromUser(String metalake, List roles, String user) throws NoSuchUserException, IllegalRoleException, NoSuchMetalakeException { - return permissionManager.revokeRolesFromUser(metalake, roles, user); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofUser(metalake, user), + LockType.WRITE, + () -> + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.READ, + () -> permissionManager.revokeRolesFromUser(metalake, roles, user))); } @Override @@ -142,42 +198,60 @@ public Role createRole( Map properties, List securableObjects) throws RoleAlreadyExistsException, NoSuchMetalakeException { - return roleManager.createRole(metalake, role, properties, securableObjects); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.WRITE, + () -> roleManager.createRole(metalake, role, properties, securableObjects)); } @Override public Role getRole(String metalake, String role) throws NoSuchRoleException, NoSuchMetalakeException { - return roleManager.getRole(metalake, role); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofRole(metalake, role), + LockType.READ, + () -> roleManager.getRole(metalake, role)); } @Override public boolean deleteRole(String metalake, String role) throws NoSuchMetalakeException { - return roleManager.deleteRole(metalake, role); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), + LockType.WRITE, + () -> roleManager.deleteRole(metalake, role)); } @Override public String[] listRoleNames(String metalake) throws NoSuchMetalakeException { - return roleManager.listRoleNames(metalake); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(metalake), LockType.READ, () -> roleManager.listRoleNames(metalake)); } @Override public String[] listRoleNamesByObject(String metalake, MetadataObject object) throws NoSuchMetalakeException, NoSuchMetadataObjectException { - return roleManager.listRoleNamesByObject(metalake, object); + NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, object); + return TreeLockUtils.doWithTreeLock( + identifier, LockType.READ, () -> roleManager.listRoleNamesByObject(metalake, object)); } @Override public Role grantPrivilegeToRole( String metalake, String role, MetadataObject object, List privileges) throws NoSuchRoleException, NoSuchMetalakeException { - return permissionManager.grantPrivilegesToRole(metalake, role, object, privileges); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofRole(metalake, role), + LockType.WRITE, + () -> permissionManager.grantPrivilegesToRole(metalake, role, object, privileges)); } @Override public Role revokePrivilegesFromRole( String metalake, String role, MetadataObject object, List privileges) throws NoSuchRoleException, NoSuchMetalakeException { - return permissionManager.revokePrivilegesFromRole(metalake, role, object, privileges); + return TreeLockUtils.doWithTreeLock( + AuthorizationUtils.ofRole(metalake, role), + LockType.WRITE, + () -> permissionManager.revokePrivilegesFromRole(metalake, role, object, privileges)); } } diff --git a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java index 04285954e97..23ab582988a 100644 --- a/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java +++ b/core/src/main/java/org/apache/gravitino/authorization/OwnerManager.java @@ -67,119 +67,140 @@ public OwnerManager(EntityStore store) { public void setOwner( String metalake, MetadataObject metadataObject, String ownerName, Owner.Type ownerType) { - try { - Optional originOwner = getOwner(metalake, metadataObject); - - NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, metadataObject); - OwnerImpl newOwner = new OwnerImpl(); - - if (ownerType == Owner.Type.USER) { - NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName); - TreeLockUtils.doWithTreeLock( - ownerIdent, - LockType.READ, - () -> { - store - .relationOperations() - .insertRelation( - SupportsRelationOperations.Type.OWNER_REL, - objectIdent, - MetadataObjectUtil.toEntityType(metadataObject), - ownerIdent, - Entity.EntityType.USER, - true); - return null; - }); - - newOwner.name = ownerName; - newOwner.type = Owner.Type.USER; - } else if (ownerType == Owner.Type.GROUP) { - NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName); - TreeLockUtils.doWithTreeLock( - ownerIdent, - LockType.READ, - () -> { - store - .relationOperations() - .insertRelation( - SupportsRelationOperations.Type.OWNER_REL, - objectIdent, - MetadataObjectUtil.toEntityType(metadataObject), - ownerIdent, - Entity.EntityType.GROUP, - true); - return null; - }); - - newOwner.name = ownerName; - newOwner.type = Owner.Type.GROUP; - } - - AuthorizationUtils.callAuthorizationPluginForMetadataObject( - metalake, - metadataObject, - authorizationPlugin -> - authorizationPlugin.onOwnerSet(metadataObject, originOwner.orElse(null), newOwner)); - } catch (NoSuchEntityException nse) { - LOG.warn( - "Metadata object {} or owner {} is not found", metadataObject.fullName(), ownerName, nse); - throw new NotFoundException( - nse, "Metadata object %s or owner %s is not found", metadataObject.fullName(), ownerName); - } catch (IOException ioe) { - LOG.info( - "Fail to set the owner {} of metadata object {}", - ownerName, - metadataObject.fullName(), - ioe); - throw new RuntimeException(ioe); - } + + NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, metadataObject); + TreeLockUtils.doWithTreeLock( + objectIdent, + LockType.READ, + () -> { + try { + Optional originOwner = getOwner(metalake, metadataObject); + + OwnerImpl newOwner = new OwnerImpl(); + if (ownerType == Owner.Type.USER) { + NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName); + TreeLockUtils.doWithTreeLock( + ownerIdent, + LockType.READ, + () -> { + store + .relationOperations() + .insertRelation( + SupportsRelationOperations.Type.OWNER_REL, + objectIdent, + MetadataObjectUtil.toEntityType(metadataObject), + ownerIdent, + Entity.EntityType.USER, + true); + return null; + }); + + newOwner.name = ownerName; + newOwner.type = Owner.Type.USER; + } else if (ownerType == Owner.Type.GROUP) { + NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName); + TreeLockUtils.doWithTreeLock( + ownerIdent, + LockType.READ, + () -> { + store + .relationOperations() + .insertRelation( + SupportsRelationOperations.Type.OWNER_REL, + objectIdent, + MetadataObjectUtil.toEntityType(metadataObject), + ownerIdent, + Entity.EntityType.GROUP, + true); + return null; + }); + + newOwner.name = ownerName; + newOwner.type = Owner.Type.GROUP; + } + + AuthorizationUtils.callAuthorizationPluginForMetadataObject( + metalake, + metadataObject, + authorizationPlugin -> + authorizationPlugin.onOwnerSet( + metadataObject, originOwner.orElse(null), newOwner)); + } catch (NoSuchEntityException nse) { + LOG.warn( + "Metadata object {} or owner {} is not found", + metadataObject.fullName(), + ownerName, + nse); + throw new NotFoundException( + nse, + "Metadata object %s or owner %s is not found", + metadataObject.fullName(), + ownerName); + } catch (IOException ioe) { + LOG.info( + "Fail to set the owner {} of metadata object {}", + ownerName, + metadataObject.fullName(), + ioe); + throw new RuntimeException(ioe); + } + + return null; + }); } public Optional getOwner(String metalake, MetadataObject metadataObject) { - try { - OwnerImpl owner = new OwnerImpl(); - NameIdentifier ident = MetadataObjectUtil.toEntityIdent(metalake, metadataObject); - List entities = - store - .relationOperations() - .listEntitiesByRelation( - SupportsRelationOperations.Type.OWNER_REL, - ident, - MetadataObjectUtil.toEntityType(metadataObject)); - - if (entities.isEmpty()) { - return Optional.empty(); - } - - if (entities.size() != 1) { - throw new IllegalStateException( - String.format("The number of the owner %s must be 1", metadataObject.fullName())); - } - - Entity entity = entities.get(0); - if (!(entity instanceof UserEntity) && !(entity instanceof GroupEntity)) { - throw new IllegalArgumentException( - String.format( - "Doesn't support owner entity class %s", entities.get(0).getClass().getName())); - } - - if (entities.get(0) instanceof UserEntity) { - UserEntity user = (UserEntity) entities.get(0); - owner.name = user.name(); - owner.type = Owner.Type.USER; - } else if (entities.get(0) instanceof GroupEntity) { - GroupEntity group = (GroupEntity) entities.get(0); - owner.name = group.name(); - owner.type = Owner.Type.GROUP; - } - return Optional.of(owner); - } catch (NoSuchEntityException nse) { - throw new NoSuchMetadataObjectException( - "The metadata object of %s isn't found", metadataObject.fullName()); - } catch (IOException ioe) { - LOG.info("Fail to get the owner of entity {}", metadataObject.fullName(), ioe); - throw new RuntimeException(ioe); - } + NameIdentifier ident = MetadataObjectUtil.toEntityIdent(metalake, metadataObject); + + return TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> { + try { + OwnerImpl owner = new OwnerImpl(); + List entities = + store + .relationOperations() + .listEntitiesByRelation( + SupportsRelationOperations.Type.OWNER_REL, + ident, + MetadataObjectUtil.toEntityType(metadataObject)); + + if (entities.isEmpty()) { + return Optional.empty(); + } + + if (entities.size() != 1) { + throw new IllegalStateException( + String.format("The number of the owner %s must be 1", metadataObject.fullName())); + } + + Entity entity = entities.get(0); + if (!(entity instanceof UserEntity) && !(entity instanceof GroupEntity)) { + throw new IllegalArgumentException( + String.format( + "Doesn't support owner entity class %s", + entities.get(0).getClass().getName())); + } + + if (entities.get(0) instanceof UserEntity) { + UserEntity user = (UserEntity) entities.get(0); + owner.name = user.name(); + owner.type = Owner.Type.USER; + } else if (entities.get(0) instanceof GroupEntity) { + GroupEntity group = (GroupEntity) entities.get(0); + owner.name = group.name(); + owner.type = Owner.Type.GROUP; + } + return Optional.of(owner); + } catch (NoSuchEntityException nse) { + throw new NoSuchMetadataObjectException( + "The metadata object of %s isn't found", metadataObject.fullName()); + } catch (IOException ioe) { + LOG.info("Fail to get the owner of entity {}", metadataObject.fullName(), ioe); + throw new RuntimeException(ioe); + } + }); } private static class OwnerImpl implements Owner { diff --git a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java index 6f77bb46206..5628153eac0 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java +++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java @@ -91,6 +91,8 @@ import org.apache.gravitino.exceptions.NonEmptyCatalogException; import org.apache.gravitino.exceptions.NonEmptyEntityException; import org.apache.gravitino.file.FilesetCatalog; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.messaging.TopicCatalog; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.CatalogEntity; @@ -293,36 +295,45 @@ public void close() { @Override public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeException { NameIdentifier metalakeIdent = NameIdentifier.of(namespace.levels()); - checkMetalake(NameIdentifier.of(namespace.level(0)), store); - try { - return store.list(namespace, CatalogEntity.class, EntityType.CATALOG).stream() - .map(entity -> NameIdentifier.of(namespace, entity.name())) - .toArray(NameIdentifier[]::new); - - } catch (IOException ioe) { - LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.READ, + () -> { + checkMetalake(metalakeIdent, store); + try { + return store.list(namespace, CatalogEntity.class, EntityType.CATALOG).stream() + .map(entity -> NameIdentifier.of(namespace, entity.name())) + .toArray(NameIdentifier[]::new); + + } catch (IOException ioe) { + LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe); + throw new RuntimeException(ioe); + } + }); } @Override public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeException { NameIdentifier metalakeIdent = NameIdentifier.of(namespace.levels()); - checkMetalake(metalakeIdent, store); - - try { - List catalogEntities = - store.list(namespace, CatalogEntity.class, EntityType.CATALOG); - - return catalogEntities.stream() - .map(e -> e.toCatalogInfoWithResolvedProps(getResolvedProperties(e))) - .toArray(Catalog[]::new); - - } catch (IOException ioe) { - LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.READ, + () -> { + checkMetalake(metalakeIdent, store); + try { + List catalogEntities = + store.list(namespace, CatalogEntity.class, EntityType.CATALOG); + + return catalogEntities.stream() + .map(e -> e.toCatalogInfoWithResolvedProps(getResolvedProperties(e))) + .toArray(Catalog[]::new); + + } catch (IOException ioe) { + LOG.error("Failed to list catalogs in metalake {}", metalakeIdent, ioe); + throw new RuntimeException(ioe); + } + }); } /** @@ -335,9 +346,14 @@ public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeExce @Override public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - return loadCatalogAndWrap(ident).catalog; + return TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> { + checkMetalake(metalakeIdent, store); + return loadCatalogAndWrap(ident).catalog; + }); } /** @@ -361,67 +377,74 @@ public Catalog createCatalog( Map properties) throws NoSuchMetalakeException, CatalogAlreadyExistsException { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - Map mergedConfig = buildCatalogConf(provider, properties); - - long uid = idGenerator.nextId(); - StringIdentifier stringId = StringIdentifier.fromId(uid); - Instant now = Instant.now(); - String creator = PrincipalUtils.getCurrentPrincipal().getName(); - CatalogEntity e = - CatalogEntity.builder() - .withId(uid) - .withName(ident.name()) - .withNamespace(ident.namespace()) - .withType(type) - .withProvider(provider) - .withComment(comment) - .withProperties(StringIdentifier.newPropertiesWithId(stringId, mergedConfig)) - .withAuditInfo( - AuditInfo.builder() - .withCreator(creator) - .withCreateTime(now) - .withLastModifier(creator) - .withLastModifiedTime(now) - .build()) - .build(); - - boolean needClean = true; - try { - store.put(e, false /* overwrite */); - CatalogWrapper wrapper = catalogCache.get(ident, id -> createCatalogWrapper(e, mergedConfig)); - - needClean = false; - return wrapper.catalog; - - } catch (EntityAlreadyExistsException e1) { - needClean = false; - LOG.warn("Catalog {} already exists", ident, e1); - throw new CatalogAlreadyExistsException("Catalog %s already exists", ident); - - } catch (IllegalArgumentException | NoSuchMetalakeException e2) { - throw e2; + return TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.WRITE, + () -> { + checkMetalake(metalakeIdent, store); + Map mergedConfig = buildCatalogConf(provider, properties); + + long uid = idGenerator.nextId(); + StringIdentifier stringId = StringIdentifier.fromId(uid); + Instant now = Instant.now(); + String creator = PrincipalUtils.getCurrentPrincipal().getName(); + CatalogEntity e = + CatalogEntity.builder() + .withId(uid) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withType(type) + .withProvider(provider) + .withComment(comment) + .withProperties(StringIdentifier.newPropertiesWithId(stringId, mergedConfig)) + .withAuditInfo( + AuditInfo.builder() + .withCreator(creator) + .withCreateTime(now) + .withLastModifier(creator) + .withLastModifiedTime(now) + .build()) + .build(); + + boolean needClean = true; + try { + store.put(e, false /* overwrite */); + CatalogWrapper wrapper = + catalogCache.get(ident, id -> createCatalogWrapper(e, mergedConfig)); + + needClean = false; + return wrapper.catalog; + + } catch (EntityAlreadyExistsException e1) { + needClean = false; + LOG.warn("Catalog {} already exists", ident, e1); + throw new CatalogAlreadyExistsException("Catalog %s already exists", ident); + + } catch (IllegalArgumentException | NoSuchMetalakeException e2) { + throw e2; + + } catch (Exception e3) { + catalogCache.invalidate(ident); + LOG.error("Failed to create catalog {}", ident, e3); + if (e3 instanceof RuntimeException) { + throw (RuntimeException) e3; + } + throw new RuntimeException(e3); - } catch (Exception e3) { - catalogCache.invalidate(ident); - LOG.error("Failed to create catalog {}", ident, e3); - if (e3 instanceof RuntimeException) { - throw (RuntimeException) e3; - } - throw new RuntimeException(e3); - - } finally { - if (needClean) { - // since we put the catalog entity into the store but failed to create the catalog instance, - // we need to clean up the entity stored. - try { - store.delete(ident, EntityType.CATALOG, true); - } catch (IOException e4) { - LOG.error("Failed to clean up catalog {}", ident, e4); - } - } - } + } finally { + if (needClean) { + // since we put the catalog entity into the store but failed to create the catalog + // instance, + // we need to clean up the entity stored. + try { + store.delete(ident, EntityType.CATALOG, true); + } catch (IOException e4) { + LOG.error("Failed to clean up catalog {}", ident, e4); + } + } + } + }); } /** @@ -442,115 +465,134 @@ public void testConnection( String comment, Map properties) { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - try { - if (store.exists(ident, EntityType.CATALOG)) { - throw new CatalogAlreadyExistsException("Catalog %s already exists", ident); - } + TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.READ, + () -> { + checkMetalake(metalakeIdent, store); + try { + if (store.exists(ident, EntityType.CATALOG)) { + throw new CatalogAlreadyExistsException("Catalog %s already exists", ident); + } - Map mergedConfig = buildCatalogConf(provider, properties); - Instant now = Instant.now(); - String creator = PrincipalUtils.getCurrentPrincipal().getName(); - CatalogEntity dummyEntity = - CatalogEntity.builder() - .withId(DUMMY_ID.id()) - .withName(ident.name()) - .withNamespace(ident.namespace()) - .withType(type) - .withProvider(provider) - .withComment(comment) - .withProperties(StringIdentifier.newPropertiesWithId(DUMMY_ID, mergedConfig)) - .withAuditInfo( - AuditInfo.builder() - .withCreator(creator) - .withCreateTime(now) - .withLastModifier(creator) - .withLastModifiedTime(now) - .build()) - .build(); - - CatalogWrapper wrapper = createCatalogWrapper(dummyEntity, mergedConfig); - wrapper.doWithCatalogOps( - c -> { - c.testConnection(ident, type, provider, comment, mergedConfig); - return null; - }); - } catch (GravitinoRuntimeException e) { - throw e; - } catch (Exception e) { - LOG.warn("Failed to test catalog creation {}", ident, e); - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException(e); - } + Map mergedConfig = buildCatalogConf(provider, properties); + Instant now = Instant.now(); + String creator = PrincipalUtils.getCurrentPrincipal().getName(); + CatalogEntity dummyEntity = + CatalogEntity.builder() + .withId(DUMMY_ID.id()) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withType(type) + .withProvider(provider) + .withComment(comment) + .withProperties(StringIdentifier.newPropertiesWithId(DUMMY_ID, mergedConfig)) + .withAuditInfo( + AuditInfo.builder() + .withCreator(creator) + .withCreateTime(now) + .withLastModifier(creator) + .withLastModifiedTime(now) + .build()) + .build(); + + CatalogWrapper wrapper = createCatalogWrapper(dummyEntity, mergedConfig); + return wrapper.doWithCatalogOps( + c -> { + c.testConnection(ident, type, provider, comment, mergedConfig); + return null; + }); + } catch (GravitinoRuntimeException e) { + throw e; + } catch (Exception e) { + LOG.warn("Failed to test catalog creation {}", ident, e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } + }); } @Override public void enableCatalog(NameIdentifier ident) throws NoSuchCatalogException, CatalogNotInUseException { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - try { - if (catalogInUse(store, ident)) { - return; - } + TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.WRITE, + () -> { + checkMetalake(metalakeIdent, store); - store.update( - ident, - CatalogEntity.class, - EntityType.CATALOG, - catalog -> { - CatalogEntity.Builder newCatalogBuilder = newCatalogBuilder(ident.namespace(), catalog); - - Map newProps = - catalog.getProperties() == null - ? new HashMap<>() - : new HashMap<>(catalog.getProperties()); - newProps.put(PROPERTY_IN_USE, "true"); - newCatalogBuilder.withProperties(newProps); - - return newCatalogBuilder.build(); - }); - catalogCache.invalidate(ident); + try { + if (catalogInUse(store, ident)) { + return null; + } - } catch (IOException e) { - throw new RuntimeException(e); - } + store.update( + ident, + CatalogEntity.class, + EntityType.CATALOG, + catalog -> { + CatalogEntity.Builder newCatalogBuilder = + newCatalogBuilder(ident.namespace(), catalog); + + Map newProps = + catalog.getProperties() == null + ? new HashMap<>() + : new HashMap<>(catalog.getProperties()); + newProps.put(PROPERTY_IN_USE, "true"); + newCatalogBuilder.withProperties(newProps); + + return newCatalogBuilder.build(); + }); + catalogCache.invalidate(ident); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @Override public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - try { - if (!catalogInUse(store, ident)) { - return; - } - store.update( - ident, - CatalogEntity.class, - EntityType.CATALOG, - catalog -> { - CatalogEntity.Builder newCatalogBuilder = newCatalogBuilder(ident.namespace(), catalog); - - Map newProps = - catalog.getProperties() == null - ? new HashMap<>() - : new HashMap<>(catalog.getProperties()); - newProps.put(PROPERTY_IN_USE, "false"); - newCatalogBuilder.withProperties(newProps); - - return newCatalogBuilder.build(); - }); - catalogCache.invalidate(ident); + TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.WRITE, + () -> { + checkMetalake(metalakeIdent, store); - } catch (IOException e) { - throw new RuntimeException(e); - } + try { + if (!catalogInUse(store, ident)) { + return null; + } + store.update( + ident, + CatalogEntity.class, + EntityType.CATALOG, + catalog -> { + CatalogEntity.Builder newCatalogBuilder = + newCatalogBuilder(ident.namespace(), catalog); + + Map newProps = + catalog.getProperties() == null + ? new HashMap<>() + : new HashMap<>(catalog.getProperties()); + newProps.put(PROPERTY_IN_USE, "false"); + newCatalogBuilder.withProperties(newProps); + + return newCatalogBuilder.build(); + }); + catalogCache.invalidate(ident); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } /** @@ -565,121 +607,134 @@ public void disableCatalog(NameIdentifier ident) throws NoSuchCatalogException { @Override public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes) throws NoSuchCatalogException, IllegalArgumentException { - checkCatalogInUse(store, ident); - - // There could be a race issue that someone is using the catalog from cache while we are - // updating it. - - CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident); - if (catalogWrapper == null) { - throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident); - } - - try { - catalogWrapper.doWithPropertiesMeta( - f -> { - Pair, Map> alterProperty = - getCatalogAlterProperty(changes); - validatePropertyForAlter( - f.catalogPropertiesMetadata(), alterProperty.getLeft(), alterProperty.getRight()); - return null; - }); - } catch (IllegalArgumentException e1) { - throw e1; - } catch (Exception e) { - LOG.error("Failed to alter catalog {}", ident, e); - throw new RuntimeException(e); - } - - catalogCache.invalidate(ident); - try { - CatalogEntity updatedCatalog = - store.update( - ident, - CatalogEntity.class, - EntityType.CATALOG, - catalog -> { - CatalogEntity.Builder newCatalogBuilder = - newCatalogBuilder(ident.namespace(), catalog); - - Map newProps = - catalog.getProperties() == null - ? new HashMap<>() - : new HashMap<>(catalog.getProperties()); - newCatalogBuilder = updateEntity(newCatalogBuilder, newProps, changes); - - return newCatalogBuilder.build(); - }); - return Objects.requireNonNull( - catalogCache.get( - updatedCatalog.nameIdentifier(), - id -> createCatalogWrapper(updatedCatalog, null))) - .catalog; - - } catch (NoSuchEntityException ne) { - LOG.warn("Catalog {} does not exist", ident, ne); - throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident); - - } catch (IllegalArgumentException iae) { - LOG.warn("Failed to alter catalog {} with unknown change", ident, iae); - throw iae; - - } catch (IOException ioe) { - LOG.error("Failed to alter catalog {}", ident, ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().level(0)), + LockType.WRITE, + () -> { + checkCatalogInUse(store, ident); + + // There could be a race issue that someone is using the catalog from cache while we are + // updating it. + + CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident); + if (catalogWrapper == null) { + throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident); + } + + try { + catalogWrapper.doWithPropertiesMeta( + f -> { + Pair, Map> alterProperty = + getCatalogAlterProperty(changes); + validatePropertyForAlter( + f.catalogPropertiesMetadata(), + alterProperty.getLeft(), + alterProperty.getRight()); + return null; + }); + } catch (IllegalArgumentException e1) { + throw e1; + } catch (Exception e) { + LOG.error("Failed to alter catalog {}", ident, e); + throw new RuntimeException(e); + } + + catalogCache.invalidate(ident); + try { + CatalogEntity updatedCatalog = + store.update( + ident, + CatalogEntity.class, + EntityType.CATALOG, + catalog -> { + CatalogEntity.Builder newCatalogBuilder = + newCatalogBuilder(ident.namespace(), catalog); + + Map newProps = + catalog.getProperties() == null + ? new HashMap<>() + : new HashMap<>(catalog.getProperties()); + newCatalogBuilder = updateEntity(newCatalogBuilder, newProps, changes); + + return newCatalogBuilder.build(); + }); + return Objects.requireNonNull( + catalogCache.get( + updatedCatalog.nameIdentifier(), + id -> createCatalogWrapper(updatedCatalog, null))) + .catalog; + + } catch (NoSuchEntityException ne) { + LOG.warn("Catalog {} does not exist", ident, ne); + throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, ident); + + } catch (IllegalArgumentException iae) { + LOG.warn("Failed to alter catalog {} with unknown change", ident, iae); + throw iae; + + } catch (IOException ioe) { + LOG.error("Failed to alter catalog {}", ident, ioe); + throw new RuntimeException(ioe); + } + }); } @Override public boolean dropCatalog(NameIdentifier ident, boolean force) throws NonEmptyEntityException, CatalogInUseException { NameIdentifier metalakeIdent = NameIdentifier.of(ident.namespace().levels()); - checkMetalake(metalakeIdent, store); - - try { - boolean catalogInUse = catalogInUse(store, ident); - if (catalogInUse && !force) { - throw new CatalogInUseException( - "Catalog %s is in use, please disable it first or use force option", ident); - } - List schemas = - store.list( - Namespace.of(ident.namespace().level(0), ident.name()), - SchemaEntity.class, - EntityType.SCHEMA); - CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, CatalogEntity.class); - - if (!schemas.isEmpty() && !force) { - // the Kafka catalog is special, it includes a default schema - if (!catalogEntity.getProvider().equals("kafka") || schemas.size() > 1) { - throw new NonEmptyCatalogException( - "Catalog %s has schemas, please drop them first or use force option", ident); - } - } + return TreeLockUtils.doWithTreeLock( + metalakeIdent, + LockType.WRITE, + () -> { + checkMetalake(metalakeIdent, store); + try { + boolean catalogInUse = catalogInUse(store, ident); + if (catalogInUse && !force) { + throw new CatalogInUseException( + "Catalog %s is in use, please disable it first or use force option", ident); + } - CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident); - if (includeManagedEntities(catalogEntity)) { - schemas.forEach( - schema -> { - try { - catalogWrapper.doWithSchemaOps( - schemaOps -> schemaOps.dropSchema(schema.nameIdentifier(), true)); - } catch (Exception e) { - LOG.warn("Failed to drop schema {}", schema.nameIdentifier()); - throw new RuntimeException("Failed to drop schema " + schema.nameIdentifier(), e); + List schemas = + store.list( + Namespace.of(ident.namespace().level(0), ident.name()), + SchemaEntity.class, + EntityType.SCHEMA); + CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, CatalogEntity.class); + + if (!schemas.isEmpty() && !force) { + // the Kafka catalog is special, it includes a default schema + if (!catalogEntity.getProvider().equals("kafka") || schemas.size() > 1) { + throw new NonEmptyCatalogException( + "Catalog %s has schemas, please drop them first or use force option", ident); } - }); - } - catalogCache.invalidate(ident); - return store.delete(ident, EntityType.CATALOG, true); + } - } catch (NoSuchMetalakeException | NoSuchCatalogException ignored) { - return false; + CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident); + if (includeManagedEntities(catalogEntity)) { + schemas.forEach( + schema -> { + try { + catalogWrapper.doWithSchemaOps( + schemaOps -> schemaOps.dropSchema(schema.nameIdentifier(), true)); + } catch (Exception e) { + LOG.warn("Failed to drop schema {}", schema.nameIdentifier()); + throw new RuntimeException( + "Failed to drop schema " + schema.nameIdentifier(), e); + } + }); + } + catalogCache.invalidate(ident); + return store.delete(ident, EntityType.CATALOG, true); - } catch (IOException e) { - throw new RuntimeException(e); - } + } catch (NoSuchMetalakeException | NoSuchCatalogException ignored) { + return false; + + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private boolean includeManagedEntities(CatalogEntity catalogEntity) { diff --git a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java index 98c6311bd7c..89209915c02 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java @@ -33,6 +33,8 @@ import org.apache.gravitino.exceptions.NonEmptyEntityException; import org.apache.gravitino.file.Fileset; import org.apache.gravitino.file.FilesetChange; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.storage.IdGenerator; public class FilesetOperationDispatcher extends OperationDispatcher implements FilesetDispatcher { @@ -57,10 +59,14 @@ public FilesetOperationDispatcher( */ @Override public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException { - return doWithCatalog( - getCatalogIdentifier(NameIdentifier.of(namespace.levels())), - c -> c.doWithFilesetOps(f -> f.listFilesets(namespace)), - NoSuchSchemaException.class); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(namespace.levels()), + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithFilesetOps(f -> f.listFilesets(namespace)), + NoSuchSchemaException.class)); } /** @@ -73,19 +79,23 @@ public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaExc @Override public Fileset loadFileset(NameIdentifier ident) throws NoSuchFilesetException { NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Fileset fileset = - doWithCatalog( - catalogIdent, - c -> c.doWithFilesetOps(f -> f.loadFileset(ident)), - NoSuchFilesetException.class); - - // Currently we only support maintaining the Fileset in the Gravitino's store. - return EntityCombinedFileset.of(fileset) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::filesetPropertiesMetadata, - fileset.properties())); + return TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> { + Fileset fileset = + doWithCatalog( + catalogIdent, + c -> c.doWithFilesetOps(f -> f.loadFileset(ident)), + NoSuchFilesetException.class); + // Currently we only support maintaining the Fileset in the Gravitino's store. + return EntityCombinedFileset.of(fileset) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::filesetPropertiesMetadata, + fileset.properties())); + }); } /** @@ -114,34 +124,42 @@ public Fileset createFileset( Map properties) throws NoSuchSchemaException, FilesetAlreadyExistsException { NameIdentifier catalogIdent = getCatalogIdentifier(ident); - doWithCatalog( - catalogIdent, - c -> - c.doWithPropertiesMeta( - p -> { - validatePropertyForCreate(p.filesetPropertiesMetadata(), properties); - return null; - }), - IllegalArgumentException.class); - long uid = idGenerator.nextId(); - StringIdentifier stringId = StringIdentifier.fromId(uid); - Map updatedProperties = - StringIdentifier.newPropertiesWithId(stringId, properties); - Fileset createdFileset = - doWithCatalog( - catalogIdent, - c -> - c.doWithFilesetOps( - f -> f.createFileset(ident, comment, type, storageLocation, updatedProperties)), - NoSuchSchemaException.class, - FilesetAlreadyExistsException.class); - return EntityCombinedFileset.of(createdFileset) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::filesetPropertiesMetadata, - createdFileset.properties())); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.filesetPropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class); + long uid = idGenerator.nextId(); + StringIdentifier stringId = StringIdentifier.fromId(uid); + Map updatedProperties = + StringIdentifier.newPropertiesWithId(stringId, properties); + + Fileset createdFileset = + doWithCatalog( + catalogIdent, + c -> + c.doWithFilesetOps( + f -> + f.createFileset( + ident, comment, type, storageLocation, updatedProperties)), + NoSuchSchemaException.class, + FilesetAlreadyExistsException.class); + return EntityCombinedFileset.of(createdFileset) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::filesetPropertiesMetadata, + createdFileset.properties())); + }); } /** @@ -162,21 +180,26 @@ public Fileset createFileset( @Override public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) throws NoSuchFilesetException, IllegalArgumentException { - validateAlterProperties(ident, HasPropertyMetadata::filesetPropertiesMetadata, changes); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + validateAlterProperties(ident, HasPropertyMetadata::filesetPropertiesMetadata, changes); - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Fileset alteredFileset = - doWithCatalog( - catalogIdent, - c -> c.doWithFilesetOps(f -> f.alterFileset(ident, changes)), - NoSuchFilesetException.class, - IllegalArgumentException.class); - return EntityCombinedFileset.of(alteredFileset) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::filesetPropertiesMetadata, - alteredFileset.properties())); + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Fileset alteredFileset = + doWithCatalog( + catalogIdent, + c -> c.doWithFilesetOps(f -> f.alterFileset(ident, changes)), + NoSuchFilesetException.class, + IllegalArgumentException.class); + return EntityCombinedFileset.of(alteredFileset) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::filesetPropertiesMetadata, + alteredFileset.properties())); + }); } /** @@ -190,10 +213,14 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes) */ @Override public boolean dropFileset(NameIdentifier ident) { - return doWithCatalog( - getCatalogIdentifier(ident), - c -> c.doWithFilesetOps(f -> f.dropFileset(ident)), - NonEmptyEntityException.class); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithFilesetOps(f -> f.dropFileset(ident)), + NonEmptyEntityException.class)); } /** @@ -208,9 +235,13 @@ public boolean dropFileset(NameIdentifier ident) { @Override public String getFileLocation(NameIdentifier ident, String subPath) throws NoSuchFilesetException { - return doWithCatalog( - getCatalogIdentifier(ident), - c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath)), - NonEmptyEntityException.class); + return TreeLockUtils.doWithTreeLock( + ident, + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(ident), + c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath)), + NonEmptyEntityException.class)); } } diff --git a/core/src/main/java/org/apache/gravitino/catalog/PartitionOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/PartitionOperationDispatcher.java index 5ff0696d51e..5831bda7b57 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/PartitionOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/PartitionOperationDispatcher.java @@ -23,6 +23,8 @@ import org.apache.gravitino.exceptions.NoSuchPartitionException; import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.exceptions.PartitionAlreadyExistsException; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.rel.SupportsPartitions; import org.apache.gravitino.rel.partitions.Partition; import org.apache.gravitino.storage.IdGenerator; @@ -44,39 +46,64 @@ public PartitionOperationDispatcher( @Override public String[] listPartitionNames(NameIdentifier tableIdent) { - return doWithTable( - tableIdent, SupportsPartitions::listPartitionNames, NoSuchTableException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.READ, + () -> + doWithTable( + tableIdent, SupportsPartitions::listPartitionNames, NoSuchTableException.class)); } @Override public Partition[] listPartitions(NameIdentifier tableIdent) { - return doWithTable(tableIdent, SupportsPartitions::listPartitions, NoSuchTableException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.READ, + () -> + doWithTable( + tableIdent, SupportsPartitions::listPartitions, NoSuchTableException.class)); } @Override public Partition getPartition(NameIdentifier tableIdent, String partitionName) throws NoSuchPartitionException { - return doWithTable( - tableIdent, p -> p.getPartition(partitionName), NoSuchPartitionException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.READ, + () -> + doWithTable( + tableIdent, p -> p.getPartition(partitionName), NoSuchPartitionException.class)); } @Override public Partition addPartition(NameIdentifier tableIdent, Partition partition) throws PartitionAlreadyExistsException { - return doWithTable( - tableIdent, p -> p.addPartition(partition), PartitionAlreadyExistsException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.WRITE, + () -> + doWithTable( + tableIdent, p -> p.addPartition(partition), PartitionAlreadyExistsException.class)); } @Override public boolean dropPartition(NameIdentifier tableIdent, String partitionName) { - return doWithTable( - tableIdent, p -> p.dropPartition(partitionName), NoSuchPartitionException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.WRITE, + () -> + doWithTable( + tableIdent, p -> p.dropPartition(partitionName), NoSuchPartitionException.class)); } @Override public boolean purgePartition(NameIdentifier tableIdent, String partitionName) throws UnsupportedOperationException { - return doWithTable( - tableIdent, p -> p.purgePartition(partitionName), NoSuchPartitionException.class); + return TreeLockUtils.doWithTreeLock( + tableIdent, + LockType.WRITE, + () -> + doWithTable( + tableIdent, p -> p.dropPartition(partitionName), NoSuchPartitionException.class)); } } diff --git a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java index ea423abfab3..964d5c00a09 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java @@ -72,10 +72,14 @@ public SchemaOperationDispatcher( */ @Override public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { - return doWithCatalog( - getCatalogIdentifier(NameIdentifier.of(namespace.levels())), - c -> c.doWithSchemaOps(s -> s.listSchemas(namespace)), - NoSuchCatalogException.class); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(namespace.levels()), + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithSchemaOps(s -> s.listSchemas(namespace)), + NoSuchCatalogException.class)); } /** @@ -94,72 +98,80 @@ public Schema createSchema(NameIdentifier ident, String comment, Map - c.doWithPropertiesMeta( - p -> { - validatePropertyForCreate(p.schemaPropertiesMetadata(), properties); - return null; - }), - IllegalArgumentException.class); - long uid = idGenerator.nextId(); - // Add StringIdentifier to the properties, the specific catalog will handle this - // StringIdentifier to make sure only when the operation is successful, the related - // SchemaEntity will be visible. - StringIdentifier stringId = StringIdentifier.fromId(uid); - Map updatedProperties = - StringIdentifier.newPropertiesWithId(stringId, properties); - - // we do not retrieve the schema again (to obtain some values generated by underlying catalog) - // since some catalogs' API is async and the schema may not be created immediately - Schema schema = - doWithCatalog( - catalogIdent, - c -> c.doWithSchemaOps(s -> s.createSchema(ident, comment, updatedProperties)), - NoSuchCatalogException.class, - SchemaAlreadyExistsException.class); - - // If the Schema is maintained by the Gravitino's store, we don't have to store again. - boolean isManagedSchema = isManagedEntity(catalogIdent, Capability.Scope.SCHEMA); - if (isManagedSchema) { - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); - } - - SchemaEntity schemaEntity = - SchemaEntity.builder() - .withId(uid) - .withName(ident.name()) - .withNamespace(ident.namespace()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) - .withCreateTime(Instant.now()) - .build()) - .build(); - - try { - store.put(schemaEntity, true /* overwrite */); - } catch (Exception e) { - LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( + LockType.WRITE, + () -> { + doWithCatalog( + catalogIdent, + c -> + c.doWithPropertiesMeta( + p -> { + validatePropertyForCreate(p.schemaPropertiesMetadata(), properties); + return null; + }), + IllegalArgumentException.class); + long uid = idGenerator.nextId(); + // Add StringIdentifier to the properties, the specific catalog will handle this + // StringIdentifier to make sure only when the operation is successful, the related + // SchemaEntity will be visible. + StringIdentifier stringId = StringIdentifier.fromId(uid); + Map updatedProperties = + StringIdentifier.newPropertiesWithId(stringId, properties); + + // we do not retrieve the schema again (to obtain some values generated by underlying + // catalog) + // since some catalogs' API is async and the schema may not be created immediately + Schema schema = + doWithCatalog( catalogIdent, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); - } - - // Merge both the metadata from catalog operation and the metadata from entity store. - return EntityCombinedSchema.of(schema, schemaEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata, schema.properties())); + c -> c.doWithSchemaOps(s -> s.createSchema(ident, comment, updatedProperties)), + NoSuchCatalogException.class, + SchemaAlreadyExistsException.class); + + // If the Schema is maintained by the Gravitino's store, we don't have to store again. + boolean isManagedSchema = isManagedEntity(catalogIdent, Capability.Scope.SCHEMA); + if (isManagedSchema) { + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())); + } + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withId(uid) + .withName(ident.name()) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(PrincipalUtils.getCurrentPrincipal().getName()) + .withCreateTime(Instant.now()) + .build()) + .build(); + + try { + store.put(schemaEntity, true /* overwrite */); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e); + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())); + } + + // Merge both the metadata from catalog operation and the metadata from entity store. + return EntityCombinedSchema.of(schema, schemaEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())); + }); } /** @@ -200,68 +212,73 @@ public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { @Override public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) throws NoSuchSchemaException { - validateAlterProperties(ident, HasPropertyMetadata::schemaPropertiesMetadata, changes); NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Schema alteredSchema = - doWithCatalog( - catalogIdent, - c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)), - NoSuchSchemaException.class); - - // If the Schema is maintained by the Gravitino's store, we don't have to alter again. - boolean isManagedSchema = isManagedEntity(catalogIdent, Capability.Scope.SCHEMA); - if (isManagedSchema) { - return EntityCombinedSchema.of(alteredSchema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + validateAlterProperties(ident, HasPropertyMetadata::schemaPropertiesMetadata, changes); + Schema alteredSchema = + doWithCatalog( catalogIdent, - HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); - } - - StringIdentifier stringId = getStringIdFromProperties(alteredSchema.properties()); - // Case 1: The schema is not created by Gravitino. - if (stringId == null) { - return EntityCombinedSchema.of(alteredSchema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); - } - - SchemaEntity updatedSchemaEntity = - operateOnEntity( - ident, - id -> - store.update( - id, - SchemaEntity.class, - SCHEMA, - schemaEntity -> - SchemaEntity.builder() - .withId(schemaEntity.id()) - .withName(schemaEntity.name()) - .withNamespace(ident.namespace()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(schemaEntity.auditInfo().creator()) - .withCreateTime(schemaEntity.auditInfo().createTime()) - .withLastModifier( - PrincipalUtils.getCurrentPrincipal().getName()) - .withLastModifiedTime(Instant.now()) - .build()) - .build()), - "UPDATE", - stringId.id()); - - return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); + c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)), + NoSuchSchemaException.class); + + // If the Schema is maintained by the Gravitino's store, we don't have to alter again. + boolean isManagedSchema = isManagedEntity(catalogIdent, Capability.Scope.SCHEMA); + if (isManagedSchema) { + return EntityCombinedSchema.of(alteredSchema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + alteredSchema.properties())); + } + + StringIdentifier stringId = getStringIdFromProperties(alteredSchema.properties()); + // Case 1: The schema is not created by Gravitino. + if (stringId == null) { + return EntityCombinedSchema.of(alteredSchema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + alteredSchema.properties())); + } + + SchemaEntity updatedSchemaEntity = + operateOnEntity( + ident, + id -> + store.update( + id, + SchemaEntity.class, + SCHEMA, + schemaEntity -> + SchemaEntity.builder() + .withId(schemaEntity.id()) + .withName(schemaEntity.name()) + .withNamespace(ident.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(schemaEntity.auditInfo().creator()) + .withCreateTime(schemaEntity.auditInfo().createTime()) + .withLastModifier( + PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()) + .build()), + "UPDATE", + stringId.id()); + + return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::schemaPropertiesMetadata, + alteredSchema.properties())); + }); } /** @@ -276,35 +293,42 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) @Override public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { NameIdentifier catalogIdent = getCatalogIdentifier(ident); - boolean droppedFromCatalog = - doWithCatalog( - catalogIdent, - c -> c.doWithSchemaOps(s -> s.dropSchema(ident, cascade)), - NonEmptySchemaException.class, - RuntimeException.class); - - // For unmanaged schema, it could happen that the schema: - // 1. Is not found in the catalog (dropped directly from underlying sources) - // 2. Is found in the catalog but not in the store (not managed by Gravitino) - // 3. Is found in the catalog and the store (managed by Gravitino) - // 4. Neither found in the catalog nor in the store. - // In all situations, we try to delete the schema from the store, but we don't take the - // return value of the store operation into account. We only take the return value of the - // catalog into account. - // - // For managed schema, we should take the return value of the store operation into account. - boolean droppedFromStore = false; - try { - droppedFromStore = store.delete(ident, SCHEMA, cascade); - } catch (NoSuchEntityException e) { - LOG.warn("The schema to be dropped does not exist in the store: {}", ident, e); - } catch (Exception e) { - throw new RuntimeException(e); - } - return isManagedEntity(catalogIdent, Capability.Scope.SCHEMA) - ? droppedFromStore - : droppedFromCatalog; + return TreeLockUtils.doWithTreeLock( + catalogIdent, + LockType.WRITE, + () -> { + boolean droppedFromCatalog = + doWithCatalog( + catalogIdent, + c -> c.doWithSchemaOps(s -> s.dropSchema(ident, cascade)), + NonEmptySchemaException.class, + RuntimeException.class); + + // For unmanaged schema, it could happen that the schema: + // 1. Is not found in the catalog (dropped directly from underlying sources) + // 2. Is found in the catalog but not in the store (not managed by Gravitino) + // 3. Is found in the catalog and the store (managed by Gravitino) + // 4. Neither found in the catalog nor in the store. + // In all situations, we try to delete the schema from the store, but we don't take the + // return value of the store operation into account. We only take the return value of the + // catalog into account. + // + // For managed schema, we should take the return value of the store operation into + // account. + boolean droppedFromStore = false; + try { + droppedFromStore = store.delete(ident, SCHEMA, cascade); + } catch (NoSuchEntityException e) { + LOG.warn("The schema to be dropped does not exist in the store: {}", ident, e); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return isManagedEntity(catalogIdent, Capability.Scope.SCHEMA) + ? droppedFromStore + : droppedFromCatalog; + }); } private void importSchema(NameIdentifier identifier) { diff --git a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java index da869d65f6a..1cfa4b1d77f 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java @@ -91,10 +91,14 @@ public TableOperationDispatcher( */ @Override public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { - return doWithCatalog( - getCatalogIdentifier(NameIdentifier.of(namespace.levels())), - c -> c.doWithTableOps(t -> t.listTables(namespace)), - NoSuchSchemaException.class); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(namespace.levels()), + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithTableOps(t -> t.listTables(namespace)), + NoSuchSchemaException.class)); } /** @@ -191,71 +195,76 @@ public Table createTable( @Override public Table alterTable(NameIdentifier ident, TableChange... changes) throws NoSuchTableException, IllegalArgumentException { - validateAlterProperties(ident, HasPropertyMetadata::tablePropertiesMetadata, changes); - - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Table alteredTable = - doWithCatalog( - catalogIdent, - c -> - c.doWithTableOps( - t -> t.alterTable(ident, applyCapabilities(c.capabilities(), changes))), - NoSuchTableException.class, - IllegalArgumentException.class); - - StringIdentifier stringId = getStringIdFromProperties(alteredTable.properties()); - // Case 1: The table is not created by Gravitino. - if (stringId == null) { - return EntityCombinedTable.of(alteredTable) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - getCatalogIdentifier(ident), - HasPropertyMetadata::tablePropertiesMetadata, - alteredTable.properties())); - } - - TableEntity updatedTableEntity = - operateOnEntity( - ident, - id -> - store.update( - id, - TableEntity.class, - TABLE, - tableEntity -> { - String newName = - Arrays.stream(changes) - .filter(c -> c instanceof TableChange.RenameTable) - .map(c -> ((TableChange.RenameTable) c).getNewName()) - .reduce((c1, c2) -> c2) - .orElse(tableEntity.name()); - // Update the columns - Pair> columnsUpdateResult = - updateColumnsIfNecessary(alteredTable, tableEntity); - - return TableEntity.builder() - .withId(tableEntity.id()) - .withName(newName) - .withNamespace(ident.namespace()) - .withColumns(columnsUpdateResult.getRight()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(tableEntity.auditInfo().creator()) - .withCreateTime(tableEntity.auditInfo().createTime()) - .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName()) - .withLastModifiedTime(Instant.now()) - .build()) - .build(); - }), - "UPDATE", - stringId.id()); - - return EntityCombinedTable.of(alteredTable, updatedTableEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - getCatalogIdentifier(ident), - HasPropertyMetadata::tablePropertiesMetadata, - alteredTable.properties())); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + validateAlterProperties(ident, HasPropertyMetadata::tablePropertiesMetadata, changes); + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Table alteredTable = + doWithCatalog( + catalogIdent, + c -> + c.doWithTableOps( + t -> t.alterTable(ident, applyCapabilities(c.capabilities(), changes))), + NoSuchTableException.class, + IllegalArgumentException.class); + + StringIdentifier stringId = getStringIdFromProperties(alteredTable.properties()); + // Case 1: The table is not created by Gravitino. + if (stringId == null) { + return EntityCombinedTable.of(alteredTable) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + getCatalogIdentifier(ident), + HasPropertyMetadata::tablePropertiesMetadata, + alteredTable.properties())); + } + + TableEntity updatedTableEntity = + operateOnEntity( + ident, + id -> + store.update( + id, + TableEntity.class, + TABLE, + tableEntity -> { + String newName = + Arrays.stream(changes) + .filter(c -> c instanceof TableChange.RenameTable) + .map(c -> ((TableChange.RenameTable) c).getNewName()) + .reduce((c1, c2) -> c2) + .orElse(tableEntity.name()); + // Update the columns + Pair> columnsUpdateResult = + updateColumnsIfNecessary(alteredTable, tableEntity); + + return TableEntity.builder() + .withId(tableEntity.id()) + .withName(newName) + .withNamespace(ident.namespace()) + .withColumns(columnsUpdateResult.getRight()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(tableEntity.auditInfo().creator()) + .withCreateTime(tableEntity.auditInfo().createTime()) + .withLastModifier( + PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()) + .build(); + }), + "UPDATE", + stringId.id()); + + return EntityCombinedTable.of(alteredTable, updatedTableEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + getCatalogIdentifier(ident), + HasPropertyMetadata::tablePropertiesMetadata, + alteredTable.properties())); + }); } /** @@ -268,33 +277,40 @@ public Table alterTable(NameIdentifier ident, TableChange... changes) */ @Override public boolean dropTable(NameIdentifier ident) { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - boolean droppedFromCatalog = - doWithCatalog( - catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)), RuntimeException.class); - - // For unmanaged table, it could happen that the table: - // 1. Is not found in the catalog (dropped directly from underlying sources) - // 2. Is found in the catalog but not in the store (not managed by Gravitino) - // 3. Is found in the catalog and the store (managed by Gravitino) - // 4. Neither found in the catalog nor in the store. - // In all situations, we try to delete the schema from the store, but we don't take the - // return value of the store operation into account. We only take the return value of the - // catalog into account. - // - // For managed table, we should take the return value of the store operation into account. - boolean droppedFromStore = false; - try { - droppedFromStore = store.delete(ident, TABLE); - } catch (NoSuchEntityException e) { - LOG.warn("The table to be dropped does not exist in the store: {}", ident, e); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return isManagedEntity(catalogIdent, Capability.Scope.TABLE) - ? droppedFromStore - : droppedFromCatalog; + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + boolean droppedFromCatalog = + doWithCatalog( + catalogIdent, + c -> c.doWithTableOps(t -> t.dropTable(ident)), + RuntimeException.class); + + // For unmanaged table, it could happen that the table: + // 1. Is not found in the catalog (dropped directly from underlying sources) + // 2. Is found in the catalog but not in the store (not managed by Gravitino) + // 3. Is found in the catalog and the store (managed by Gravitino) + // 4. Neither found in the catalog nor in the store. + // In all situations, we try to delete the schema from the store, but we don't take the + // return value of the store operation into account. We only take the return value of the + // catalog into account. + // + // For managed table, we should take the return value of the store operation into account. + boolean droppedFromStore = false; + try { + droppedFromStore = store.delete(ident, TABLE); + } catch (NoSuchEntityException e) { + LOG.warn("The table to be dropped does not exist in the store: {}", ident, e); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return isManagedEntity(catalogIdent, Capability.Scope.TABLE) + ? droppedFromStore + : droppedFromCatalog; + }); } /** @@ -313,37 +329,42 @@ public boolean dropTable(NameIdentifier ident) { */ @Override public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationException { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - boolean droppedFromCatalog = - doWithCatalog( - catalogIdent, - c -> c.doWithTableOps(t -> t.purgeTable(ident)), - RuntimeException.class, - UnsupportedOperationException.class); - - // For unmanaged table, it could happen that the table: - // 1. Is not found in the catalog (dropped directly from underlying sources) - // 2. Is found in the catalog but not in the store (not managed by Gravitino) - // 3. Is found in the catalog and the store (managed by Gravitino) - // 4. Neither found in the catalog nor in the store. - // In all situations, we try to delete the schema from the store, but we don't take the - // return value of the store operation into account. We only take the return value of the - // catalog into account. - // - // For managed table, we should take the return value of the store operation into account. - boolean droppedFromStore = false; - try { - droppedFromStore = store.delete(ident, TABLE); - } catch (NoSuchEntityException e) { - LOG.warn("The table to be purged does not exist in the store: {}", ident, e); - return false; - } catch (Exception e) { - throw new RuntimeException(e); - } - - return isManagedEntity(catalogIdent, Capability.Scope.TABLE) - ? droppedFromStore - : droppedFromCatalog; + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + boolean droppedFromCatalog = + doWithCatalog( + catalogIdent, + c -> c.doWithTableOps(t -> t.purgeTable(ident)), + RuntimeException.class, + UnsupportedOperationException.class); + + // For unmanaged table, it could happen that the table: + // 1. Is not found in the catalog (dropped directly from underlying sources) + // 2. Is found in the catalog but not in the store (not managed by Gravitino) + // 3. Is found in the catalog and the store (managed by Gravitino) + // 4. Neither found in the catalog nor in the store. + // In all situations, we try to delete the schema from the store, but we don't take the + // return value of the store operation into account. We only take the return value of the + // catalog into account. + // + // For managed table, we should take the return value of the store operation into account. + boolean droppedFromStore = false; + try { + droppedFromStore = store.delete(ident, TABLE); + } catch (NoSuchEntityException e) { + LOG.warn("The table to be purged does not exist in the store: {}", ident, e); + return false; + } catch (Exception e) { + throw new RuntimeException(e); + } + + return isManagedEntity(catalogIdent, Capability.Scope.TABLE) + ? droppedFromStore + : droppedFromCatalog; + }); } private EntityCombinedTable importTable(NameIdentifier identifier) { diff --git a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java index d44a09b1596..c3c01911bda 100644 --- a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java +++ b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java @@ -73,10 +73,14 @@ public TopicOperationDispatcher( */ @Override public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException { - return doWithCatalog( - getCatalogIdentifier(NameIdentifier.of(namespace.levels())), - c -> c.doWithTopicOps(t -> t.listTopics(namespace)), - NoSuchSchemaException.class); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(namespace.levels()), + LockType.READ, + () -> + doWithCatalog( + getCatalogIdentifier(NameIdentifier.of(namespace.levels())), + c -> c.doWithTopicOps(t -> t.listTopics(namespace)), + NoSuchSchemaException.class)); } /** @@ -152,54 +156,61 @@ public Topic createTopic( @Override public Topic alterTopic(NameIdentifier ident, TopicChange... changes) throws NoSuchTopicException, IllegalArgumentException { - validateAlterProperties(ident, HasPropertyMetadata::topicPropertiesMetadata, changes); - - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - - // we do not retrieve the topic again (to obtain some values generated by underlying catalog) - // since some catalogs' API is async and the topic may not be created immediately - Topic alteredTopic = - doWithCatalog( - catalogIdent, - c -> c.doWithTopicOps(t -> t.alterTopic(ident, changes)), - NoSuchTopicException.class, - IllegalArgumentException.class); - TopicEntity updatedTopicEntity = - operateOnEntity( - ident, - id -> - store.update( - id, - TopicEntity.class, - TOPIC, - topicEntity -> - TopicEntity.builder() - .withId(topicEntity.id()) - .withName(topicEntity.name()) - .withNamespace(ident.namespace()) - .withComment( - StringUtils.isBlank(alteredTopic.comment()) - ? topicEntity.comment() - : alteredTopic.comment()) - .withAuditInfo( - AuditInfo.builder() - .withCreator(topicEntity.auditInfo().creator()) - .withCreateTime(topicEntity.auditInfo().createTime()) - .withLastModifier( - PrincipalUtils.getCurrentPrincipal().getName()) - .withLastModifiedTime(Instant.now()) - .build()) - .build()), - "UPDATE", - getStringIdFromProperties(alteredTopic.properties()).id()); - - return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, - HasPropertyMetadata::topicPropertiesMetadata, - alteredTopic.properties())); + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + validateAlterProperties(ident, HasPropertyMetadata::topicPropertiesMetadata, changes); + + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + + // we do not retrieve the topic again (to obtain some values generated by underlying + // catalog) + // since some catalogs' API is async and the topic may not be created immediately + Topic alteredTopic = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.alterTopic(ident, changes)), + NoSuchTopicException.class, + IllegalArgumentException.class); + + TopicEntity updatedTopicEntity = + operateOnEntity( + ident, + id -> + store.update( + id, + TopicEntity.class, + TOPIC, + topicEntity -> + TopicEntity.builder() + .withId(topicEntity.id()) + .withName(topicEntity.name()) + .withNamespace(ident.namespace()) + .withComment( + StringUtils.isBlank(alteredTopic.comment()) + ? topicEntity.comment() + : alteredTopic.comment()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(topicEntity.auditInfo().creator()) + .withCreateTime(topicEntity.auditInfo().createTime()) + .withLastModifier( + PrincipalUtils.getCurrentPrincipal().getName()) + .withLastModifiedTime(Instant.now()) + .build()) + .build()), + "UPDATE", + getStringIdFromProperties(alteredTopic.properties()).id()); + + return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, + HasPropertyMetadata::topicPropertiesMetadata, + alteredTopic.properties())); + }); } /** @@ -211,33 +222,40 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes) */ @Override public boolean dropTopic(NameIdentifier ident) { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - boolean droppedFromCatalog = - doWithCatalog( - catalogIdent, c -> c.doWithTopicOps(t -> t.dropTopic(ident)), RuntimeException.class); - - // For unmanaged topic, it could happen that the topic: - // 1. Is not found in the catalog (dropped directly from underlying sources) - // 2. Is found in the catalog but not in the store (not managed by Gravitino) - // 3. Is found in the catalog and the store (managed by Gravitino) - // 4. Neither found in the catalog nor in the store. - // In all situations, we try to delete the schema from the store, but we don't take the - // return value of the store operation into account. We only take the return value of the - // catalog into account. - // - // For managed topic, we should take the return value of the store operation into account. - boolean droppedFromStore = false; - try { - droppedFromStore = store.delete(ident, TOPIC); - } catch (NoSuchEntityException e) { - LOG.warn("The topic to be dropped does not exist in the store: {}", ident, e); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return isManagedEntity(catalogIdent, Capability.Scope.TOPIC) - ? droppedFromStore - : droppedFromCatalog; + return TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), + LockType.WRITE, + () -> { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + boolean droppedFromCatalog = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.dropTopic(ident)), + RuntimeException.class); + + // For unmanaged topic, it could happen that the topic: + // 1. Is not found in the catalog (dropped directly from underlying sources) + // 2. Is found in the catalog but not in the store (not managed by Gravitino) + // 3. Is found in the catalog and the store (managed by Gravitino) + // 4. Neither found in the catalog nor in the store. + // In all situations, we try to delete the schema from the store, but we don't take the + // return value of the store operation into account. We only take the return value of the + // catalog into account. + // + // For managed topic, we should take the return value of the store operation into account. + boolean droppedFromStore = false; + try { + droppedFromStore = store.delete(ident, TOPIC); + } catch (NoSuchEntityException e) { + LOG.warn("The topic to be dropped does not exist in the store: {}", ident, e); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return isManagedEntity(catalogIdent, Capability.Scope.TOPIC) + ? droppedFromStore + : droppedFromCatalog; + }); } private void importTopic(NameIdentifier identifier) { diff --git a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java index 0239526e5ee..219ca7bdac4 100644 --- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java +++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java @@ -40,6 +40,8 @@ import org.apache.gravitino.exceptions.NoSuchMetalakeException; import org.apache.gravitino.exceptions.NonEmptyEntityException; import org.apache.gravitino.exceptions.NonEmptyMetalakeException; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.BaseMetalake; import org.apache.gravitino.meta.CatalogEntity; @@ -121,14 +123,18 @@ public static boolean metalakeInUse(EntityStore store, NameIdentifier ident) */ @Override public BaseMetalake[] listMetalakes() { - try { - return store.list(Namespace.empty(), BaseMetalake.class, EntityType.METALAKE).stream() - .map(this::newMetalakeWithResolvedProperties) - .toArray(BaseMetalake[]::new); - } catch (IOException ioe) { - LOG.error("Listing Metalakes failed due to storage issues.", ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithRootTreeLock( + LockType.READ, + () -> { + try { + return store.list(Namespace.empty(), BaseMetalake.class, EntityType.METALAKE).stream() + .map(this::newMetalakeWithResolvedProperties) + .toArray(BaseMetalake[]::new); + } catch (IOException ioe) { + LOG.error("Listing Metalakes failed due to storage issues.", ioe); + throw new RuntimeException(ioe); + } + }); } /** @@ -205,16 +211,20 @@ public BaseMetalake createMetalake( .build()) .build(); - try { - store.put(metalake, false /* overwritten */); - return metalake; - } catch (EntityAlreadyExistsException | AlreadyExistsException e) { - LOG.warn("Metalake {} already exists", ident, e); - throw new MetalakeAlreadyExistsException("Metalake %s already exists", ident); - } catch (IOException ioe) { - LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithRootTreeLock( + LockType.WRITE, + () -> { + try { + store.put(metalake, false /* overwritten */); + return metalake; + } catch (EntityAlreadyExistsException | AlreadyExistsException e) { + LOG.warn("Metalake {} already exists", ident, e); + throw new MetalakeAlreadyExistsException("Metalake %s already exists", ident); + } catch (IOException ioe) { + LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); + throw new RuntimeException(ioe); + } + }); } /** @@ -230,120 +240,140 @@ public BaseMetalake createMetalake( @Override public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... changes) throws NoSuchMetalakeException, IllegalArgumentException { - try { - if (!metalakeInUse(store, ident)) { - throw new MetalakeNotInUseException( - "Metalake %s is not in use, please enable it first", ident); - } - - return store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - Map newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - builder = updateEntity(builder, newProps, changes); - - return builder.build(); - }); - - } catch (NoSuchEntityException ne) { - LOG.warn("Metalake {} does not exist", ident, ne); - throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); - - } catch (IllegalArgumentException iae) { - LOG.warn("Altering Metalake {} failed due to invalid changes", ident, iae); - throw iae; - - } catch (IOException ioe) { - LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); - throw new RuntimeException(ioe); - } + return TreeLockUtils.doWithRootTreeLock( + LockType.WRITE, + () -> { + try { + if (!metalakeInUse(store, ident)) { + throw new MetalakeNotInUseException( + "Metalake %s is not in use, please enable it first", ident); + } + + return store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + Map newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + builder = updateEntity(builder, newProps, changes); + + return builder.build(); + }); + + } catch (NoSuchEntityException ne) { + LOG.warn("Metalake {} does not exist", ident, ne); + throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); + + } catch (IllegalArgumentException iae) { + LOG.warn("Altering Metalake {} failed due to invalid changes", ident, iae); + throw iae; + + } catch (IOException ioe) { + LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); + throw new RuntimeException(ioe); + } + }); } @Override public boolean dropMetalake(NameIdentifier ident, boolean force) throws NonEmptyEntityException, MetalakeInUseException { - try { - boolean inUse = metalakeInUse(store, ident); - if (inUse && !force) { - throw new MetalakeInUseException( - "Metalake %s is in use, please disable it first or use force option", ident); - } - - List catalogEntities = - store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG); - if (!catalogEntities.isEmpty() && !force) { - throw new NonEmptyMetalakeException( - "Metalake %s has catalogs, please drop them first or use force option", ident); - } - - return store.delete(ident, EntityType.METALAKE, true); - } catch (NoSuchMetalakeException e) { - return false; - - } catch (IOException e) { - throw new RuntimeException(e); - } + return TreeLockUtils.doWithRootTreeLock( + LockType.WRITE, + () -> { + try { + boolean inUse = metalakeInUse(store, ident); + if (inUse && !force) { + throw new MetalakeInUseException( + "Metalake %s is in use, please disable it first or use force option", ident); + } + + List catalogEntities = + store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG); + if (!catalogEntities.isEmpty() && !force) { + throw new NonEmptyMetalakeException( + "Metalake %s has catalogs, please drop them first or use force option", ident); + } + + return store.delete(ident, EntityType.METALAKE, true); + } catch (NoSuchMetalakeException e) { + return false; + + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @Override public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException { - try { - - boolean inUse = metalakeInUse(store, ident); - if (!inUse) { - store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - - Map newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - newProps.put(PROPERTY_IN_USE, "true"); - builder.withProperties(newProps); - - return builder.build(); - }); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + TreeLockUtils.doWithTreeLock( + ident, + LockType.WRITE, + () -> { + try { + boolean inUse = metalakeInUse(store, ident); + if (!inUse) { + store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + + Map newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + newProps.put(PROPERTY_IN_USE, "true"); + builder.withProperties(newProps); + + return builder.build(); + }); + } + + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @Override public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException { - try { - boolean inUse = metalakeInUse(store, ident); - if (inUse) { - store.update( - ident, - BaseMetalake.class, - EntityType.METALAKE, - metalake -> { - BaseMetalake.Builder builder = newMetalakeBuilder(metalake); - - Map newProps = - metalake.properties() == null - ? Maps.newHashMap() - : Maps.newHashMap(metalake.properties()); - newProps.put(PROPERTY_IN_USE, "false"); - builder.withProperties(newProps); - - return builder.build(); - }); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + TreeLockUtils.doWithTreeLock( + ident, + LockType.WRITE, + () -> { + try { + boolean inUse = metalakeInUse(store, ident); + if (inUse) { + store.update( + ident, + BaseMetalake.class, + EntityType.METALAKE, + metalake -> { + BaseMetalake.Builder builder = newMetalakeBuilder(metalake); + + Map newProps = + metalake.properties() == null + ? Maps.newHashMap() + : Maps.newHashMap(metalake.properties()); + newProps.put(PROPERTY_IN_USE, "false"); + builder.withProperties(newProps); + + return builder.build(); + }); + } + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } private BaseMetalake.Builder newMetalakeBuilder(BaseMetalake metalake) { diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java index ba934d5bdcf..5a1ebcaded8 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/CatalogOperations.java @@ -51,8 +51,6 @@ import org.apache.gravitino.dto.responses.DropResponse; import org.apache.gravitino.dto.responses.EntityListResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.web.Utils; import org.apache.gravitino.utils.NameIdentifierUtil; @@ -93,23 +91,17 @@ public Response listCatalogs( () -> { Namespace catalogNS = NamespaceUtil.ofCatalog(metalake); // Lock the root and the metalake with WRITE lock to ensure the consistency of the list. - return TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake), - LockType.READ, - () -> { - if (verbose) { - Catalog[] catalogs = catalogDispatcher.listCatalogsInfo(catalogNS); - Response response = - Utils.ok(new CatalogListResponse(DTOConverters.toDTOs(catalogs))); - LOG.info("List {} catalogs info under metalake: {}", catalogs.length, metalake); - return response; - } else { - NameIdentifier[] idents = catalogDispatcher.listCatalogs(catalogNS); - Response response = Utils.ok(new EntityListResponse(idents)); - LOG.info("List {} catalogs under metalake: {}", idents.length, metalake); - return response; - } - }); + if (verbose) { + Catalog[] catalogs = catalogDispatcher.listCatalogsInfo(catalogNS); + Response response = Utils.ok(new CatalogListResponse(DTOConverters.toDTOs(catalogs))); + LOG.info("List {} catalogs info under metalake: {}", catalogs.length, metalake); + return response; + } else { + NameIdentifier[] idents = catalogDispatcher.listCatalogs(catalogNS); + Response response = Utils.ok(new EntityListResponse(idents)); + LOG.info("List {} catalogs under metalake: {}", idents.length, metalake); + return response; + } }); } catch (Exception e) { return ExceptionHandlers.handleCatalogException(OperationType.LIST, "", metalake, e); @@ -130,16 +122,12 @@ public Response createCatalog( request.validate(); NameIdentifier ident = NameIdentifierUtil.ofCatalog(metalake, request.getName()); Catalog catalog = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofMetalake(metalake), - LockType.WRITE, - () -> - catalogDispatcher.createCatalog( - ident, - request.getType(), - request.getProvider(), - request.getComment(), - request.getProperties())); + catalogDispatcher.createCatalog( + ident, + request.getType(), + request.getProvider(), + request.getComment(), + request.getProperties()); Response response = Utils.ok(new CatalogResponse(DTOConverters.toDTO(catalog))); LOG.info("Catalog created: {}.{}", metalake, catalog.name()); return response; @@ -165,18 +153,12 @@ public Response testConnection( () -> { request.validate(); NameIdentifier ident = NameIdentifierUtil.ofCatalog(metalake, request.getName()); - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofMetalake(metalake), - LockType.READ, - () -> { - catalogDispatcher.testConnection( - ident, - request.getType(), - request.getProvider(), - request.getComment(), - request.getProperties()); - return null; - }); + catalogDispatcher.testConnection( + ident, + request.getType(), + request.getProvider(), + request.getComment(), + request.getProperties()); Response response = Utils.ok(new BaseResponse()); LOG.info( "Successfully test connection for catalog: {}.{}", metalake, request.getName()); @@ -204,17 +186,12 @@ public Response setCatalog( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofCatalog(metalake, catalogName); - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofMetalake(metalake), - LockType.WRITE, - () -> { - if (request.isInUse()) { - catalogDispatcher.enableCatalog(ident); - } else { - catalogDispatcher.disableCatalog(ident); - } - return null; - }); + if (request.isInUse()) { + catalogDispatcher.enableCatalog(ident); + } else { + catalogDispatcher.disableCatalog(ident); + } + Response response = Utils.ok(new BaseResponse()); LOG.info( "Successfully {} catalog: {}.{}", @@ -245,9 +222,7 @@ public Response loadCatalog( LOG.info("Received load catalog request for catalog: {}.{}", metalakeName, catalogName); try { NameIdentifier ident = NameIdentifierUtil.ofCatalog(metalakeName, catalogName); - Catalog catalog = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> catalogDispatcher.loadCatalog(ident)); + Catalog catalog = catalogDispatcher.loadCatalog(ident); Response response = Utils.ok(new CatalogResponse(DTOConverters.toDTO(catalog))); LOG.info("Catalog loaded: {}.{}", metalakeName, catalogName); return response; @@ -278,11 +253,7 @@ public Response alterCatalog( request.getUpdates().stream() .map(CatalogUpdateRequest::catalogChange) .toArray(CatalogChange[]::new); - Catalog catalog = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofMetalake(metalakeName), - LockType.WRITE, - () -> catalogDispatcher.alterCatalog(ident, changes)); + Catalog catalog = catalogDispatcher.alterCatalog(ident, changes); Response response = Utils.ok(new CatalogResponse(DTOConverters.toDTO(catalog))); LOG.info("Catalog altered: {}.{}", metalakeName, catalog.name()); return response; @@ -309,11 +280,7 @@ public Response dropCatalog( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofCatalog(metalakeName, catalogName); - boolean dropped = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofMetalake(metalakeName), - LockType.WRITE, - () -> catalogDispatcher.dropCatalog(ident, force)); + boolean dropped = catalogDispatcher.dropCatalog(ident, force); if (!dropped) { LOG.warn("Failed to drop catalog {} under metalake {}", catalogName, metalakeName); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java index b010f260978..713ba0c8eb3 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/FilesetOperations.java @@ -49,8 +49,6 @@ import org.apache.gravitino.dto.util.DTOConverters; import org.apache.gravitino.file.Fileset; import org.apache.gravitino.file.FilesetChange; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.rest.RESTUtils; import org.apache.gravitino.server.web.Utils; @@ -87,11 +85,7 @@ public Response listFilesets( httpRequest, () -> { Namespace filesetNS = NamespaceUtil.ofFileset(metalake, catalog, schema); - NameIdentifier[] idents = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.READ, - () -> dispatcher.listFilesets(filesetNS)); + NameIdentifier[] idents = dispatcher.listFilesets(filesetNS); Response response = Utils.ok(new EntityListResponse(idents)); LOG.info( "List {} filesets under schema: {}.{}.{}", @@ -131,16 +125,12 @@ public Response createFileset( NameIdentifierUtil.ofFileset(metalake, catalog, schema, request.getName()); Fileset fileset = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> - dispatcher.createFileset( - ident, - request.getComment(), - Optional.ofNullable(request.getType()).orElse(Fileset.Type.MANAGED), - request.getStorageLocation(), - request.getProperties())); + dispatcher.createFileset( + ident, + request.getComment(), + Optional.ofNullable(request.getType()).orElse(Fileset.Type.MANAGED), + request.getStorageLocation(), + request.getProperties()); Response response = Utils.ok(new FilesetResponse(DTOConverters.toDTO(fileset))); LOG.info("Fileset created: {}.{}.{}.{}", metalake, catalog, schema, request.getName()); return response; @@ -168,9 +158,7 @@ public Response loadFileset( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofFileset(metalake, catalog, schema, fileset); - Fileset t = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> dispatcher.loadFileset(ident)); + Fileset t = dispatcher.loadFileset(ident); Response response = Utils.ok(new FilesetResponse(DTOConverters.toDTO(t))); LOG.info("Fileset loaded: {}.{}.{}.{}", metalake, catalog, schema, fileset); return response; @@ -202,11 +190,7 @@ public Response alterFileset( request.getUpdates().stream() .map(FilesetUpdateRequest::filesetChange) .toArray(FilesetChange[]::new); - Fileset t = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> dispatcher.alterFileset(ident, changes)); + Fileset t = dispatcher.alterFileset(ident, changes); Response response = Utils.ok(new FilesetResponse(DTOConverters.toDTO(t))); LOG.info("Fileset altered: {}.{}.{}.{}", metalake, catalog, schema, t.name()); return response; @@ -233,11 +217,7 @@ public Response dropFileset( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofFileset(metalake, catalog, schema, fileset); - boolean dropped = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> dispatcher.dropFileset(ident)); + boolean dropped = dispatcher.dropFileset(ident); if (!dropped) { LOG.warn("Failed to drop fileset {} under schema {}", fileset, schema); } @@ -283,10 +263,7 @@ public Response getFileLocation( CallerContext.CallerContextHolder.set(context); } String actualFileLocation = - TreeLockUtils.doWithTreeLock( - ident, - LockType.READ, - () -> dispatcher.getFileLocation(ident, RESTUtils.decodeString(subPath))); + dispatcher.getFileLocation(ident, RESTUtils.decodeString(subPath)); return Utils.ok(new FileLocationResponse(actualFileLocation)); }); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/GroupOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/GroupOperations.java index 12cf769932e..032f867418e 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/GroupOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/GroupOperations.java @@ -32,18 +32,14 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import org.apache.gravitino.GravitinoEnv; -import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.authorization.AccessControlDispatcher; -import org.apache.gravitino.authorization.AuthorizationUtils; import org.apache.gravitino.dto.requests.GroupAddRequest; import org.apache.gravitino.dto.responses.GroupListResponse; import org.apache.gravitino.dto.responses.GroupResponse; import org.apache.gravitino.dto.responses.NameListResponse; import org.apache.gravitino.dto.responses.RemoveResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.authorization.NameBindings; import org.apache.gravitino.server.web.Utils; @@ -78,14 +74,9 @@ public Response getGroup( return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofGroup(metalake, group), - LockType.READ, - () -> - Utils.ok( - new GroupResponse( - DTOConverters.toDTO( - accessControlManager.getGroup(metalake, group)))))); + Utils.ok( + new GroupResponse( + DTOConverters.toDTO(accessControlManager.getGroup(metalake, group))))); } catch (Exception e) { return ExceptionHandlers.handleGroupException(OperationType.GET, group, metalake, e); } @@ -100,14 +91,10 @@ public Response addGroup(@PathParam("metalake") String metalake, GroupAddRequest return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), - LockType.WRITE, - () -> - Utils.ok( - new GroupResponse( - DTOConverters.toDTO( - accessControlManager.addGroup(metalake, request.getName())))))); + Utils.ok( + new GroupResponse( + DTOConverters.toDTO( + accessControlManager.addGroup(metalake, request.getName()))))); } catch (Exception e) { return ExceptionHandlers.handleGroupException( OperationType.ADD, request.getName(), metalake, e); @@ -125,11 +112,7 @@ public Response removeGroup( return Utils.doAs( httpRequest, () -> { - boolean removed = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), - LockType.WRITE, - () -> accessControlManager.removeGroup(metalake, group)); + boolean removed = accessControlManager.removeGroup(metalake, group); if (!removed) { LOG.warn("Failed to remove group {} under metalake {}", group, metalake); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java index ad27b22a3eb..39ddb353437 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/MetadataObjectRoleOperations.java @@ -31,15 +31,11 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.MetadataObjects; -import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.authorization.AccessControlDispatcher; import org.apache.gravitino.dto.responses.NameListResponse; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.authorization.NameBindings; import org.apache.gravitino.server.web.Utils; -import org.apache.gravitino.utils.MetadataObjectUtil; @NameBindings.AccessControlInterfaces @Path("/metalakes/{metalake}/objects/{type}/{fullName}/roles") @@ -70,18 +66,12 @@ public Response listRoles( MetadataObjects.parse( fullName, MetadataObject.Type.valueOf(type.toUpperCase(Locale.ROOT))); - NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, object); return Utils.doAs( httpRequest, - () -> - TreeLockUtils.doWithTreeLock( - identifier, - LockType.READ, - () -> { - String[] names = - accessControlDispatcher.listRoleNamesByObject(metalake, object); - return Utils.ok(new NameListResponse(names)); - })); + () -> { + String[] names = accessControlDispatcher.listRoleNamesByObject(metalake, object); + return Utils.ok(new NameListResponse(names)); + }); } catch (Exception e) { return ExceptionHandlers.handleRoleException(OperationType.LIST, "", metalake, e); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java index e28f28f15e5..f91b09f92f1 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/MetalakeOperations.java @@ -86,8 +86,7 @@ public Response listMetalakes() { return Utils.doAs( httpRequest, () -> { - Metalake[] metalakes = - TreeLockUtils.doWithRootTreeLock(LockType.READ, metalakeDispatcher::listMetalakes); + Metalake[] metalakes = metalakeDispatcher.listMetalakes(); MetalakeDTO[] metalakeDTOs = Arrays.stream(metalakes).map(DTOConverters::toDTO).toArray(MetalakeDTO[]::new); Response response = Utils.ok(new MetalakeListResponse(metalakeDTOs)); @@ -114,11 +113,8 @@ public Response createMetalake(MetalakeCreateRequest request) { request.validate(); NameIdentifier ident = NameIdentifierUtil.ofMetalake(request.getName()); Metalake metalake = - TreeLockUtils.doWithRootTreeLock( - LockType.WRITE, - () -> - metalakeDispatcher.createMetalake( - ident, request.getComment(), request.getProperties())); + metalakeDispatcher.createMetalake( + ident, request.getComment(), request.getProperties()); Response response = Utils.ok(new MetalakeResponse(DTOConverters.toDTO(metalake))); LOG.info("Metalake created: {}", metalake.name()); return response; @@ -166,17 +162,11 @@ public Response setMetalake(@PathParam("name") String metalakeName, MetalakeSetR httpRequest, () -> { NameIdentifier identifier = NameIdentifierUtil.ofMetalake(metalakeName); - TreeLockUtils.doWithTreeLock( - identifier, - LockType.WRITE, - () -> { - if (request.isInUse()) { - metalakeDispatcher.enableMetalake(identifier); - } else { - metalakeDispatcher.disableMetalake(identifier); - } - return null; - }); + if (request.isInUse()) { + metalakeDispatcher.enableMetalake(identifier); + } else { + metalakeDispatcher.disableMetalake(identifier); + } Response response = Utils.ok(new BaseResponse()); LOG.info( "Successfully {} metalake: {}", @@ -209,9 +199,7 @@ public Response alterMetalake( updatesRequest.getUpdates().stream() .map(MetalakeUpdateRequest::metalakeChange) .toArray(MetalakeChange[]::new); - Metalake updatedMetalake = - TreeLockUtils.doWithRootTreeLock( - LockType.WRITE, () -> metalakeDispatcher.alterMetalake(identifier, changes)); + Metalake updatedMetalake = metalakeDispatcher.alterMetalake(identifier, changes); Response response = Utils.ok(new MetalakeResponse(DTOConverters.toDTO(updatedMetalake))); LOG.info("Metalake altered: {}", updatedMetalake.name()); @@ -237,9 +225,7 @@ public Response dropMetalake( httpRequest, () -> { NameIdentifier identifier = NameIdentifierUtil.ofMetalake(metalakeName); - boolean dropped = - TreeLockUtils.doWithRootTreeLock( - LockType.WRITE, () -> metalakeDispatcher.dropMetalake(identifier, force)); + boolean dropped = metalakeDispatcher.dropMetalake(identifier, force); if (!dropped) { LOG.warn("Failed to drop metalake by name {}", metalakeName); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java index ea5684b55f9..17bfc6ac7ff 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/OwnerOperations.java @@ -33,15 +33,12 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.MetadataObjects; -import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.authorization.Owner; import org.apache.gravitino.authorization.OwnerManager; import org.apache.gravitino.dto.requests.OwnerSetRequest; import org.apache.gravitino.dto.responses.OwnerResponse; import org.apache.gravitino.dto.responses.SetResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.authorization.NameBindings; import org.apache.gravitino.server.web.Utils; @@ -79,10 +76,7 @@ public Response getOwnerForObject( httpRequest, () -> { MetadataObjectUtil.checkMetadataObject(metalake, object); - NameIdentifier ident = MetadataObjectUtil.toEntityIdent(metalake, object); - Optional owner = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> ownerManager.getOwner(metalake, object)); + Optional owner = ownerManager.getOwner(metalake, object); if (owner.isPresent()) { return Utils.ok(new OwnerResponse(DTOConverters.toDTO(owner.get()))); } else { @@ -114,14 +108,7 @@ public Response setOwnerForObject( httpRequest, () -> { MetadataObjectUtil.checkMetadataObject(metalake, object); - NameIdentifier objectIdent = MetadataObjectUtil.toEntityIdent(metalake, object); - TreeLockUtils.doWithTreeLock( - objectIdent, - LockType.READ, - () -> { - ownerManager.setOwner(metalake, object, request.getName(), request.getType()); - return null; - }); + ownerManager.setOwner(metalake, object, request.getName(), request.getType()); return Utils.ok(new SetResponse(true)); }); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/PartitionOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/PartitionOperations.java index 4c488db7cbd..5f545f745a4 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/PartitionOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/PartitionOperations.java @@ -45,8 +45,6 @@ import org.apache.gravitino.dto.responses.PartitionNameListResponse; import org.apache.gravitino.dto.responses.PartitionResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.rel.partitions.Partition; import org.apache.gravitino.server.web.Utils; @@ -87,34 +85,29 @@ public Response listPartitionNames( httpRequest, () -> { NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table); - return TreeLockUtils.doWithTreeLock( - tableIdent, - LockType.READ, - () -> { - if (verbose) { - Partition[] partitions = dispatcher.listPartitions(tableIdent); - Response response = Utils.ok(new PartitionListResponse(toDTOs(partitions))); - LOG.info( - "List {} partitions in table {}.{}.{}.{}", - partitions.length, - metalake, - catalog, - schema, - table); - return response; - } else { - String[] partitionNames = dispatcher.listPartitionNames(tableIdent); - Response response = Utils.ok(new PartitionNameListResponse((partitionNames))); - LOG.info( - "List {} partition names in table {}.{}.{}.{}", - partitionNames.length, - metalake, - catalog, - schema, - table); - return response; - } - }); + if (verbose) { + Partition[] partitions = dispatcher.listPartitions(tableIdent); + Response response = Utils.ok(new PartitionListResponse(toDTOs(partitions))); + LOG.info( + "List {} partitions in table {}.{}.{}.{}", + partitions.length, + metalake, + catalog, + schema, + table); + return response; + } else { + String[] partitionNames = dispatcher.listPartitionNames(tableIdent); + Response response = Utils.ok(new PartitionNameListResponse((partitionNames))); + LOG.info( + "List {} partition names in table {}.{}.{}.{}", + partitionNames.length, + metalake, + catalog, + schema, + table); + return response; + } }); } catch (Exception e) { return ExceptionHandlers.handlePartitionException(OperationType.LIST, "", table, e); @@ -144,21 +137,16 @@ public Response getPartition( httpRequest, () -> { NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table); - return TreeLockUtils.doWithTreeLock( - tableIdent, - LockType.READ, - () -> { - Partition p = dispatcher.getPartition(tableIdent, partition); - Response response = Utils.ok(new PartitionResponse(DTOConverters.toDTO(p))); - LOG.info( - "Got partition[{}] in table[{}.{}.{}.{}]", - partition, - metalake, - catalog, - schema, - table); - return response; - }); + Partition p = dispatcher.getPartition(tableIdent, partition); + Response response = Utils.ok(new PartitionResponse(DTOConverters.toDTO(p))); + LOG.info( + "Got partition[{}] in table[{}.{}.{}.{}]", + partition, + metalake, + catalog, + schema, + table); + return response; }); } catch (Exception e) { return ExceptionHandlers.handlePartitionException(OperationType.GET, "", table, e); @@ -190,24 +178,12 @@ public Response addPartitions( httpRequest, () -> { NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table); - return TreeLockUtils.doWithTreeLock( - tableIdent, - LockType.WRITE, - () -> { - Partition p = - dispatcher.addPartition(tableIdent, fromDTO(request.getPartitions()[0])); - Response response = - Utils.ok( - new PartitionListResponse(new PartitionDTO[] {DTOConverters.toDTO(p)})); - LOG.info( - "Added {} partition(s) to table {}.{}.{}.{} ", - 1, - metalake, - catalog, - schema, - table); - return response; - }); + Partition p = dispatcher.addPartition(tableIdent, fromDTO(request.getPartitions()[0])); + Response response = + Utils.ok(new PartitionListResponse(new PartitionDTO[] {DTOConverters.toDTO(p)})); + LOG.info( + "Added {} partition(s) to table {}.{}.{}.{} ", 1, metalake, catalog, schema, table); + return response; }); } catch (Exception e) { return ExceptionHandlers.handlePartitionException(OperationType.CREATE, "", table, e); @@ -239,32 +215,27 @@ public Response dropPartition( httpRequest, () -> { NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table); - return TreeLockUtils.doWithTreeLock( - tableIdent, - LockType.WRITE, - () -> { - boolean dropped = - purge - ? dispatcher.purgePartition(tableIdent, partition) - : dispatcher.dropPartition(tableIdent, partition); - if (!dropped) { - LOG.warn( - "Failed to drop partition {} under table {} under schema {}", - partition, - table, - schema); - } - Response response = Utils.ok(new DropResponse(dropped)); - LOG.info( - "Partition {} {} in table {}.{}.{}.{}", - partition, - purge ? "purged" : "dropped", - metalake, - catalog, - schema, - table); - return response; - }); + boolean dropped = + purge + ? dispatcher.purgePartition(tableIdent, partition) + : dispatcher.dropPartition(tableIdent, partition); + if (!dropped) { + LOG.warn( + "Failed to drop partition {} under table {} under schema {}", + partition, + table, + schema); + } + Response response = Utils.ok(new DropResponse(dropped)); + LOG.info( + "Partition {} {} in table {}.{}.{}.{}", + partition, + purge ? "purged" : "dropped", + metalake, + catalog, + schema, + table); + return response; }); } catch (Exception e) { return ExceptionHandlers.handlePartitionException(OperationType.DROP, "", table, e); diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java index 38fcd7380e6..d4d4735070d 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/PermissionOperations.java @@ -33,7 +33,6 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.MetadataObjects; -import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.authorization.AccessControlDispatcher; import org.apache.gravitino.authorization.AuthorizationUtils; import org.apache.gravitino.dto.authorization.PrivilegeDTO; @@ -45,8 +44,6 @@ import org.apache.gravitino.dto.responses.RoleResponse; import org.apache.gravitino.dto.responses.UserResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.authorization.NameBindings; import org.apache.gravitino.server.web.Utils; @@ -80,19 +77,11 @@ public Response grantRolesToUser( return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofUser(metalake, user), - LockType.WRITE, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.READ, - () -> - Utils.ok( - new UserResponse( - DTOConverters.toDTO( - accessControlManager.grantRolesToUser( - metalake, request.getRoleNames(), user))))))); + Utils.ok( + new UserResponse( + DTOConverters.toDTO( + accessControlManager.grantRolesToUser( + metalake, request.getRoleNames(), user))))); } catch (Exception e) { return ExceptionHandlers.handleUserPermissionOperationException( OperationType.GRANT, StringUtils.join(request.getRoleNames(), ","), user, e); @@ -112,19 +101,11 @@ public Response grantRolesToGroup( return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofGroup(metalake, group), - LockType.WRITE, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.READ, - () -> - Utils.ok( - new GroupResponse( - DTOConverters.toDTO( - accessControlManager.grantRolesToGroup( - metalake, request.getRoleNames(), group))))))); + Utils.ok( + new GroupResponse( + DTOConverters.toDTO( + accessControlManager.grantRolesToGroup( + metalake, request.getRoleNames(), group))))); } catch (Exception e) { return ExceptionHandlers.handleGroupPermissionOperationException( OperationType.GRANT, StringUtils.join(request.getRoleNames(), ","), group, e); @@ -144,19 +125,11 @@ public Response revokeRolesFromUser( return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofUser(metalake, user), - LockType.WRITE, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.READ, - () -> - Utils.ok( - new UserResponse( - DTOConverters.toDTO( - accessControlManager.revokeRolesFromUser( - metalake, request.getRoleNames(), user))))))); + Utils.ok( + new UserResponse( + DTOConverters.toDTO( + accessControlManager.revokeRolesFromUser( + metalake, request.getRoleNames(), user))))); } catch (Exception e) { return ExceptionHandlers.handleUserPermissionOperationException( OperationType.REVOKE, StringUtils.join(request.getRoleNames(), ","), user, e); @@ -176,19 +149,11 @@ public Response revokeRolesFromGroup( return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofGroup(metalake, group), - LockType.WRITE, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.READ, - () -> - Utils.ok( - new GroupResponse( - DTOConverters.toDTO( - accessControlManager.revokeRolesFromGroup( - metalake, request.getRoleNames(), group))))))); + Utils.ok( + new GroupResponse( + DTOConverters.toDTO( + accessControlManager.revokeRolesFromGroup( + metalake, request.getRoleNames(), group))))); } catch (Exception e) { return ExceptionHandlers.handleGroupPermissionOperationException( OperationType.REVOKE, StringUtils.join(request.getRoleNames()), group, e); @@ -219,20 +184,16 @@ public Response grantPrivilegeToRole( } MetadataObjectUtil.checkMetadataObject(metalake, object); - return TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofRole(metalake, role), - LockType.WRITE, - () -> - Utils.ok( - new RoleResponse( - DTOConverters.toDTO( - accessControlManager.grantPrivilegeToRole( - metalake, - role, - object, - privilegeGrantRequest.getPrivileges().stream() - .map(DTOConverters::fromPrivilegeDTO) - .collect(Collectors.toList())))))); + return Utils.ok( + new RoleResponse( + DTOConverters.toDTO( + accessControlManager.grantPrivilegeToRole( + metalake, + role, + object, + privilegeGrantRequest.getPrivileges().stream() + .map(DTOConverters::fromPrivilegeDTO) + .collect(Collectors.toList()))))); }); } catch (Exception e) { return ExceptionHandlers.handleRolePermissionOperationException( @@ -264,20 +225,16 @@ public Response revokePrivilegeFromRole( } MetadataObjectUtil.checkMetadataObject(metalake, object); - return TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofRole(metalake, role), - LockType.WRITE, - () -> - Utils.ok( - new RoleResponse( - DTOConverters.toDTO( - accessControlManager.revokePrivilegesFromRole( - metalake, - role, - object, - privilegeRevokeRequest.getPrivileges().stream() - .map(DTOConverters::fromPrivilegeDTO) - .collect(Collectors.toList())))))); + return Utils.ok( + new RoleResponse( + DTOConverters.toDTO( + accessControlManager.revokePrivilegesFromRole( + metalake, + role, + object, + privilegeRevokeRequest.getPrivileges().stream() + .map(DTOConverters::fromPrivilegeDTO) + .collect(Collectors.toList()))))); }); } catch (Exception e) { return ExceptionHandlers.handleRolePermissionOperationException( diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java index e986753d0ce..073c0b8fe91 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/RoleOperations.java @@ -37,7 +37,6 @@ import org.apache.gravitino.GravitinoEnv; import org.apache.gravitino.MetadataObject; import org.apache.gravitino.MetadataObjects; -import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.authorization.AccessControlDispatcher; import org.apache.gravitino.authorization.AuthorizationUtils; import org.apache.gravitino.authorization.Privilege; @@ -52,8 +51,6 @@ import org.apache.gravitino.dto.util.DTOConverters; import org.apache.gravitino.exceptions.IllegalMetadataObjectException; import org.apache.gravitino.exceptions.NoSuchMetadataObjectException; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.authorization.NameBindings; import org.apache.gravitino.server.web.Utils; @@ -82,14 +79,10 @@ public Response listRoles(@PathParam("metalake") String metalake) { try { return Utils.doAs( httpRequest, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake), - LockType.READ, - () -> { - String[] names = accessControlManager.listRoleNames(metalake); - return Utils.ok(new NameListResponse(names)); - })); + () -> { + String[] names = accessControlManager.listRoleNames(metalake); + return Utils.ok(new NameListResponse(names)); + }); } catch (Exception e) { return ExceptionHandlers.handleRoleException(OperationType.LIST, "", metalake, e); } @@ -105,13 +98,9 @@ public Response getRole(@PathParam("metalake") String metalake, @PathParam("role return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofRole(metalake, role), - LockType.READ, - () -> - Utils.ok( - new RoleResponse( - DTOConverters.toDTO(accessControlManager.getRole(metalake, role)))))); + Utils.ok( + new RoleResponse( + DTOConverters.toDTO(accessControlManager.getRole(metalake, role))))); } catch (Exception e) { return ExceptionHandlers.handleRoleException(OperationType.GET, role, metalake, e); } @@ -166,19 +155,14 @@ public Response createRole(@PathParam("metalake") String metalake, RoleCreateReq (PrivilegeDTO) privilege)) .collect(Collectors.toList()))) .collect(Collectors.toList()); - - return TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.WRITE, - () -> - Utils.ok( - new RoleResponse( - DTOConverters.toDTO( - accessControlManager.createRole( - metalake, - request.getName(), - request.getProperties(), - securableObjects))))); + return Utils.ok( + new RoleResponse( + DTOConverters.toDTO( + accessControlManager.createRole( + metalake, + request.getName(), + request.getProperties(), + securableObjects)))); }); } catch (Exception e) { @@ -198,11 +182,7 @@ public Response deleteRole( return Utils.doAs( httpRequest, () -> { - boolean deleted = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofRoleNamespace(metalake).levels()), - LockType.WRITE, - () -> accessControlManager.deleteRole(metalake, role)); + boolean deleted = accessControlManager.deleteRole(metalake, role); if (!deleted) { LOG.warn("Failed to delete role {} under metalake {}", role, metalake); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java index 8093da7ef79..9fda549ebfb 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/SchemaOperations.java @@ -47,8 +47,6 @@ import org.apache.gravitino.dto.responses.EntityListResponse; import org.apache.gravitino.dto.responses.SchemaResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.server.web.Utils; import org.apache.gravitino.utils.NameIdentifierUtil; @@ -84,11 +82,7 @@ public Response listSchemas( httpRequest, () -> { Namespace schemaNS = NamespaceUtil.ofSchema(metalake, catalog); - NameIdentifier[] idents = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog), - LockType.READ, - () -> dispatcher.listSchemas(schemaNS)); + NameIdentifier[] idents = dispatcher.listSchemas(schemaNS); Response response = Utils.ok(new EntityListResponse(idents)); LOG.info("List {} schemas in catalog {}.{}", idents.length, metalake, catalog); return response; @@ -115,12 +109,7 @@ public Response createSchema( NameIdentifier ident = NameIdentifierUtil.ofSchema(metalake, catalog, request.getName()); Schema schema = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofCatalog(metalake, catalog), - LockType.WRITE, - () -> - dispatcher.createSchema( - ident, request.getComment(), request.getProperties())); + dispatcher.createSchema(ident, request.getComment(), request.getProperties()); Response response = Utils.ok(new SchemaResponse(DTOConverters.toDTO(schema))); LOG.info("Schema created: {}.{}.{}", metalake, catalog, schema.name()); return response; @@ -179,11 +168,7 @@ public Response alterSchema( request.getUpdates().stream() .map(SchemaUpdateRequest::schemaChange) .toArray(SchemaChange[]::new); - Schema s = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofCatalog(metalake, catalog), - LockType.WRITE, - () -> dispatcher.alterSchema(ident, changes)); + Schema s = dispatcher.alterSchema(ident, changes); Response response = Utils.ok(new SchemaResponse(DTOConverters.toDTO(s))); LOG.info("Schema altered: {}.{}.{}", metalake, catalog, s.name()); return response; @@ -210,11 +195,7 @@ public Response dropSchema( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofSchema(metalake, catalog, schema); - boolean dropped = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofCatalog(metalake, catalog), - LockType.WRITE, - () -> dispatcher.dropSchema(ident, cascade)); + boolean dropped = dispatcher.dropSchema(ident, cascade); if (!dropped) { LOG.warn("Fail to drop schema {} under namespace {}", schema, ident.namespace()); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java index d5cf1ffc7be..41146d25460 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java @@ -46,8 +46,6 @@ import org.apache.gravitino.dto.responses.EntityListResponse; import org.apache.gravitino.dto.responses.TableResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.metrics.MetricNames; import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.TableChange; @@ -85,11 +83,7 @@ public Response listTables( httpRequest, () -> { Namespace tableNS = NamespaceUtil.ofTable(metalake, catalog, schema); - NameIdentifier[] idents = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.READ, - () -> dispatcher.listTables(tableNS)); + NameIdentifier[] idents = dispatcher.listTables(tableNS); Response response = Utils.ok(new EntityListResponse(idents)); LOG.info( "List {} tables under schema: {}.{}.{}", idents.length, metalake, catalog, schema); @@ -190,11 +184,7 @@ public Response alterTable( request.getUpdates().stream() .map(TableUpdateRequest::tableChange) .toArray(TableChange[]::new); - Table t = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.WRITE, - () -> dispatcher.alterTable(ident, changes)); + Table t = dispatcher.alterTable(ident, changes); Response response = Utils.ok(new TableResponse(DTOConverters.toDTO(t))); LOG.info("Table altered: {}.{}.{}.{}", metalake, catalog, schema, t.name()); return response; @@ -228,11 +218,7 @@ public Response dropTable( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofTable(metalake, catalog, schema, table); - boolean dropped = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.WRITE, - () -> purge ? dispatcher.purgeTable(ident) : dispatcher.dropTable(ident)); + boolean dropped = purge ? dispatcher.purgeTable(ident) : dispatcher.dropTable(ident); if (!dropped) { LOG.warn("Failed to drop table {} under schema {}", table, schema); } diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java index 4e9bcd55077..471a194a4f9 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java @@ -41,8 +41,6 @@ import org.apache.gravitino.dto.responses.EntityListResponse; import org.apache.gravitino.dto.responses.TopicResponse; import org.apache.gravitino.dto.util.DTOConverters; -import org.apache.gravitino.lock.LockType; -import org.apache.gravitino.lock.TreeLockUtils; import org.apache.gravitino.messaging.Topic; import org.apache.gravitino.messaging.TopicChange; import org.apache.gravitino.metrics.MetricNames; @@ -79,11 +77,7 @@ public Response listTopics( () -> { LOG.info("Listing topics under schema: {}.{}.{}", metalake, catalog, schema); Namespace topicNS = NamespaceUtil.ofTopic(metalake, catalog, schema); - NameIdentifier[] topics = - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(metalake, catalog, schema), - LockType.READ, - () -> dispatcher.listTopics(topicNS)); + NameIdentifier[] topics = dispatcher.listTopics(topicNS); Response response = Utils.ok(new EntityListResponse(topics)); LOG.info( "List {} topics under schema: {}.{}.{}", topics.length, metalake, catalog, schema); @@ -186,11 +180,7 @@ public Response alterTopic( .map(TopicUpdateRequest::topicChange) .toArray(TopicChange[]::new); - Topic t = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> dispatcher.alterTopic(ident, changes)); + Topic t = dispatcher.alterTopic(ident, changes); Response response = Utils.ok(new TopicResponse(DTOConverters.toDTO(t))); LOG.info("Topic altered: {}.{}.{}.{}", metalake, catalog, schema, t.name()); return response; @@ -217,11 +207,7 @@ public Response dropTopic( () -> { LOG.info("Dropping topic under schema: {}.{}.{}", metalake, catalog, schema); NameIdentifier ident = NameIdentifierUtil.ofTopic(metalake, catalog, schema, topic); - boolean dropped = - TreeLockUtils.doWithTreeLock( - NameIdentifierUtil.ofSchema(metalake, catalog, schema), - LockType.WRITE, - () -> dispatcher.dropTopic(ident)); + boolean dropped = dispatcher.dropTopic(ident); if (!dropped) { LOG.warn("Failed to drop topic {} under schema {}", topic, schema); diff --git a/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java b/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java index 24f34d652ab..b722427cdd1 100644 --- a/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java +++ b/server/src/main/java/org/apache/gravitino/server/web/rest/UserOperations.java @@ -76,13 +76,9 @@ public Response getUser(@PathParam("metalake") String metalake, @PathParam("user return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - AuthorizationUtils.ofGroup(metalake, user), - LockType.READ, - () -> - Utils.ok( - new UserResponse( - DTOConverters.toDTO(accessControlManager.getUser(metalake, user)))))); + Utils.ok( + new UserResponse( + DTOConverters.toDTO(accessControlManager.getUser(metalake, user))))); } catch (Exception e) { return ExceptionHandlers.handleUserException(OperationType.GET, user, metalake, e); } @@ -98,20 +94,15 @@ public Response listUsers( try { return Utils.doAs( httpRequest, - () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofUserNamespace(metalake).levels()), - LockType.READ, - () -> { - if (verbose) { - return Utils.ok( - new UserListResponse( - DTOConverters.toDTOs(accessControlManager.listUsers(metalake)))); - } else { - return Utils.ok( - new NameListResponse(accessControlManager.listUserNames(metalake))); - } - })); + () -> { + if (verbose) { + return Utils.ok( + new UserListResponse( + DTOConverters.toDTOs(accessControlManager.listUsers(metalake)))); + } else { + return Utils.ok(new NameListResponse(accessControlManager.listUserNames(metalake))); + } + }); } catch (Exception e) { return ExceptionHandlers.handleUserException(OperationType.LIST, "", metalake, e); } @@ -126,14 +117,10 @@ public Response addUser(@PathParam("metalake") String metalake, UserAddRequest r return Utils.doAs( httpRequest, () -> - TreeLockUtils.doWithTreeLock( - NameIdentifier.of(AuthorizationUtils.ofGroupNamespace(metalake).levels()), - LockType.WRITE, - () -> - Utils.ok( - new UserResponse( - DTOConverters.toDTO( - accessControlManager.addUser(metalake, request.getName())))))); + Utils.ok( + new UserResponse( + DTOConverters.toDTO( + accessControlManager.addUser(metalake, request.getName()))))); } catch (Exception e) { return ExceptionHandlers.handleUserException( OperationType.ADD, request.getName(), metalake, e);