Skip to content

Commit

Permalink
Add logging for HiveMetadataPreservingTableOperations
Browse files: browse the repository at this point in the history
  • Loading branch information
rzhang10 committed Dec 2, 2024
1 parent dc5f7e8 commit d1cd3df
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ buildscript {
}
dependencies {
classpath 'gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:4.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:4.40.0'
classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.3'
classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.8.0'
classpath 'gradle.plugin.org.inferred:gradle-processors:3.3.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

// inspired in part by
// https://github.com/apache/avro/blob/release-1.8.2/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java
@SuppressWarnings("ReturnValueIgnored")
public class GuavaClasses {

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import jline.internal.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
Expand Down Expand Up @@ -568,9 +569,12 @@ protected long acquireLock() throws UnknownHostException, TException, Interrupte
Lists.newArrayList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LOG.warn("In thread {}, trying to call hmsclient.lock() on table {}", Thread.currentThread(), fullName);
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LOG.warn("In thread {}, hmsclient.lock() finished on table {}", Thread.currentThread(), fullName);
AtomicReference<LockState> state = new AtomicReference<>(lockResponse.getState());
long lockId = lockResponse.getLockid();
LOG.warn("In thread {}, lockId returned from hmsclient.lock() on table {} is {}", Thread.currentThread(), fullName, lockId);

final long start = System.currentTimeMillis();
long duration = 0;
Expand Down Expand Up @@ -663,6 +667,9 @@ private void unlock(Optional<Long> lockId) {
LOG.warn("Failed to unlock {}.{}", database, tableName, e);
}
}
else {
LOG.warn("No lockId to unlock for {}.{}", database, tableName);
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*/
public class HiveMetadataPreservingCatalog extends HiveCatalog {

private static final String DEFAULT_NAME = "hive_meta_preserving";
private static final String DEFAULT_NAME = "hive_meta_preserving_catalog";

public HiveMetadataPreservingCatalog() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
tableLevelMutex.lock();
try {
LOG.warn("In thread {}, starting to create a lock for table {}", Thread.currentThread(), fullName);
lockId = Optional.of(acquireLock());
LOG.warn("In thread {}, acquired lock id: {} for table {}", Thread.currentThread(), lockId.get(), fullName);
// TODO add lock heartbeating for cases where the default lock timeout is too low.
Table tbl;
// [LINKEDIN] Instead of checking if base != null to check for table existence, we query
// the metastore for existence.
// base can be null if no Iceberg metadata exists but the Hive table exists, so we want to get
// the current table
// definition and not create a new definition.
LOG.warn("In thread {}, checking if table exists {}", Thread.currentThread(), fullName);
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
LOG.warn("In thread {}, checking table exists finishes with result: {}", Thread.currentThread(), tableExists);
if (tableExists) {
LOG.warn("In thread {}, starting to get table {} from HMS", Thread.currentThread(), fullName);
tbl = metaClients.run(client -> client.getTable(database, tableName));
LOG.warn("In thread {}, got table {} from HMS", Thread.currentThread(), fullName);
fixMismatchedSchema(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -243,7 +249,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateMetadataLocationInHms(newMetadataLocation, tbl);

try {
LOG.warn("In thread {}, starting to persist table {}", Thread.currentThread(), fullName);
persistTable(tbl, tableExists);
LOG.warn("In thread {}, persisted table {}", Thread.currentThread(), fullName);
commitStatus = CommitStatus.SUCCESS;
} catch (Throwable persistFailure) {
LOG.error(
Expand Down Expand Up @@ -280,7 +288,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
LOG.warn("In thread {}, trying to cleanupMetadataAndUnlock of lock id: {} for table: {}", Thread.currentThread(), lockId, fullName);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
LOG.warn("In thread {}, cleanupMetadataAndUnlock finishes for table: {}", Thread.currentThread(), fullName);
}
}

Expand Down

0 comments on commit d1cd3df

Please sign in to comment.