Skip to content

Commit

Permalink
Create Temp Table Indexes After Temp Table Write (#7782)
Browse files Browse the repository at this point in the history
* remove temp table index creation from repeatable migration
* drop indexes before persist
* re-create indexes before upsert
Signed-off-by: Jesse Nelson <[email protected]>
  • Loading branch information
jnels124 authored Feb 26, 2024
1 parent 59a3db9 commit be044f2
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -38,6 +39,7 @@ public class BatchUpserter extends BatchInserter {

private final String finalTableName;
private final String tempTableCleanupSql;
private final String createIndexSql;
private final String upsertSql;
private final Timer upsertMetric;

Expand All @@ -48,9 +50,12 @@ public BatchUpserter(
CommonParserProperties properties,
UpsertQueryGenerator upsertQueryGenerator) {
super(entityClass, dataSource, meterRegistry, properties, upsertQueryGenerator.getTemporaryTableName());
tempTableCleanupSql = String.format("truncate table %s restart identity cascade", tableName);
var truncateSql = String.format("truncate table %s restart identity cascade", tableName);
var dropIndexSql = upsertQueryGenerator.getCleanupTempIndexQuery();
tempTableCleanupSql = StringUtils.joinWith(";\n", truncateSql, dropIndexSql);
finalTableName = upsertQueryGenerator.getFinalTableName();
upsertSql = upsertQueryGenerator.getUpsertQuery();
createIndexSql = upsertQueryGenerator.getCreateTempIndexQuery();
log.trace("Table: {}, Entity: {}, upsertSql:\n{}", finalTableName, entityClass, upsertSql);
upsertMetric = Timer.builder(LATENCY_METRIC)
.description("The time it took to batch insert rows")
Expand All @@ -72,6 +77,9 @@ protected void persistItems(Collection<?> items, Connection connection) {
// copy items to temp table
super.persistItems(items, connection);

// create index on temp table
createTempTableIndex(connection);

// Upsert items from the temporary table to the final table
upsert(connection);
} catch (Exception e) {
Expand All @@ -88,6 +96,12 @@ private void cleanupTempTable(Connection connection) throws SQLException {
log.trace("Cleaned temp table {}", tableName);
}

private void createTempTableIndex(Connection connection) throws SQLException {
try (var preparedStatement = connection.prepareStatement(createIndexSql)) {
preparedStatement.execute();
}
}

private void upsert(Connection connection) throws SQLException {
var startTime = System.nanoTime();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ select consensus_timestamp, jsonb_agg(jsonb_build_object(
""",
TEMP_TABLE_NAME, FINAL_TABLE_NAME);

@Override
public String getCreateTempIndexQuery() {
return MessageFormat.format(
"create index if not exists {0}_idx on {0} (token_id, account_id)", TEMP_TABLE_NAME);
}

@Override
public String getFinalTableName() {
return FINAL_TABLE_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.mirror.importer.repository.upsert;

import java.io.StringWriter;
import java.text.MessageFormat;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;
import org.apache.velocity.Template;
Expand All @@ -34,6 +35,13 @@ public class GenericUpsertQueryGenerator implements UpsertQueryGenerator {

private final EntityMetadata metadata;

@Override
public String getCreateTempIndexQuery() {
String columns = metadata.columns(ColumnMetadata::isId, "{0}");
return MessageFormat.format(
"create index if not exists {0}_idx on {0} ({1})", getTemporaryTableName(), columns);
}

@Override
public String getFinalTableName() {
return metadata.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@

package com.hedera.mirror.importer.repository.upsert;

import java.text.MessageFormat;

public interface UpsertQueryGenerator {

String TEMP_SUFFIX = "_temp";

String getCreateTempIndexQuery();

default String getCleanupTempIndexQuery() {
return MessageFormat.format("drop index if exists {0}_idx", getTemporaryTableName());
}

String getFinalTableName();

default String getTemporaryTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,6 @@ alter table if exists ${tempSchema}.token_allowance_temp owner to temporary_admi
alter table if exists ${tempSchema}.token_temp owner to temporary_admin;
alter table if exists ${tempSchema}.topic_message_lookup_temp owner to temporary_admin;

create index if not exists contract_state_temp_idx on ${tempSchema}.contract_state_temp (contract_id,slot);
create index if not exists crypto_allowance_temp_idx on ${tempSchema}.crypto_allowance_temp (owner, spender);
create index if not exists custom_fee_temp_idx on ${tempSchema}.custom_fee_temp (token_id);
create index if not exists dissociate_token_transfer_idx on ${tempSchema}.dissociate_token_transfer (token_id, account_id);
create index if not exists entity_temp_idx on ${tempSchema}.entity_temp (id);
create index if not exists nft_allowance_temp_idx on ${tempSchema}.nft_allowance_temp (owner, spender, token_id);
create index if not exists nft_temp_idx on ${tempSchema}.nft_temp (token_id, serial_number);
create index if not exists schedule_temp_idx on ${tempSchema}.schedule_temp (schedule_id);
create index if not exists token_account_temp_idx on ${tempSchema}.token_account_temp (account_id, token_id);
create index if not exists token_allowance_temp_idx on ${tempSchema}.token_allowance_temp (owner, spender, token_id);
create index if not exists token_temp_idx on ${tempSchema}.token_temp (token_id);
create index if not exists topic_message_lookup_temp_idx on ${tempSchema}.topic_message_lookup_temp (topic_id, partition);

alter table if exists ${tempSchema}.contract_state_temp set (
autovacuum_enabled = false
);
Expand Down

0 comments on commit be044f2

Please sign in to comment.