Skip to content

Commit

Permalink
STORAGE_MIGRATION checks for:
Browse files Browse the repository at this point in the history
- Previously Migrated
- Old Artifact Tables
- Validations when using -dc (table name mismatches to directory name)
- Table Status Rollups in Report
- Simplified options for -rdl, -dc, when using STORAGE_MIGRATION
- Support for STORAGE_MIGRATION in same Namespace (used to migrate/organize data to another directory)

Fixes Issue #35 #35 for STORAGE_MIGRATION strategy.
  • Loading branch information
dstreev committed May 10, 2023
1 parent 6273251 commit e0c9db1
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>com.cloudera.utils.hadoop</groupId>
<artifactId>hms-mirror</artifactId>
<version>1.5.4.6-SNAPSHOT</version>
<version>1.5.4.7-SNAPSHOT</version>
<name>hms-mirror</name>

<url>https://github.com/cloudera-labs/hms_mirror</url>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/cloudera/utils/hadoop/hms/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -1779,10 +1779,16 @@ public long go(String[] args) {
System.err.println("\nSee log for stack trace ($HOME/.hms-mirror/logs)");
} finally {
if (config != null) {
if (config.getErrors().getReturnCode() != 0) {
System.err.println("******* ERRORS *********");
}
for (String error : config.getErrors().getMessages()) {
LOG.error(error);
System.err.println(error);
}
if (config.getWarnings().getReturnCode() != 0) {
System.err.println("******* WARNINGS *********");
}
for (String warning : config.getWarnings().getMessages()) {
LOG.warn(warning);
System.err.println(warning);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public void getTables(Config config, DBMirror dbMirror) throws SQLException {
LOG.info("Database: " + database + " Table: " + tableName + " was NOT added to list. " +
"The name matches the transfer prefix and is most likely a remnant of a previous " +
"event. If this is a mistake, change the 'transferPrefix' to something more unique.");
} else if (tableName.endsWith("storage_migration")) {
LOG.info("Database: " + database + " Table: " + tableName + " was NOT added to list. " +
"The name is the result of a previous STORAGE_MIGRATION attempt that has not been " +
"cleaned up.");
} else {
if (config.getTblRegEx() == null && config.getTblExcludeRegEx() == null) {
TableMirror tableMirror = dbMirror.addTable(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,13 @@ && getMigrateACID().isDowngrade()
if (dataStrategy == DataStrategy.STORAGE_MIGRATION) {
// The commonStorage and Storage Migration Namespace are the same thing.
if (this.getTransfer().getCommonStorage() == null) {
errors.set(STORAGE_MIGRATION_REQUIRED_NAMESPACE.getCode());
rtn = Boolean.FALSE;
// Use the same namespace, we're assuming that was the intent.
this.getTransfer().setCommonStorage(getCluster(Environment.LEFT).getHcfsNamespace());
// Force reset to default location.
this.setResetToDefaultLocation(Boolean.TRUE);
warnings.set(STORAGE_MIGRATION_NAMESPACE_LEFT.getCode(), getCluster(Environment.LEFT).getHcfsNamespace());
warnings.set(STORAGE_MIGRATION_NAMESPACE_LEFT_MISSING_RDL.getCode());
// rtn = Boolean.FALSE;
}
if (this.getTransfer().getWarehouse() == null ||
(this.getTransfer().getWarehouse().getManagedDirectory() == null ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ public String toReport(Config config, String database) throws JsonProcessingExce
sb.append("none\n");
}

sb.append("\n## Table Status (").append(dbMirror.getTableMirrors().size()).append(")\n\n");
sb.append("\n## Table Status (").append(dbMirror.getTableMirrors().size()).append(") ");
sb.append(dbMirror.getPhaseSummaryString()).append("\n\n");

sb.append("*NOTE* SQL in this report may be altered by the renderer. Do NOT COPY/PASTE from this report. Use the LEFT|RIGHT_execution.sql files for accurate scripts\n\n");

Expand Down
24 changes: 23 additions & 1 deletion src/main/java/com/cloudera/utils/hadoop/hms/mirror/DBMirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ public boolean isThereAnIssue() {
return issues.size() > 0 ? Boolean.TRUE : Boolean.FALSE;
}

public Map<PhaseState, Integer> getPhaseSummary() {
Map<PhaseState, Integer> rtn = new HashMap<PhaseState, Integer>();
for (String tableName: getTableMirrors().keySet()) {
TableMirror tableMirror = getTableMirrors().get(tableName);
Integer count = rtn.get(tableMirror.getPhaseState());
if (count != null)
rtn.put(tableMirror.getPhaseState(), count+1);
else
rtn.put(tableMirror.getPhaseState(), 1);
}
return rtn;
}

public String getPhaseSummaryString() {
StringBuilder sb = new StringBuilder();
Map<PhaseState, Integer> psMap = getPhaseSummary();
for (PhaseState ps: psMap.keySet()) {
sb.append(ps).append("(").append(psMap.get(ps)).append(") ");
}
return sb.toString();
}

public void addIssue(Environment environment, String issue) {
String scrubbedIssue = issue.replace("\n", "<br/>");
List<String> issuesList = issues.get(environment);
Expand Down Expand Up @@ -387,7 +409,7 @@ public void buildDBStatements(Config config) {
String alterDbMngdLoc = MessageFormat.format(MirrorConf.ALTER_DB_MNGD_LOCATION, database, sbMngdLoc.toString());
this.getSql(Environment.LEFT).add(new Pair(MirrorConf.ALTER_DB_MNGD_LOCATION_DESC, alterDbMngdLoc));

this.addIssue(Environment.LEFT,"This process, when 'executed' will leave the original tables intact in there renamed " +
this.addIssue(Environment.LEFT,"This process, when 'executed' will leave the original tables intact in their renamed " +
"version. They are NOT automatically cleaned up. Run the produced '" +
getName() + "_LEFT_CleanUp_execute.sql' " +
"file to permanently remove them. Managed and External/Purge table data will be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public enum MessageCode {
ENVIRONMENT_CONNECTION_ISSUE(49, "There is an issue connecting to the {0} HS2 environment. Check jdbc setup."),
FLIP_WITHOUT_RIGHT(80, "You can use the 'flip' option if there isn't a RIGHT cluster defined in the configuration."),


// WARNINGS
SYNC_TBL_FILTER(50, "'sync' with 'table filter' will be bi-directional ONLY for tables that meet the table filter '"
+ "' ON BOTH SIDES!!!"),
Expand All @@ -109,8 +108,11 @@ public enum MessageCode {
RDL_DC_WARNING_TABLE_ALIGNMENT(59, "Using the options `-dc` and `-rdl` together may yield some inconsistent results. " +
"__If the 'current' table locations don't match the table name__, `distcp` will NOT realign those directories to the " +
"table names. Which means the adjusted tables may not align with the directories. See: [Issue #35](https://github.com/cloudera-labs/hms-mirror/issues/35) " +
"for work going on to address this.")

"for work going on to address this."),
STORAGE_MIGRATION_NAMESPACE_LEFT(60, "You didn't specify -smn or -cs for STORAGE_MIGRATION. We're assuming you are migrating " +
"within the same namespace {0}."),
STORAGE_MIGRATION_NAMESPACE_LEFT_MISSING_RDL(61, "You're using the same namespace in STORAGE_MIGRATION, which " +
"requires the use of `reset-to-default-location`. This feature has automatically been set.")
;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,12 +1103,28 @@ public Boolean buildoutSTORAGEMIGRATIONDefinition(Config config, DBMirror dbMirr
CopySpec copySpec = null;

let = getEnvironmentTable(Environment.LEFT);
ret = getEnvironmentTable(Environment.RIGHT);

if (TableUtils.getLocation(let.getName(), let.getDefinition()).startsWith(config.getTransfer().getCommonStorage())) {
addIssue(Environment.LEFT, "Table namespace already matches target MIGRATION namespace. Nothing to do.");
// Check that the table isn't already in the target location.
StringBuilder sb = new StringBuilder();
sb.append(config.getTransfer().getCommonStorage());
String warehouseDir = null;
if (TableUtils.isExternal(let)) {
// External Location
warehouseDir = config.getTransfer().getWarehouse().getExternalDirectory();
} else {
// Managed Location
warehouseDir = config.getTransfer().getWarehouse().getManagedDirectory();
}
if (!config.getTransfer().getCommonStorage().endsWith("/") && !warehouseDir.startsWith("/")) {
sb.append("/");
}
sb.append(warehouseDir);
String lLocation = TableUtils.getLocation(this.getName(), let.getDefinition());
if (lLocation.startsWith(sb.toString())) {
addIssue(Environment.LEFT, "Table has already been migrated");
return Boolean.FALSE;
}
// ret = getEnvironmentTable(Environment.RIGHT);

// Create a 'target' table definition on left cluster with right definition (used only as place holder)
copySpec = new CopySpec(config, Environment.LEFT, Environment.RIGHT);
Expand Down Expand Up @@ -1230,7 +1246,8 @@ public Boolean buildoutSTORAGEMIGRATIONSql(Config config, DBMirror dbMirror) {
String unSetSql = MessageFormat.format(MirrorConf.REMOVE_TABLE_PROP, ret.getName(), MirrorConf.TRANSLATED_TO_EXTERNAL);
let.addSql(MirrorConf.REMOVE_TABLE_PROP_DESC, unSetSql);
}
let.setName(let.getName() + "_" + getUnique());
// Set unique name for old target to rename.
let.setName(let.getName() + "_" + getUnique()+"storage_migration");
String origAlterRename = MessageFormat.format(MirrorConf.RENAME_TABLE, ret.getName(), let.getName());
let.addSql(MirrorConf.RENAME_TABLE_DESC, origAlterRename);

Expand Down Expand Up @@ -1648,6 +1665,7 @@ public Boolean buildTransferSql(EnvironmentTable source, EnvironmentTable shadow
public Boolean buildTableSchema(CopySpec copySpec) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Config config = copySpec.getConfig();
Boolean rtn = Boolean.TRUE;

EnvironmentTable source = getEnvironmentTable(copySpec.getSource());
EnvironmentTable target = getEnvironmentTable(copySpec.getTarget());
Expand All @@ -1670,6 +1688,20 @@ public Boolean buildTableSchema(CopySpec copySpec) {
if (copySpec.getLocation() != null)
TableUtils.updateTableLocation(target, copySpec.getLocation());

if (!TableUtils.doesTableNameMatchDirectoryName(target.getDefinition())) {
if (config.getResetToDefaultLocation()) {
target.addIssue("Tablename does NOT match last directory name. Using `rdl` will change " +
"the implied path from the original. This may affect other applications that aren't " +
"relying on the metastore.");
if (config.getTransfer().getStorageMigration().isDistcp()) {
// We need to FAIL the table to ensure we point out that there is a disconnect.
rtn = Boolean.FALSE;
target.addIssue("Tablename does NOT match last directory name. Using `dc|distcp` will copy " +
"the data but the table will not align with the directory.");
}
}
}

// 1. If Managed, convert to EXTERNAL
// When coming from legacy and going to non-legacy (Hive 3).
Boolean converted = Boolean.FALSE;
Expand Down Expand Up @@ -1910,7 +1942,7 @@ public Boolean buildTableSchema(CopySpec copySpec) {

TableUtils.fixTableDefinition(target);
}
return Boolean.TRUE;
return rtn;
}

public String getCreateStatement(Environment environment) {
Expand Down
42 changes: 42 additions & 0 deletions src/main/java/com/cloudera/utils/hadoop/hms/util/TableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import com.cloudera.utils.hadoop.hms.mirror.Cluster;
import com.cloudera.utils.hadoop.hms.mirror.EnvironmentTable;
import com.cloudera.utils.hadoop.hms.mirror.MirrorConf;
import com.cloudera.utils.hadoop.hms.mirror.TableMirror;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TableUtils {
private static final Logger LOG = LogManager.getLogger(TableUtils.class);
Expand Down Expand Up @@ -65,6 +68,8 @@ public class TableUtils {
public static final String IMPORT_TABLE = "IMPORT Table";
public static final String RENAME_TABLE = "RENAME Table";
public static final String ACID_NOT_ON = "This is an ACID table. Turn on ACID migration `-ma|--migrate-acid`.";
public static Pattern tableCreatePattern = Pattern.compile(".*TABLE `?([a-z,A-Z,_,0-9,_]+)`?\\.?`?([a-z,A-Z,_,0-9,_]+)?");
// public static Pattern dbdottable = Pattern.compile(".*`?\\.`?(.*)");

public static String getLocation(String tableName, List<String> tableDefinition) {
LOG.trace("Getting table location data for: " + tableName);
Expand All @@ -76,6 +81,43 @@ public static String getLocation(String tableName, List<String> tableDefinition)
return location;
}

public static String getTableNameFromDefinition(List<String> tableDefinition) {
String tableName = null;
for (String line: tableDefinition) {
LOG.debug("Tablename Check: " + line);
if (line.contains("CREATE")) {
Matcher matcher = tableCreatePattern.matcher(line);
if (matcher.find()) {
if (matcher.groupCount() == 2) {
tableName = matcher.group(1);
} else if (matcher.groupCount() == 3) {
tableName = matcher.group(2);
}
break;
} else {
LOG.error("Couldn't locate tablename in: " + line);
}
}
}
return tableName;
}

public static Boolean doesTableNameMatchDirectoryName(List<String> tableDefinition) {
String tableName = getTableNameFromDefinition(tableDefinition);
return doesTableNameMatchDirectoryName(tableName, tableDefinition);
}

public static Boolean doesTableNameMatchDirectoryName(String tableName, List<String> tableDefinition) {
String location = getLocation(tableName, tableDefinition);
int idx = location.lastIndexOf('/');
String dirName = location.substring(idx+1);
if (tableName.equals(dirName)) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
}

public static String getSerdePath(String tableName, List<String> tableDefinition) {
LOG.trace("Getting table serde path (if available) data for: " + tableName);
String location = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ public void testUpdateTableLocation() {

}

@Test
public void testTableNameDirMatch_01() {
List<String> tblDef = fromStatic(table_01);
assertFalse(TableUtils.doesTableNameMatchDirectoryName(tblDef));
}

@Test
public void testTableNameDirMatch_02() {
List<String> tblDef = fromStatic(table_04);
assertTrue(TableUtils.doesTableNameMatchDirectoryName(tblDef));
}

public List<String> fromStatic(List<String> source) {
List<String> rtn = new ArrayList<String>();
for (String line: source) {
Expand Down Expand Up @@ -306,7 +318,7 @@ public void setUp() throws Exception {
, "OUTPUTFORMAT "
, " 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'"
, "LOCATION"
, " 'hdfs://HOME90/user/dstreev/datasets/junk'"
, " 'hdfs://HOME90/user/dstreev/datasets/call_center'"
, "TBLPROPERTIES ("
, " 'bucketing_version'='2', "
, " 'transactional'='true', "
Expand Down

0 comments on commit e0c9db1

Please sign in to comment.