Skip to content

Commit

Permalink
Batch update inserted point number metric (#13910)
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou authored Oct 25, 2024
1 parent 0db4ec4 commit 5d83392
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
Expand Down Expand Up @@ -205,7 +206,7 @@ private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
}

@Override
public void insert(InsertRowNode insertRowNode) {
public int insert(InsertRowNode insertRowNode) {

String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
Expand Down Expand Up @@ -235,39 +236,11 @@ public void insert(InsertRowNode insertRowNode) {
- nullPointsNumber;

totalPointsNum += pointsInserted;

MetricService.getInstance()
.count(
pointsInserted,
Metric.QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
return pointsInserted;
}

@Override
public void insertAlignedRow(InsertRowNode insertRowNode) {
public int insertAlignedRow(InsertRowNode insertRowNode) {

String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
Expand All @@ -287,47 +260,19 @@ public void insertAlignedRow(InsertRowNode insertRowNode) {
dataTypes.add(schema.getType());
}
if (schemaList.isEmpty()) {
return;
return 0;
}
memSize +=
MemUtils.getAlignedRowRecordSize(dataTypes, values, insertRowNode.getColumnCategories());
writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
int pointsInserted =
insertRowNode.getMeasurementColumnCnt() - insertRowNode.getFailedMeasurementNumber();
totalPointsNum += pointsInserted;

MetricService.getInstance()
.count(
pointsInserted,
Metric.QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
return pointsInserted;
}

@Override
public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
public int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
throws WriteProcessException {
try {
writeTabletNode(insertTabletNode, start, end);
Expand All @@ -336,41 +281,14 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
(insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
MetricService.getInstance()
.count(
pointsInserted,
Metric.QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
return pointsInserted;
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
}

@Override
public void insertAlignedTablet(
public int insertAlignedTablet(
InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results)
throws WriteProcessException {
try {
Expand All @@ -382,10 +300,31 @@ public void insertAlignedTablet(
- insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
return pointsInserted;
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
}

public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.QUANTITY.toString(),
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Expand All @@ -395,23 +334,6 @@ public void insertAlignedTablet(
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
Expand Down Expand Up @@ -95,9 +96,9 @@ void writeAlignedRow(
*
* @param insertRowNode insertRowNode
*/
void insert(InsertRowNode insertRowNode);
int insert(InsertRowNode insertRowNode);

void insertAlignedRow(InsertRowNode insertRowNode);
int insertAlignedRow(InsertRowNode insertRowNode);

/**
* insert tablet into this memtable. The rows to be inserted are in the range [start, end). Null
Expand All @@ -108,11 +109,10 @@ void writeAlignedRow(
* @param start included
* @param end excluded
*/
void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
throws WriteProcessException;

void insertAlignedTablet(
InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results)
int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results)
throws WriteProcessException;

ReadOnlyMemChunk query(
Expand Down Expand Up @@ -208,4 +208,6 @@ void queryForDeviceRegionScan(
void markAsNotGeneratedByPipe();

boolean isTotallyGeneratedByPipe();

void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted);
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,11 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
insertRowNode,
tsFileResource);

int pointInserted;
if (insertRowNode.isAligned()) {
workMemTable.insertAlignedRow(insertRowNode);
pointInserted = workMemTable.insertAlignedRow(insertRowNode);
} else {
workMemTable.insert(insertRowNode);
pointInserted = workMemTable.insert(insertRowNode);
}

// Update start time of this memtable
Expand All @@ -345,6 +346,7 @@ public void insert(InsertRowNode insertRowNode, long[] costsForMetrics)
if (!sequence) {
tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
}
workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted);

tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
// RecordScheduleMemTableCost
Expand Down Expand Up @@ -419,14 +421,14 @@ public void insertRows(InsertRowsNode insertRowsNode, long[] costsForMetrics)
walFlushListener.getWalEntryHandler(),
insertRowsNode,
tsFileResource);
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {

int pointInserted = 0;
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
if (insertRowNode.isAligned()) {
workMemTable.insertAlignedRow(insertRowNode);
pointInserted += workMemTable.insertAlignedRow(insertRowNode);
} else {
workMemTable.insert(insertRowNode);
pointInserted += workMemTable.insert(insertRowNode);
}

// update start time of this memtable
tsFileResource.updateStartTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
// for sequence tsfile, we update the endTime only when the file is prepared to be closed.
Expand All @@ -435,6 +437,8 @@ public void insertRows(InsertRowsNode insertRowsNode, long[] costsForMetrics)
tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
}
}
workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted);

tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex());
// recordScheduleMemTableCost
costsForMetrics[3] += System.nanoTime() - startTime;
Expand Down Expand Up @@ -584,15 +588,17 @@ public void insertTablet(
insertTabletNode,
tsFileResource);

int pointInserted = 0;
for (int[] rangePair : rangeList) {
int start = rangePair[0];
int end = rangePair[1];
try {
if (insertTabletNode.isAligned()) {
workMemTable.insertAlignedTablet(
insertTabletNode, start, end, noFailure ? null : results);
pointInserted +=
workMemTable.insertAlignedTablet(
insertTabletNode, start, end, noFailure ? null : results);
} else {
workMemTable.insertTablet(insertTabletNode, start, end);
pointInserted += workMemTable.insertTablet(insertTabletNode, start, end);
}
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
Expand Down Expand Up @@ -627,7 +633,7 @@ public void insertTablet(
}
}
}

workMemTable.updateMemtablePointCountMetric(insertTabletNode, pointInserted);
tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());

// recordScheduleMemTableCost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,29 @@ void redoInsert(InsertNode node) throws WriteProcessException {
}
}

int pointsInserted;
if (node instanceof InsertRowNode) {
if (node.isAligned()) {
recoveryMemTable.insertAlignedRow((InsertRowNode) node);
pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode) node);
} else {
recoveryMemTable.insert((InsertRowNode) node);
pointsInserted = recoveryMemTable.insert((InsertRowNode) node);
}
} else {
if (node.isAligned()) {
recoveryMemTable.insertAlignedTablet(
(InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount(), null);
pointsInserted =
recoveryMemTable.insertAlignedTablet(
(InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount(), null);
} else {
recoveryMemTable.insertTablet(
(InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
pointsInserted =
recoveryMemTable.insertTablet(
(InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
}
}
recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted);
}

void redoInsertRows(InsertRowsNode insertRowsNode) {
int pointsInserted = 0;
for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) {
if (!node.hasValidMeasurements()) {
continue;
Expand All @@ -125,11 +130,12 @@ void redoInsertRows(InsertRowsNode insertRowsNode) {
}
}
if (node.isAligned()) {
recoveryMemTable.insertAlignedRow(node);
pointsInserted += recoveryMemTable.insertAlignedRow(node);
} else {
recoveryMemTable.insert(node);
pointsInserted += recoveryMemTable.insert(node);
}
}
recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, pointsInserted);
}

void resetRecoveryMemTable(IMemTable memTable) {
Expand Down

0 comments on commit 5d83392

Please sign in to comment.