Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12739: Fix row filter ignoring distinct index and query analyzers #1548

Merged
merged 4 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 162 additions & 44 deletions src/java/org/apache/cassandra/cql3/Operator.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ else if (otherValue == null)
// the condition value is not null, so only NEQ can return true
return operator == Operator.NEQ;
}
return operator.isSatisfiedBy(type, otherValue, value, null); // We don't use any analyzers in LWT, see CNDB-11658
return operator.isSatisfiedBy(type, otherValue, value, null, null); // We don't use any analyzers in LWT, see CNDB-11658
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public Set<ColumnMetadata> getAnalyzedColumns(IndexRegistry indexRegistry)

for (ColumnCondition condition : this)
{
if (indexRegistry.getAnalyzerFor(condition.column, condition.operator).isPresent())
if (indexRegistry.getIndexAnalyzerFor(condition.column, condition.operator).isPresent())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to the PR, so should not block anything, but I wonder if we should add a method that checks if the column is analyzed instead of this getter, which allocates at least 2 objects per analyzer.

{
analyzedColumns.add(condition.column);
}
Expand Down
80 changes: 55 additions & 25 deletions src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public boolean partitionKeyRestrictionsAreSatisfiedBy(DecoratedKey key, Abstract
ByteBuffer value = keyValidator instanceof CompositeType
? ((CompositeType) keyValidator).split(key.getKey())[e.column.position()]
: key.getKey();
if (!e.operator().isSatisfiedBy(e.column.type, value, e.value, e.analyzer()))
if (!e.operator().isSatisfiedBy(e.column.type, value, e.value, e.indexAnalyzer(), e.queryAnalyzer()))
return false;
}
return true;
Expand All @@ -278,7 +278,7 @@ public boolean clusteringKeyRestrictionsAreSatisfiedBy(Clustering<?> clustering)
if (!e.column.isClusteringColumn())
continue;

if (!e.operator().isSatisfiedBy(e.column.type, clustering.bufferAt(e.column.position()), e.value, e.analyzer()))
if (!e.operator().isSatisfiedBy(e.column.type, clustering.bufferAt(e.column.position()), e.value, e.indexAnalyzer(), e.queryAnalyzer()))
return false;
}
return true;
Expand Down Expand Up @@ -465,7 +465,7 @@ else if (builder.current.children.size() == 1 && builder.current.expressions.isE
public SimpleExpression add(ColumnMetadata def, Operator op, ByteBuffer value)
{
assert op != Operator.ANN : "ANN expressions should be added with the addANNExpression method";
SimpleExpression expression = new SimpleExpression(def, op, value, analyzer(def, op), null);
SimpleExpression expression = new SimpleExpression(def, op, value, indexAnalyzer(def, op), queryAnalyzer(def, op), null);
add(expression);
return expression;
}
Expand All @@ -479,18 +479,24 @@ public SimpleExpression add(ColumnMetadata def, Operator op, ByteBuffer value)
*/
public void addANNExpression(ColumnMetadata def, ByteBuffer value, ANNOptions annOptions)
{
add(new SimpleExpression(def, Operator.ANN, value, null, annOptions));
add(new SimpleExpression(def, Operator.ANN, value, null, null, annOptions));
}

public void addMapComparison(ColumnMetadata def, ByteBuffer key, Operator op, ByteBuffer value)
{
add(new MapComparisonExpression(def, key, op, value, analyzer(def, op)));
add(new MapComparisonExpression(def, key, op, value, indexAnalyzer(def, op), queryAnalyzer(def, op)));
}

@Nullable
private Index.Analyzer analyzer(ColumnMetadata def, Operator op)
private Index.Analyzer indexAnalyzer(ColumnMetadata def, Operator op)
{
return indexRegistry == null ? null : indexRegistry.getAnalyzerFor(def, op).orElse(null);
return indexRegistry == null ? null : indexRegistry.getIndexAnalyzerFor(def, op).orElse(null);
}

@Nullable
private Index.Analyzer queryAnalyzer(ColumnMetadata def, Operator op)
{
return indexRegistry == null ? null : indexRegistry.getQueryAnalyzerFor(def, op).orElse(null);
}

public void addGeoDistanceExpression(ColumnMetadata def, ByteBuffer point, Operator op, ByteBuffer distance)
Expand Down Expand Up @@ -870,7 +876,13 @@ public Operator operator()
}

@Nullable
public Index.Analyzer analyzer()
public Index.Analyzer indexAnalyzer()
{
return null;
}

@Nullable
public Index.Analyzer queryAnalyzer()
{
return null;
}
Expand Down Expand Up @@ -1046,7 +1058,9 @@ public Expression deserialize(DataInputPlus in, int version, TableMetadata metad
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
Operator operator = Operator.readFrom(in);
ColumnMetadata column = metadata.getColumn(name);
Index.Analyzer analyzer = IndexRegistry.obtain(metadata).getAnalyzerFor(column, operator).orElse(null);
IndexRegistry indexRegistry = IndexRegistry.obtain(metadata);
Index.Analyzer indexAnalyzer = indexRegistry.getIndexAnalyzerFor(column, operator).orElse(null);
Index.Analyzer queryAnalyzer = indexRegistry.getQueryAnalyzerFor(column, operator).orElse(null);

// Compact storage tables, when used with thrift, used to allow falling through this withouot throwing an
// exception. However, since thrift was removed in 4.0, this behaviour was not restored in CASSANDRA-16217
Expand All @@ -1058,11 +1072,11 @@ public Expression deserialize(DataInputPlus in, int version, TableMetadata metad
case SIMPLE:
ByteBuffer value = ByteBufferUtil.readWithShortLength(in);
ANNOptions annOptions = operator == Operator.ANN ? ANNOptions.serializer.deserialize(in, version) : null;
return new SimpleExpression(column, operator, value, analyzer, annOptions);
return new SimpleExpression(column, operator, value, indexAnalyzer, queryAnalyzer, annOptions);
case MAP_COMPARISON:
ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
ByteBuffer val = ByteBufferUtil.readWithShortLength(in);
return new MapComparisonExpression(column, key, operator, val, analyzer);
return new MapComparisonExpression(column, key, operator, val, indexAnalyzer, queryAnalyzer);
case VECTOR_RADIUS:
Operator boundaryOperator = Operator.readFrom(in);
ByteBuffer distance = ByteBufferUtil.readWithShortLength(in);
Expand Down Expand Up @@ -1119,26 +1133,40 @@ public long serializedSize(Expression expression, int version)
public abstract static class AnalyzableExpression extends Expression
{
@Nullable
protected final Index.Analyzer analyzer;
protected final Index.Analyzer indexAnalyzer;

@Nullable
protected final Index.Analyzer queryAnalyzer;

public AnalyzableExpression(ColumnMetadata column, Operator operator, ByteBuffer value, @Nullable Index.Analyzer analyzer)
public AnalyzableExpression(ColumnMetadata column,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
super(column, operator, value);
this.analyzer = analyzer;
this.indexAnalyzer = indexAnalyzer;
this.queryAnalyzer = queryAnalyzer;
}

@Nullable
public final Index.Analyzer indexAnalyzer()
{
return indexAnalyzer;
}

@Nullable
public final Index.Analyzer analyzer()
public final Index.Analyzer queryAnalyzer()
{
return analyzer;
return queryAnalyzer;
}

@Override
public int numFilteredValues()
{
return analyzer == null
return indexAnalyzer == null
? super.numFilteredValues()
: analyzer().analyze(value).size();
: indexAnalyzer().analyze(value).size();
}
}

Expand All @@ -1153,10 +1181,11 @@ public static class SimpleExpression extends AnalyzableExpression
public SimpleExpression(ColumnMetadata column,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer analyzer,
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer,
@Nullable ANNOptions annOptions)
{
super(column, operator, value, analyzer);
super(column, operator, value, indexAnalyzer, queryAnalyzer);
this.annOptions = annOptions;
}

Expand Down Expand Up @@ -1194,13 +1223,13 @@ public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey,
return false;

ByteBuffer counterValue = LongType.instance.decompose(CounterContext.instance().total(foundValue, ByteBufferAccessor.instance));
return operator.isSatisfiedBy(LongType.instance, counterValue, value, analyzer);
return operator.isSatisfiedBy(LongType.instance, counterValue, value, indexAnalyzer, queryAnalyzer);
}
else
{
// Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
ByteBuffer foundValue = getValue(metadata, partitionKey, row);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, analyzer);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, indexAnalyzer, queryAnalyzer);
}
}
case NEQ:
Expand All @@ -1214,7 +1243,7 @@ public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey partitionKey,
assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
ByteBuffer foundValue = getValue(metadata, partitionKey, row);
// Note that CQL expression are always of the form 'x < 4', i.e. the tested value is on the left.
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, analyzer);
return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value, indexAnalyzer, queryAnalyzer);
}
case CONTAINS:
return contains(metadata, partitionKey, row);
Expand Down Expand Up @@ -1345,9 +1374,10 @@ public MapComparisonExpression(ColumnMetadata column,
ByteBuffer key,
Operator operator,
ByteBuffer value,
@Nullable Index.Analyzer analyzer)
@Nullable Index.Analyzer indexAnalyzer,
@Nullable Index.Analyzer queryAnalyzer)
{
super(column, operator, value, analyzer);
super(column, operator, value, indexAnalyzer, queryAnalyzer);
assert column.type instanceof MapType && (operator == Operator.EQ || operator == Operator.NEQ || operator.isSlice());
this.key = key;
}
Expand Down
19 changes: 15 additions & 4 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,23 @@ default RowFilter.CustomExpression customExpressionFor(TableMetadata metadata, B
}

/**
* Returns the {@link Analyzer} for this index, if any. If the index doesn't transform the column values, this
* method will return an empty optional.
* Returns the write-time {@link Analyzer} for this index, if any. If the index doesn't transform the column values,
* this method will return an empty optional.
*
* @return the transforming column value analyzer for the index, if any
* @return the write-time transforming column value analyzer for the index, if any
*/
default Optional<Analyzer> getAnalyzer()
default Optional<Analyzer> getIndexAnalyzer()
{
return Optional.empty();
}

/**
* Returns the query-time {@link Analyzer} for this index, if any. If the index doesn't transform the column values,
* this method will return an empty optional.
*
* @return the query-time transforming column value analyzer for the index, if any
*/
default Optional<Analyzer> getQueryAnalyzer()
{
return Optional.empty();
}
Expand Down
17 changes: 15 additions & 2 deletions src/java/org/apache/cassandra/index/IndexRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -314,13 +315,25 @@ default void registerIndex(Index index)
Index getIndex(IndexMetadata indexMetadata);
Collection<Index> listIndexes();

default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column, Operator operator)
default Optional<Index.Analyzer> getIndexAnalyzerFor(ColumnMetadata column, Operator operator)
{
return getAnalyzerFor(column, operator, Index::getIndexAnalyzer);
}

default Optional<Index.Analyzer> getQueryAnalyzerFor(ColumnMetadata column, Operator operator)
{
return getAnalyzerFor(column, operator, Index::getQueryAnalyzer);
}

default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column,
Operator operator,
Function<Index, Optional<Index.Analyzer>> analyzerGetter)
{
for (Index index : listIndexes())
{
if (index.supportsExpression(column, operator))
{
Optional<Index.Analyzer> analyzer = index.getAnalyzer();
Optional<Index.Analyzer> analyzer = analyzerGetter.apply(index);
if (analyzer.isPresent())
return analyzer;
}
Expand Down
46 changes: 28 additions & 18 deletions src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,26 +660,36 @@ public AbstractType<?> customExpressionValueType()
}

@Override
public Optional<Analyzer> getAnalyzer()
public Optional<Analyzer> getIndexAnalyzer()
{
if (!indexContext.isAnalyzed())
return Optional.empty();
return indexContext.isAnalyzed()
? Optional.of(value -> analyze(indexContext.getAnalyzerFactory(), value))
: Optional.empty();
}

return Optional.of(value -> {
List<ByteBuffer> tokens = new ArrayList<>();
AbstractAnalyzer analyzer = indexContext.getQueryAnalyzerFactory().create();
try
{
analyzer.reset(value);
while (analyzer.hasNext())
tokens.add(analyzer.next());
}
finally
{
analyzer.end();
}
return tokens;
});
@Override
public Optional<Analyzer> getQueryAnalyzer()
{
return indexContext.isAnalyzed()
? Optional.of(value -> analyze(indexContext.getQueryAnalyzerFactory(), value))
: Optional.empty();
}

private static List<ByteBuffer> analyze(AbstractAnalyzer.AnalyzerFactory factory, ByteBuffer value)
{
List<ByteBuffer> tokens = new ArrayList<>();
AbstractAnalyzer analyzer = factory.create();
try
{
analyzer.reset(value.duplicate());
while (analyzer.hasNext())
tokens.add(analyzer.next());
}
finally
{
analyzer.end();
}
return tokens;
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/index/sai/plan/Operation.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ static Node buildExpression(QueryController controller, RowFilter.Expression exp
ByteBufferAccessor.instance,
offset,
ProtocolVersion.V3),
expression.analyzer(),
expression.indexAnalyzer(),
expression.queryAnalyzer(),
expression.annOptions())));
offset += TypeSizes.INT_SIZE + ByteBufferAccessor.instance.getInt(expression.getIndexValue(), offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
Expand Down Expand Up @@ -180,4 +181,38 @@ void assertCannotStartDueToConfigurationException(Cluster cluster)
Assert.assertEquals(ConfigurationException.class.getName(), tr.getClass().getName());
}
}

/**
* Runs the given function before and after a flush of sstables. This is useful for checking that behavior is
* the same whether data is in memtables or sstables.
*
* @param cluster the tested cluster
* @param keyspace the keyspace to flush
* @param runnable the test to run
*/
public static void beforeAndAfterFlush(Cluster cluster, String keyspace, CQLTester.CheckedFunction runnable) throws Throwable
{
try
{
runnable.apply();
}
catch (Throwable t)
{
throw new AssertionError("Test failed before flush:\n" + t, t);
}

for (int i = 1; i <= cluster.size(); i++)
{
cluster.get(i).flush(keyspace);

try
{
runnable.apply();
}
catch (Throwable t)
{
throw new AssertionError("Test failed after flushing node " + i + ":\n" + t, t);
}
}
}
}
Loading