Skip to content

Commit

Permalink
- Property overrides - po|pol|por - are not honored in v2, were worki…
Browse files Browse the repository at this point in the history
…ng in 1.6.x: #156

- Partition auto-discovery config value is misconfigured during setup: #155
- Enhancement for hms-mirror setup: #154
- Remove automatic DB location as db_name.db in db location: #153
- Add support for Hive table backup in DISTCP mode: #152

- Beta Flag to be used for future beta features.
  • Loading branch information
dstreev committed Nov 6, 2024
1 parent 72fb11f commit e22cd41
Show file tree
Hide file tree
Showing 21 changed files with 499 additions and 247 deletions.
25 changes: 25 additions & 0 deletions Writerside/topics/Transfer-Storage-Migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ transfer:
dataMovementStrategy: "SQL|DISTCP"
```
## Skip Database Location Adjustments
When true, the database location will NOT be adjusted to match the table locations that are migrated.
This is useful in the case of an 'archive' strategy where you only want to migrate the table(s) but are not yet
ready to migrate or stage future tables at the new location.
```yaml
transfer:
storageMigration:
skipDatabaseLocationAdjustments: true|false
```
## Create Archive
When true (default is false), the tables being migrated will be archived vs. simply changing the location details
of the table's metadata. This is relevant only for STORAGE_MIGRATION when using the 'DISTCP' data movement
strategy. When the data movement strategy is 'SQL' for STORAGE_MIGRATION, this flag is ignored because the default
behavior is to create an archive table anyhow.
```yaml
transfer:
storageMigration:
createArchive: true|false
```
## Consolidate Source Tables
Used to help define how a `distcp` plan will be built when asked for. The default is `false` which means that will
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/cloudera/utils/hms/mirror/MessageCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ public enum MessageCode {
"Please build or create GLM's and try again."),
STORAGE_MIGRATION_NAMESPACE_LEFT_MISSING_RDL_GLM("You're using the same namespace in STORAGE_MIGRATION, without `-rdl` you'll need to " +
"ensure you have `-glm` set to map locations."),
STORAGE_MIGRATION_ACID_ARCHIVE_DISTCP("STORAGE_MIGRATION with 'distcp' can't support the direct transfer of ACID tables when an 'archive' has " +
"been requested. Use the SQL data movement strategy to handle the ACID tables if you'd like to create an archive."),
STORAGE_MIGRATION_NOT_AVAILABLE_FOR_LEGACY("Storage Migration is NOT available for Legacy Hive."),
STORAGE_MIGRATION_REQUIRED_NAMESPACE("STORAGE_MIGRATION requires -smn or -cs to define the new namespace."),
STORAGE_MIGRATION_REQUIRED_STRATEGY("STORAGE_MIGRATION requires -sms to set the Data Strategy. Applicable options " +
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/cloudera/utils/hms/mirror/MirrorConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public interface MirrorConf {
String ALTER_TABLE_PARTITION_LOCATION =
"ALTER TABLE {0} PARTITION ({1}) SET LOCATION \"{2}\"";

// Human-readable description template with a single {0} placeholder.
// NOTE(review): by name this appears to describe ALTER_TABLE_ADD_PARTITION_LOCATION, with {0} presumably
// being the partition spec — confirm against callers.
String ALTER_TABLE_ADD_PARTITION_LOCATION_DESC = "Alter Table ADD Partition Spec {0} Location ";
// Statement template for adding a partition at an explicit location:
// {0} follows ALTER TABLE, {1} is placed inside PARTITION (...), and {2} is emitted between
// double quotes after LOCATION.
String ALTER_TABLE_ADD_PARTITION_LOCATION =
"ALTER TABLE {0} ADD PARTITION ({1}) LOCATION \"{2}\"";

String ARCHIVE = "archive";
String SQL_DATA_TRANSFER = "FROM {0} INSERT INTO TABLE {1} SELECT *";
String SQL_DATA_TRANSFER_OVERWRITE = "FROM {0} INSERT OVERWRITE TABLE {1} SELECT *";
Expand Down

Large diffs are not rendered by default.

49 changes: 27 additions & 22 deletions src/main/java/com/cloudera/utils/hms/mirror/domain/DBMirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.cloudera.utils.hms.mirror.Pair;
import com.cloudera.utils.hms.mirror.PhaseState;
import com.cloudera.utils.hms.mirror.domain.support.Environment;
import com.cloudera.utils.hms.util.NamespaceUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
Expand All @@ -28,6 +29,8 @@

import java.util.*;

import static com.cloudera.utils.hms.mirror.MirrorConf.DB_LOCATION;
import static com.cloudera.utils.hms.mirror.MirrorConf.DB_MANAGED_LOCATION;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;

Expand Down Expand Up @@ -75,28 +78,6 @@ public Map<String, TableMirror> getTablesByPhase(PhaseState phaseState) {
return rtn;
}

/*
Load a DBMirror instance from a yaml file using the Jackson YAML parser.
*/
// public static DBMirror load(String fileName) {
// ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
// mapper.enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// File dbMirrorFile = new File(fileName);
// DBMirror dbMirror = null;
// String yamlDBMirrorFile = null;
// try {
// yamlDBMirrorFile = IOUtils.toString(dbMirrorFile.toURI(), StandardCharsets.UTF_8);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// try {
// dbMirror = mapper.readerFor(DBMirror.class).readValue(yamlDBMirrorFile);
// } catch (JsonProcessingException e) {
// throw new RuntimeException(e);
// }
// return dbMirror;
// }
//
public void addIssue(Environment environment, String issue) {
String scrubbedIssue = issue.replace("\n", "<br/>");
List<String> issuesList = issues.get(environment);
Expand Down Expand Up @@ -139,6 +120,30 @@ public String getProperty(Environment environment, String dbProperty) {
return rtn;
}

/**
 * Resolves the directory name associated with this database's location.
 *
 * <p>Looks up the {@code DB_LOCATION} property recorded for the LEFT environment. When that
 * property is present, the result is whatever {@code NamespaceUtils.getLastDirectory} yields
 * for it (presumably the final path segment — confirm against NamespaceUtils). When it is
 * absent, the result falls back to the database name with a {@code ".db"} suffix.
 *
 * @return the directory name derived from the LEFT location, or {@code getName() + ".db"}
 *         when no LEFT location property is set
 */
@JsonIgnore
public String getLocationDirectory() {
    final String leftLocation = getProperty(Environment.LEFT, DB_LOCATION);
    // No recorded location on the LEFT: default to the database name.
    return nonNull(leftLocation)
            ? NamespaceUtils.getLastDirectory(leftLocation)
            : getName() + ".db";
}

/**
 * Resolves the directory name associated with this database's managed location.
 *
 * <p>Looks up the {@code DB_MANAGED_LOCATION} property recorded for the LEFT environment. When
 * that property is present, the result is whatever {@code NamespaceUtils.getLastDirectory}
 * yields for it (presumably the final path segment — confirm against NamespaceUtils). When it
 * is absent, the result falls back to the database name with a {@code ".db"} suffix.
 *
 * @return the directory name derived from the LEFT managed location, or
 *         {@code getName() + ".db"} when no LEFT managed location property is set
 */
@JsonIgnore
public String getManagedLocationDirectory() {
    final String leftManagedLocation = getProperty(Environment.LEFT, DB_MANAGED_LOCATION);
    // No recorded managed location on the LEFT: default to the database name.
    return nonNull(leftManagedLocation)
            ? NamespaceUtils.getLastDirectory(leftManagedLocation)
            : getName() + ".db";
}

public Map<String, String> getProperty(Environment environment) {
Map<String, String> rtn = properties.get(environment);
// if (isNull(rtn)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.cloudera.utils.hms.mirror.domain;

import com.cloudera.utils.hive.config.DBStore;
import com.cloudera.utils.hms.mirror.domain.support.*;
import com.cloudera.utils.hms.mirror.exceptions.RequiredConfigurationException;
import com.cloudera.utils.hms.mirror.feature.LegacyTranslations;
Expand Down Expand Up @@ -413,12 +414,50 @@ public static void setup(String configFile) throws IOException {
response = scanner.next();
if (response.equalsIgnoreCase("y")) {
pd.setAuto(Boolean.TRUE);
} else {
pd.setAuto(Boolean.FALSE);
}
}
System.out.println("Run 'MSCK' after table creation?(Y/N)");
response = scanner.next();
if (response.equalsIgnoreCase("y")) {
pd.setInitMSCK(Boolean.TRUE);
} else {
pd.setInitMSCK(Boolean.FALSE);
}
}
// Ask if they'd like to define the metastore direct link.
// Only on the LEFT cluster.
if (env == Environment.LEFT) {
System.out.println("----------------------------------------------------------------------------------------");
System.out.println("** A direct link to the metastore database allows for efficient collection of ");
System.out.println(" partition location metadata, which is a vital part of some migration strategies ");
System.out.println(" that need to work on partition movements. Without this, you may not have all ");
System.out.println(" the information needed to complete the migration under certain conditions and ");
System.out.println(" the functionality will be limited. **");
System.out.println("----------------------------------------------------------------------------------------");
System.out.println("Would you like to define the 'metastore direct' link? (Y/N)");
response = scanner.next();
if (response.equalsIgnoreCase("y")) {
DBStore metastoreDirect = new DBStore();
hmsMirrorConfig.getCluster(env).setMetastoreDirect(metastoreDirect);
System.out.println("What is the JDBC URI for the metastore database? ");
response = scanner.next();
metastoreDirect.setUri(response);
System.out.println("========================================================================================");
System.out.println("** NOTE: The metastore JDBC driver needs to be place in the 'aux_libs' directory before startup.");
System.out.println("========================================================================================");
// Get the driver type
System.out.print("What is the Metastore DB type for the " + env + " cluster? (" + Arrays.deepToString(DBStore.DB_TYPE.values()));
response = scanner.next();
metastoreDirect.setType(DBStore.DB_TYPE.valueOf(response.toUpperCase()));

System.out.println("Connection username?");
response = scanner.next();
metastoreDirect.getConnectionProperties().setProperty("user",response);
System.out.println("Connection password?");
response = scanner.next();
metastoreDirect.getConnectionProperties().setProperty("password",response);
}
}
}
Expand All @@ -433,6 +472,7 @@ public HmsMirrorConfig clone() {
try {
HmsMirrorConfig clone = (HmsMirrorConfig) super.clone();
// TODO: copy mutable state here, so the clone can't change the internals of the original

return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError();
Expand Down
18 changes: 11 additions & 7 deletions src/main/java/com/cloudera/utils/hms/mirror/domain/Overrides.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import java.util.TreeMap;

import static org.apache.commons.lang3.StringUtils.isBlank;

@Slf4j
@Getter
@Setter
Expand All @@ -35,10 +37,13 @@ public class Overrides {
private Map<String, Map<SideType, String>> properties = new TreeMap<String, Map<SideType, String>>();

public void addProperty(String key, String value, SideType side) {
if (!properties.containsKey(key)) {
properties.put(key, new TreeMap<SideType, String>());
// Don't save unless both key and value are present and not blank.
if (!isBlank(key) && !isBlank(value)) {
if (!properties.containsKey(key)) {
properties.put(key, new TreeMap<SideType, String>());
}
properties.get(key).put(side, value);
}
properties.get(key).put(side, value);
}

@JsonIgnore
Expand Down Expand Up @@ -77,14 +82,13 @@ public void setPropertyOverridesStr(String[] inPropsStr, SideType side) {
if (keyValue.length == 2) {
switch (side) {
case BOTH:
getLeft().put(keyValue[0], keyValue[1]);
getRight().put(keyValue[0], keyValue[1]);
addProperty(keyValue[0],keyValue[1],SideType.BOTH);
break;
case LEFT:
getLeft().put(keyValue[0], keyValue[1]);
addProperty(keyValue[0],keyValue[1],SideType.LEFT);
break;
case RIGHT:
getRight().put(keyValue[0], keyValue[1]);
addProperty(keyValue[0],keyValue[1],SideType.RIGHT);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public class StorageMigration implements Cloneable {
"ready to migration or stage future tables at the new location.")
private boolean skipDatabaseLocationAdjustments = Boolean.FALSE;

@Schema(description = "When true (default is false), the tables being migrated will be archived vs. simply changing the location details " +
"of the tables metadata. This is relevant only for STORAGE_MIGRATION when using the 'DISTCP' data movement " +
"strategy. When the data movement strategy is 'SQL' for STORAGE_MIGRATION, this flag is ignored because the default " +
"behavior is to create an archive table anyhow.")
private boolean createArchive = Boolean.FALSE;

@Schema(description = "When true, the tables will be consolidated into a single directory for distcp. " +
"This is useful when the source tables are spread across multiple directories.")
private boolean consolidateTablesForDistcp = Boolean.FALSE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public enum StageEnum {
VALIDATING_CONFIG("Validating Configuration"),
VALIDATE_CONNECTION_CONFIG("Validating Connection Configuration"),
CONNECTION("Connecting to Endpoints"),
GLM_BUILD("Building GLM's from Sources"),
GATHERING_DATABASES("Gathering Databases"),
ENVIRONMENT_VARS("Retrieving Environment Variables"),
DATABASES("Loading Databases"),
TABLES("Loading Tables"),
GLM_BUILD("Building GLM's from Sources"),
CREATE_DATABASES("Creating Databases"),
LOAD_TABLE_METADATA("Loading Table Metadata"),
MIGRATE_TABLES("Migrating Tables"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,20 @@ public Boolean validate(ExecuteSession session, CliEnvironment cli) {
// Set distcp options.
canDeriveDistcpPlan(session);

// When doing STORAGE_MIGRATION using DISTCP, we can alter the table location of an ACID table and 'distcp' the
// data to the new location without any other changes because the filesystem directory names will still match the
// metastore's internal transactional values.
// BUT, if you've asked to save an ARCHIVE of the table and use 'distcp', we need to 'create' the new table, which
// will not maintain the ACID properties. This is an invalid state.
if (config.getDataStrategy() == STORAGE_MIGRATION) {
if (config.getTransfer().getStorageMigration().isDistcp() &&
config.getMigrateACID().isOn() &&
config.getTransfer().getStorageMigration().isCreateArchive()) {
session.addError(STORAGE_MIGRATION_ACID_ARCHIVE_DISTCP);
rtn.set(Boolean.FALSE);
}
}

// Both of these can't be set together.
if (!isBlank(config.getDbRename()) && !isBlank(config.getDbPrefix())) {
rtn.set(Boolean.FALSE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class DatabaseService {
private ConnectionPoolService connectionPoolService;
private ExecuteSessionService executeSessionService;
private QueryDefinitionsService queryDefinitionsService;
private TranslatorService translatorService;
// private TranslatorService translatorService;
private WarehouseService warehouseService;
private ConfigService configService;

Expand All @@ -84,10 +84,10 @@ public void setQueryDefinitionsService(QueryDefinitionsService queryDefinitionsS
this.queryDefinitionsService = queryDefinitionsService;
}

@Autowired
public void setTranslatorService(TranslatorService translatorService) {
this.translatorService = translatorService;
}
// @Autowired
// public void setTranslatorService(TranslatorService translatorService) {
// this.translatorService = translatorService;
// }

@Autowired
public void setWarehouseService(WarehouseService warehouseService) {
Expand Down Expand Up @@ -544,22 +544,26 @@ public boolean buildDBStatements(DBMirror dbMirror) {
if (!isBlank(originalLocation) || forceLocations) {
log.debug("Original Location: {}", originalLocation);
// Get the base location without the original namespace.
// TODO: Need to address NULL here!!!!
targetLocation = NamespaceUtils.stripNamespace(originalLocation);
log.debug("Target Location from Original Location: {}", targetLocation);
// Only set to warehouse location if the translation type is 'ALIGNED',
// otherwise we want to keep the same relative location.
if (nonNull(warehouse) && config.getTransfer().getStorageMigration().getTranslationType() == TranslationTypeEnum.ALIGNED) {
log.debug("Aligned Translation Type. Adjusting Target Location to include Warehouse Location.");
String dbDirectory = NamespaceUtils.getLastDirectory(targetLocation);
switch (warehouse.getSource()) {
case PLAN:
case GLOBAL:
targetLocation = warehouse.getExternalDirectory() + "/" + targetDatabase + ".db";
// targetLocation = warehouse.getExternalDirectory() + "/" + targetDatabase + ".db";
targetLocation = warehouse.getExternalDirectory() + "/" + dbDirectory;
break;
default:
// Didn't find an explicit location. So we're going to leave it as 'relative'.
// This handles any DB rename process.
targetLocation = NamespaceUtils.getParentDirectory(targetLocation);
targetLocation = targetLocation + "/" + targetDatabase + ".db";
// targetLocation = targetLocation + "/" + targetDatabase + ".db";
targetLocation = targetLocation + "/" + dbDirectory;
break;
}
log.debug("Target Location after Warehouse Adjustment: {}", targetLocation);
Expand Down Expand Up @@ -595,14 +599,25 @@ public boolean buildDBStatements(DBMirror dbMirror) {
log.debug("Original Managed Location: {}", originalManagedLocation);
targetManagedLocation = NamespaceUtils.stripNamespace(originalManagedLocation);
log.debug("Target Managed Location from Original Managed Location: {}", targetManagedLocation);
String dbDirectory = nonNull(targetManagedLocation)?NamespaceUtils.getLastDirectory(targetManagedLocation):
nonNull(dbMirror.getLocationDirectory())?dbMirror.getLocationDirectory():targetDatabase + ".db";

// Only set to warehouse location if the translation type is 'ALIGNED',
// otherwise we want to keep the same relative location.
if (nonNull(warehouse) && config.getTransfer().getStorageMigration().getTranslationType() == TranslationTypeEnum.ALIGNED) {
log.debug("Aligned Translation Type. Adjusting Target Managed Location to include Warehouse Location.");
switch (warehouse.getSource()) {
case PLAN:
case GLOBAL:
targetManagedLocation = warehouse.getManagedDirectory() + "/" + targetDatabase + ".db";
// targetManagedLocation = warehouse.getManagedDirectory() + "/" + targetDatabase + ".db";
targetManagedLocation = warehouse.getManagedDirectory() + "/" + dbDirectory;
break;
default:
// Didn't find an explicit location. So we're going to leave it as 'relative'.
// This handles any DB rename process.
targetManagedLocation = NamespaceUtils.getParentDirectory(targetManagedLocation);
// targetManagedLocation = targetManagedLocation + "/" + targetDatabase + ".db";
targetManagedLocation = targetManagedLocation + "/" + dbDirectory;
break;
}
log.debug("Target Managed Location after Warehouse Adjustment: {}", targetManagedLocation);
Expand Down
Loading

0 comments on commit e22cd41

Please sign in to comment.