PHOENIX-7474 Migrate IndexTool tables and make sure they are created
richardantal committed Nov 28, 2024
1 parent a75c4dc commit 34ff76d
Showing 11 changed files with 690 additions and 65 deletions.
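
For orientation: the core change is a new IndexToolClient helper that resolves the IndexTool output/result table names under the SYSTEM schema, creates them when missing, and migrates data from the legacy names via an HBase snapshot/clone. Below is a minimal sketch (not part of the diff) of invoking the helper directly; the JDBC URL and class name are illustrative assumptions, while in the commit it is called from ConnectionQueryServicesImpl during initialization.

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.phoenix.mapreduce.index.IndexToolClient;

public class IndexToolTableMigrationExample {
    public static void main(String[] args) throws Exception {
        // Assumed ZooKeeper quorum; adjust for the target cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
            // Creates SYSTEM.PHOENIX_INDEX_TOOL and SYSTEM.PHOENIX_INDEX_TOOL_RESULT
            // (or SYSTEM:... when namespace mapping is enabled), migrating any
            // pre-existing unqualified tables first.
            IndexToolClient.createNewIndexToolTables(conn);
        }
    }
}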
IndexToolClient.java (new file)
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce.index;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.UUID;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;

/**
 * Client-side helper for the IndexTool output and result tables: resolves their
 * (optionally namespace-mapped) names under the SYSTEM schema, creates them when
 * missing, and migrates data from the legacy table names.
 */
public class IndexToolClient extends Configured {
    private static final Logger LOGGER = LoggerFactory.getLogger(IndexToolClient.class);

    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
    public static String SYSTEM_OUTPUT_TABLE_NAME =
            SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, OUTPUT_TABLE_NAME);

    public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
    public static String SYSTEM_RESULT_TABLE_NAME =
            SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, RESULT_TABLE_NAME);
    public static void setIndexToolTableName(Connection connection) throws Exception {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        // When SYSTEM tables are namespace mapped, the physical table name uses the
        // namespace separator (':') instead of the name separator ('.').
        if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
                queryServices.getConfiguration())) {
            SYSTEM_OUTPUT_TABLE_NAME = SYSTEM_OUTPUT_TABLE_NAME.replace(
                    QueryConstants.NAME_SEPARATOR,
                    QueryConstants.NAMESPACE_SEPARATOR);
            SYSTEM_RESULT_TABLE_NAME = SYSTEM_RESULT_TABLE_NAME.replace(
                    QueryConstants.NAME_SEPARATOR,
                    QueryConstants.NAMESPACE_SEPARATOR);
        }
    }

    public static Table createResultTable(Connection connection) throws IOException, SQLException {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        try (Admin admin = queryServices.getAdmin()) {
            TableName resultTableName = TableName.valueOf(SYSTEM_RESULT_TABLE_NAME);
            return createTable(admin, resultTableName);
        }
    }

    public static Table createOutputTable(Connection connection) throws IOException, SQLException {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        try (Admin admin = queryServices.getAdmin()) {
            TableName outputTableName = TableName.valueOf(SYSTEM_OUTPUT_TABLE_NAME);
            return createTable(admin, outputTableName);
        }
    }

    @VisibleForTesting
    private static Table createTable(Admin admin, TableName tableName) throws IOException {
        // Create the table with the default column family and the standard log TTL if it
        // does not already exist; tolerate a concurrent creator winning the race.
        if (!admin.tableExists(tableName)) {
            ColumnFamilyDescriptor columnDescriptor =
                    ColumnFamilyDescriptorBuilder
                            .newBuilder(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)
                            .setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
                            .build();
            TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
                    .setColumnFamily(columnDescriptor).build();
            try {
                admin.createTable(tableDescriptor);
            } catch (TableExistsException e) {
                LOGGER.warn("Table exists, ignoring", e);
            }
        }
        return admin.getConnection().getTable(tableName);
    }


    /**
     * Ensures the IndexTool output and result tables exist under the SYSTEM schema,
     * migrating data from the legacy (unqualified) tables when they are present.
     */
    public static void createNewIndexToolTables(Connection connection) throws Exception {
        setIndexToolTableName(connection);

        migrateTable(connection, OUTPUT_TABLE_NAME);
        migrateTable(connection, RESULT_TABLE_NAME);
    }

    private static void migrateTable(Connection connection, String tableName) throws Exception {
        if (!tableName.equals(OUTPUT_TABLE_NAME) && !tableName.equals(RESULT_TABLE_NAME)) {
            LOGGER.info("Not a PHOENIX_INDEX_TOOL table, skipping migration: {}", tableName);
        } else {
            ConnectionQueryServices queryServices =
                    connection.unwrap(PhoenixConnection.class).getQueryServices();
            try (Admin admin = queryServices.getAdmin()) {
                TableName oldTableName = TableName.valueOf(tableName);
                String newTableNameString = tableName.equals(OUTPUT_TABLE_NAME)
                        ? SYSTEM_OUTPUT_TABLE_NAME : SYSTEM_RESULT_TABLE_NAME;
                TableName newTableName = TableName.valueOf(newTableNameString);

                if (admin.tableExists(oldTableName)) {
                    // Relocate existing data: disable and snapshot the legacy table, clone
                    // the snapshot under the new (SYSTEM) name, then drop the snapshot and
                    // the legacy table.
                    String snapshotName = tableName + "_" + UUID.randomUUID();
                    admin.disableTable(oldTableName);
                    admin.snapshot(snapshotName, oldTableName);
                    admin.cloneSnapshot(snapshotName, newTableName);
                    admin.deleteSnapshot(snapshotName);
                    admin.deleteTable(oldTableName);
                } else {
                    // Nothing to migrate; just make sure the new table exists.
                    createTable(admin, newTableName);
                }
            }
        }
    }
}
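
As a side note, whether setIndexToolTableName switches the '.' separator to ':' depends on the client's namespace-mapping settings. The sketch below (not part of the diff; class and method names are illustrative) shows the standard Phoenix connection properties involved; the values must match the server-side configuration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class NamespaceMappedConnectionExample {
    public static Connection open(String url) throws Exception {
        Properties props = new Properties();
        // Enable namespace mapping (including SYSTEM tables) so the physical names
        // become SYSTEM:PHOENIX_INDEX_TOOL and SYSTEM:PHOENIX_INDEX_TOOL_RESULT.
        props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
        props.setProperty("phoenix.schema.mapSystemTablesToNamespace", "true");
        return DriverManager.getConnection(url, props);
    }
}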
ConnectionQueryServicesImpl.java
@@ -323,6 +325,8 @@
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;

import org.apache.phoenix.mapreduce.index.IndexToolClient;

public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
@@ -3844,6 +3846,12 @@ Collections.<Class<? extends Exception>> singletonList(
LOGGER.error("Upgrade is required. Must run 'EXECUTE UPGRADE' "
+ "before any other command");
}
try {
// Check for legacy PHOENIX_INDEX_TOOL tables and either move their data to the
// new tables under the SYSTEM schema or simply create the new tables.
IndexToolClient.createNewIndexToolTables(metaConnection);
} catch (Exception ignore) {
// Best effort: a failure here should not fail connection initialization.
}
}
success = true;
} catch (RetriableUpgradeException e) {
IndexVerificationOutputRepository.java
@@ -60,7 +60,7 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
IndexTool.IndexDisableLoggingType.NONE;
private boolean shouldLogBeyondMaxLookback = true;

public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
public final static String OUTPUT_TABLE_NAME = IndexToolClient.SYSTEM_OUTPUT_TABLE_NAME;
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;

@@ -177,26 +177,7 @@ private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTabl
}

public void createOutputTable(Connection connection) throws IOException, SQLException {
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
try (Admin admin = queryServices.getAdmin()) {
TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
if (!admin.tableExists(outputTableName)) {
ColumnFamilyDescriptor columnDescriptor =
ColumnFamilyDescriptorBuilder
.newBuilder(OUTPUT_TABLE_COLUMN_FAMILY)
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
.build();
TableDescriptor tableDescriptor = TableDescriptorBuilder
.newBuilder(TableName.valueOf(OUTPUT_TABLE_NAME))
.setColumnFamily(columnDescriptor).build();
try {
admin.createTable(tableDescriptor);
} catch (TableExistsException e) {
LOGGER.warn("Table exists, ignoring", e);
}
outputTable = admin.getConnection().getTable(outputTableName);
}
}
outputTable = IndexToolClient.createOutputTable(connection);
}

@VisibleForTesting
IndexVerificationResultRepository.java
@@ -58,7 +58,7 @@ public class IndexVerificationResultRepository implements AutoCloseable {
private Table indexTable;
public static final String ROW_KEY_SEPARATOR = "|";
public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes(ROW_KEY_SEPARATOR);
public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
public final static String RESULT_TABLE_NAME = IndexToolClient.SYSTEM_RESULT_TABLE_NAME;
public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
@@ -163,26 +163,7 @@ public IndexVerificationResultRepository(byte[] indexName,
}

public void createResultTable(Connection connection) throws IOException, SQLException {
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
try (Admin admin = queryServices.getAdmin()) {
TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME);
if (!admin.tableExists(resultTableName)) {
ColumnFamilyDescriptor columnDescriptor =
ColumnFamilyDescriptorBuilder
.newBuilder(RESULT_TABLE_COLUMN_FAMILY)
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
.build();
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(resultTableName)
.setColumnFamily(columnDescriptor).build();
try {
admin.createTable(tableDescriptor);
} catch (TableExistsException e) {
LOGGER.warn("Table exists, ignoring", e);
}
resultTable = admin.getConnection().getTable(resultTableName);
}
}
resultTable = IndexToolClient.createResultTable(connection);
}

private static byte[] generatePartialResultTableRowKey(long ts, byte[] indexTableName) {