Skip to content

Commit

Permalink
Fix raw index conversion from v4 (#14171)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 6, 2024
1 parent 8334add commit f7067db
Show file tree
Hide file tree
Showing 14 changed files with 408 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,14 @@ public void testStarTreeTriggering() {
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testDefaultColumns(boolean useMultiStageQueryEngineg) {
public void testDefaultColumns(boolean useMultiStageQueryEngine) {
// Ignored
}

// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded.
// The override has an empty body and is marked @Test(enabled = false), so the inherited test logic is not executed
// for this class.
@Test(enabled = false)
@Override
public void testForwardIndexTriggering() {
// Intentionally empty: the test is disabled for this class (see the comment above the annotation)
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ public int getLengthOfShortestElement() {

@Override
public int getLengthOfLargestElement() {
if (_sealed) {
return _maxLength;
}
throw new IllegalStateException("you must seal the collector first before asking for longest value");
return _maxLength;
}

@Override
Expand All @@ -106,10 +103,7 @@ public int getMaxRowLengthInBytes() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ public int getLengthOfShortestElement() {

@Override
public int getLengthOfLargestElement() {
if (_sealed) {
return _maxLength;
}
throw new IllegalStateException("you must seal the collector first before asking for longest value");
return _maxLength;
}

@Override
Expand All @@ -118,10 +115,7 @@ public int getMaxRowLengthInBytes() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public Object getUniqueValuesSet() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public Object getUniqueValuesSet() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public Object getUniqueValuesSet() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public Object getUniqueValuesSet() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.stats;

import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.util.HashMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import java.util.Arrays;
import java.util.Map;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
Expand All @@ -44,26 +44,19 @@
* Assumptions that are made:
* 1. Each key has a single type for the value's associated with it across all documents.
* 2. At this point in the Pinot process, the type consistency of a key should already be enforced, so if
* heterogeneous value types for a key are encountered while constructing the Map statistics, it can be raised as a
* fault.
* heterogeneous value types for a key are encountered while constructing the Map statistics, it can be raised as a fault.
*/
public class MapColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
private ObjectOpenHashSet<String> _keys = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
private final HashMap<String, AbstractColumnStatisticsCollector> _keyStats;
private final Object2ObjectOpenHashMap<String, AbstractColumnStatisticsCollector> _keyStats =
new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
private String[] _sortedValues;
private int _minLength = Integer.MAX_VALUE;
private int _maxLength = 0;
private int _maxRowLength = 0;
private String _minValue = null;
private String _maxValue = null;
private boolean _sealed = false;
private final String _column;

public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
super(column, statsCollectorConfig);
super._sorted = false;
_keyStats = new HashMap<>();
_column = column;
_sorted = false;
}

public AbstractColumnStatisticsCollector getKeyStatistics(String key) {
Expand All @@ -75,40 +68,24 @@ public void collect(Object entry) {
assert !_sealed;

if (entry instanceof Map) {
final Map<String, Object> mapValue = (Map<String, Object>) entry;
byte[] serializeMap = MapUtils.serializeMap(mapValue);
//noinspection unchecked
Map<String, Object> mapValue = (Map<String, Object>) entry;
int length = MapUtils.serializeMap(mapValue).length;
_minLength = Math.min(_minLength, length);
_maxLength = Math.max(_maxLength, length);

_maxRowLength = Math.max(_maxRowLength, serializeMap.length);
for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
final String key = mapValueEntry.getKey();

// Record statistics about the key
int length = serializeMap.length;
if (_keys.add(key)) {
String key = mapValueEntry.getKey();
Object value = mapValueEntry.getValue();
AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
if (keyStats == null) {
keyStats = createKeyStatsCollector(key, value);
_keyStats.put(key, keyStats);
if (isPartitionEnabled()) {
updatePartition(key);
}
if (_minValue == null) {
_minValue = key;
} else {
if (key.compareTo(_minValue) < 0) {
_minValue = key;
}
}
if (_maxValue == null) {
_maxValue = key;
} else {
if (key.compareTo(_maxValue) > 0) {
_maxValue = key;
}
}
_minLength = Math.min(_minLength, length);
_maxLength = Math.max(_maxLength, length);
}

// Record statistics about the value within the key
AbstractColumnStatisticsCollector keyStats = getOrCreateKeyStatsCollector(key, mapValueEntry.getValue());
keyStats.collect(mapValueEntry.getValue());
keyStats.collect(value);
}
_totalNumberOfEntries++;
} else {
Expand All @@ -119,15 +96,15 @@ public void collect(Object entry) {
@Override
public String getMinValue() {
if (_sealed) {
return _minValue;
return _sortedValues[0];
}
throw new IllegalStateException("you must seal the collector first before asking for min value");
}

@Override
public String getMaxValue() {
if (_sealed) {
return _maxValue;
return _sortedValues[_sortedValues.length - 1];
}
throw new IllegalStateException("you must seal the collector first before asking for max value");
}
Expand All @@ -147,30 +124,24 @@ public int getLengthOfShortestElement() {

@Override
public int getLengthOfLargestElement() {
if (_sealed) {
return _maxLength;
}
throw new IllegalStateException("you must seal the collector first before asking for longest value");
return _maxLength;
}

@Override
public int getMaxRowLengthInBytes() {
return _maxRowLength;
return _maxLength;
}

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _keyStats.size();
}

@Override
public void seal() {
if (!_sealed) {
_sortedValues = _keys.stream().sorted().toArray(String[]::new);
_keys = null;
_sortedValues = _keyStats.keySet().toArray(new String[0]);
Arrays.sort(_sortedValues);

// Iterate through every key stats collector and seal them
for (AbstractColumnStatisticsCollector keyStatsCollector : _keyStats.values()) {
Expand All @@ -186,79 +157,31 @@ public void seal() {
*
* NOTE: this could raise an issue if there are millions of keys with very few values (Sparse keys, in other words).
* So a less memory intensive option may be better for this.
*
* @param key
* @param value
* @return
*/
private AbstractColumnStatisticsCollector getOrCreateKeyStatsCollector(String key, Object value) {
// Check if the key stats collector exists just return it
if (!_keyStats.containsKey(key)) {
// Get the type of the value
PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName(_column)
.build();
Schema keySchema = new Schema.SchemaBuilder()
.setSchemaName(key)
.addField(new DimensionFieldSpec(key, convertToDataType(type), false))
.build();
StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, keySchema, null);

AbstractColumnStatisticsCollector keyStatsCollector = null;
switch (type) {
case INTEGER:
keyStatsCollector = new IntColumnPreIndexStatsCollector(key, config);
break;
case LONG:
keyStatsCollector = new LongColumnPreIndexStatsCollector(key, config);
break;
case FLOAT:
keyStatsCollector = new FloatColumnPreIndexStatsCollector(key, config);
break;
case DOUBLE:
keyStatsCollector = new DoubleColumnPreIndexStatsCollector(key, config);
break;
case BIG_DECIMAL:
keyStatsCollector = new BigDecimalColumnPreIndexStatsCollector(key, config);
break;
case STRING:
keyStatsCollector = new StringColumnPreIndexStatsCollector(key, config);
break;
case TIMESTAMP:
case BOOLEAN:
case BYTE:
case CHARACTER:
case SHORT:
case JSON:
case BYTES:
case OBJECT:
case MAP:
case BYTE_ARRAY:
case CHARACTER_ARRAY:
case SHORT_ARRAY:
case PRIMITIVE_INT_ARRAY:
case INTEGER_ARRAY:
case PRIMITIVE_LONG_ARRAY:
case LONG_ARRAY:
case PRIMITIVE_FLOAT_ARRAY:
case FLOAT_ARRAY:
case PRIMITIVE_DOUBLE_ARRAY:
case DOUBLE_ARRAY:
case BOOLEAN_ARRAY:
case TIMESTAMP_ARRAY:
case STRING_ARRAY:
case BYTES_ARRAY:
case COLLECTION:
case OBJECT_ARRAY:
default:
throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
}

_keyStats.put(key, keyStatsCollector);
private AbstractColumnStatisticsCollector createKeyStatsCollector(String key, Object value) {
// Get the type of the value
PinotDataType type = PinotDataType.getSingleValueType(value.getClass());
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(key).build();
Schema keySchema = new Schema.SchemaBuilder().setSchemaName(key)
.addField(new DimensionFieldSpec(key, convertToDataType(type), false)).build();
StatsCollectorConfig config = new StatsCollectorConfig(tableConfig, keySchema, null);

switch (type) {
case INTEGER:
return new IntColumnPreIndexStatsCollector(key, config);
case LONG:
return new LongColumnPreIndexStatsCollector(key, config);
case FLOAT:
return new FloatColumnPreIndexStatsCollector(key, config);
case DOUBLE:
return new DoubleColumnPreIndexStatsCollector(key, config);
case BIG_DECIMAL:
return new BigDecimalColumnPreIndexStatsCollector(key, config);
case STRING:
return new StringColumnPreIndexStatsCollector(key, config);
default:
throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
}

return _keyStats.get(key);
}

static FieldSpec.DataType convertToDataType(PinotDataType ty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ public Object[] getUniqueValuesSet() {

@Override
public int getLengthOfLargestElement() {
if (_sealed) {
return _maxLength;
}
throw new IllegalStateException("you must seal the collector first before asking for longest value");
return _maxLength;
}

@Override
Expand All @@ -138,10 +135,7 @@ public int getMaxRowLengthInBytes() {

@Override
public int getCardinality() {
if (_sealed) {
return _sortedValues.length;
}
throw new IllegalStateException("you must seal the collector first before asking for cardinality");
return _sealed ? _sortedValues.length : _values.size();
}

@Override
Expand Down
Loading

0 comments on commit f7067db

Please sign in to comment.