diff --git a/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/explain.txt b/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/explain.txt
new file mode 100644
index 000000000..b17bdec96
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/explain.txt
@@ -0,0 +1,10 @@
+== Physical Plan ==
+CollectLimit 100
++- *(3) Project [d_year#1, ss_customer_sk#2]
+   +- *(3) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
+      :- *(1) Project [d_date_sk#3, d_year#1]
+      :  +- *(1) Filter isnotnull(d_date_sk#3)
+      :     +- *(1) FileScan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>, SelectedBucketsCount: 200 out of 200
+      +- *(2) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+         +- *(2) Filter isnotnull(ss_sold_date_sk#4)
+            +- *(2) FileScan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>, SelectedBucketsCount: 200 out of 200
\ No newline at end of file
diff --git a/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/simplified.txt b/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/simplified.txt
new file mode 100644
index 000000000..317695b4c
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/approved-plans-v1_4/q0/simplified.txt
@@ -0,0 +1,14 @@
+CollectLimit
+  WholeStageCodegen
+    Project [d_year,ss_customer_sk]
+      SortMergeJoin [d_date_sk,ss_sold_date_sk]
+        InputAdapter
+          WholeStageCodegen
+            Project [d_date_sk,d_year]
+              Filter [d_date_sk]
+                Scan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
+        InputAdapter
+          WholeStageCodegen
+            Project [ss_customer_sk,ss_sold_date_sk]
+              Filter [ss_sold_date_sk]
+                Scan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
diff --git a/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/explain.txt b/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/explain.txt
new file mode 100644
index 000000000..a9adddcd4
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/explain.txt
@@ -0,0 +1,28 @@
+1,14c1,10
+< == Physical Plan ==
+< CollectLimit 100
+< +- *(5) Project [d_year#1, ss_customer_sk#2]
+<    +- *(5) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
+<       :- *(2) Sort [d_date_sk#3 ASC NULLS FIRST], false, 0
+<       :  +- Exchange hashpartitioning(d_date_sk#3, 200)
+<       :     +- *(1) Project [d_date_sk#3, d_year#1]
+<       :        +- *(1) Filter isnotnull(d_date_sk#3)
+<       :           +- *(1) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+<       +- *(4) Sort [ss_sold_date_sk#4 ASC NULLS FIRST], false, 0
+<          +- Exchange hashpartitioning(ss_sold_date_sk#4, 200)
+<             +- *(3) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+<                +- *(3) Filter isnotnull(ss_sold_date_sk#4)
+<                   +- *(3) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
+\ No newline at end of file
+---
+> == Physical Plan ==
+> CollectLimit 100
+> +- *(3) Project [d_year#1, ss_customer_sk#2]
+>    +- *(3) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
+>       :- *(1) Project [d_date_sk#3, d_year#1]
+>       :  +- *(1) Filter isnotnull(d_date_sk#3)
+>       :     +- *(1) FileScan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>, SelectedBucketsCount: 200 out of 200
+>       +- *(2) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+>          +- *(2) Filter isnotnull(ss_sold_date_sk#4)
+>             +- *(2) FileScan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>, SelectedBucketsCount: 200 out of 200
+\ No newline at end of file
diff --git a/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/simplified.txt b/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/simplified.txt
new file mode 100644
index 000000000..edb77f0d1
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/simplified.txt
@@ -0,0 +1,38 @@
+1,22c1,14
+< CollectLimit
+<   WholeStageCodegen
+<     Project [d_year,ss_customer_sk]
+<       SortMergeJoin [d_date_sk,ss_sold_date_sk]
+<         InputAdapter
+<           WholeStageCodegen
+<             Sort [d_date_sk]
+<               InputAdapter
+<                 Exchange [d_date_sk] #1
+<                   WholeStageCodegen
+<                     Project [d_date_sk,d_year]
+<                       Filter [d_date_sk]
+<                         Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
+<         InputAdapter
+<           WholeStageCodegen
+<             Sort [ss_sold_date_sk]
+<               InputAdapter
+<                 Exchange [ss_sold_date_sk] #2
+<                   WholeStageCodegen
+<                     Project [ss_customer_sk,ss_sold_date_sk]
+<                       Filter [ss_sold_date_sk]
+<                         Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
+---
+> CollectLimit
+>   WholeStageCodegen
+>     Project [d_year,ss_customer_sk]
+>       SortMergeJoin [d_date_sk,ss_sold_date_sk]
+>         InputAdapter
+>           WholeStageCodegen
+>             Project [d_date_sk,d_year]
+>               Filter [d_date_sk]
+>                 Scan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
+>         InputAdapter
+>           WholeStageCodegen
+>             Project [ss_customer_sk,ss_sold_date_sk]
+>               Filter [ss_sold_date_sk]
+>                 Scan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
diff --git a/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/explain.txt b/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/explain.txt
new file mode 100644
index 000000000..eb4415dd9
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/explain.txt
@@ -0,0 +1,14 @@
+== Physical Plan ==
+CollectLimit 100
++- *(5) Project [d_year#1, ss_customer_sk#2]
+   +- *(5) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
+      :- *(2) Sort [d_date_sk#3 ASC NULLS FIRST], false, 0
+      :  +- Exchange hashpartitioning(d_date_sk#3, 200)
+      :     +- *(1) Project [d_date_sk#3, d_year#1]
+      :        +- *(1) Filter isnotnull(d_date_sk#3)
+      :           +- *(1) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+      +- *(4) Sort [ss_sold_date_sk#4 ASC NULLS FIRST], false, 0
+         +- Exchange hashpartitioning(ss_sold_date_sk#4, 200)
+            +- *(3) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+               +- *(3) Filter isnotnull(ss_sold_date_sk#4)
+                  +- *(3) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
\ No newline at end of file
diff --git a/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/simplified.txt b/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/simplified.txt
new file mode 100644
index 000000000..b5cd777ca
--- /dev/null
+++ b/src/test/resources/tpcds/hyperspace/withoutHyperspace/q0/simplified.txt
@@ -0,0 +1,22 @@
+CollectLimit
+  WholeStageCodegen
+    Project [d_year,ss_customer_sk]
+      SortMergeJoin [d_date_sk,ss_sold_date_sk]
+        InputAdapter
+          WholeStageCodegen
+            Sort [d_date_sk]
+              InputAdapter
+                Exchange [d_date_sk] #1
+                  WholeStageCodegen
+                    Project [d_date_sk,d_year]
+                      Filter [d_date_sk]
+                        Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
+        InputAdapter
+          WholeStageCodegen
+            Sort [ss_sold_date_sk]
+              InputAdapter
+                Exchange [ss_sold_date_sk] #2
+                  WholeStageCodegen
+                    Project [ss_customer_sk,ss_sold_date_sk]
+                      Filter [ss_sold_date_sk]
+                        Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
diff --git a/src/test/resources/tpcds/queries/q0.sql b/src/test/resources/tpcds/queries/q0.sql
new file mode 100644
index 000000000..791f50ea6
--- /dev/null
+++ b/src/test/resources/tpcds/queries/q0.sql
@@ -0,0 +1,6 @@
+SELECT
+  date_dim.d_year,
+  store_sales.ss_customer_sk
+FROM date_dim, store_sales
+WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk
+LIMIT 100
diff --git a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/explain.txt b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/explain.txt
new file mode 100644
index 000000000..6798acb39
--- /dev/null
+++ b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/explain.txt
@@ -0,0 +1,11 @@
+== Physical Plan ==
+CollectLimit 100
++- *(2) Project [d_year#1, ss_customer_sk#2]
+   +- *(2) BroadcastHashJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner, BuildRight
+      :- *(2) Project [d_date_sk#3, d_year#1]
+      :  +- *(2) Filter isnotnull(d_date_sk#3)
+      :     +- *(2) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+         +- *(1) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+            +- *(1) Filter isnotnull(ss_sold_date_sk#4)
+               +- *(1) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
\ No newline at end of file
diff --git a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/simplified.txt b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/simplified.txt
new file mode 100644
index 000000000..7055dbafb
--- /dev/null
+++ b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q0/simplified.txt
@@ -0,0 +1,13 @@
+CollectLimit
+  WholeStageCodegen
+    Project [d_year,ss_customer_sk]
+      BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
+        Project [d_date_sk,d_year]
+          Filter [d_date_sk]
+            Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
+        InputAdapter
+          BroadcastExchange #1
+            WholeStageCodegen
+              Project [ss_customer_sk,ss_sold_date_sk]
+                Filter [ss_sold_date_sk]
+                  Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/IndexLogEntryCreator.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/IndexLogEntryCreator.scala
new file mode 100644
index 000000000..a33d929c7
--- /dev/null
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/IndexLogEntryCreator.scala
@@ -0,0 +1,97 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.goldstandard
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+import org.apache.spark.sql.types.StructType
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index._
+import com.microsoft.hyperspace.util.PathUtils
+
+object IndexLogEntryCreator {
+  def createIndex(index: IndexDefinition, spark: SparkSession): Unit = {
+    val indexPath =
+      PathUtils.makeAbsolute(s"${spark.conf.get(IndexConstants.INDEX_SYSTEM_PATH)}/${index.name}")
+    val entry = getIndexLogEntry(index, indexPath, spark)
+    assert(new IndexLogManagerImpl(indexPath).writeLog(0, entry))
+  }
+
+  private def toRelation(sourceDf: DataFrame): Relation = {
+    val leafPlans = sourceDf.queryExecution.optimizedPlan.collectLeaves()
+    assert(leafPlans.size == 1)
+    leafPlans.head match {
+      case LogicalRelation(
+          HadoopFsRelation(location: PartitioningAwareFileIndex, _, dataSchema, _, _, _),
+          _,
+          _,
+          _) =>
+        val sourceDataProperties =
+          Hdfs.Properties(Content.fromDirectory(location.rootPaths.head, new FileIdTracker))
+
+        Relation(
+          location.rootPaths.map(_.toString),
+          Hdfs(sourceDataProperties),
+          dataSchema.json,
+          "parquet",
+          Map.empty)
+      case _ => throw HyperspaceException("Unexpected relation found.")
+    }
+  }
+
+  private def getIndexLogEntry(
+      index: IndexDefinition,
+      indexPath: Path,
+      spark: SparkSession): IndexLogEntry = {
+    val indexRootPath = new Path(indexPath, "v__=0")
+    val sourceDf = spark.table(index.tableName)
+    val indexSchema = {
+      val allCols = index.indexedCols ++ index.includedCols
+      StructType(sourceDf.schema.filter(f => allCols.contains(f.name)))
+    }
+    val relation = toRelation(sourceDf)
+
+    val sourcePlanProperties = SparkPlan.Properties(
+      Seq(relation),
+      null,
+      null,
+      LogicalPlanFingerprint(
+        LogicalPlanFingerprint.Properties(
+          Seq(
+            Signature(
+              "com.microsoft.hyperspace.goldstandard.MockSignatureProvider",
+              index.tableName)))))
+
+    val entry = IndexLogEntry(
+      index.name,
+      CoveringIndex(
+        CoveringIndex.Properties(
+          CoveringIndex.Properties
+            .Columns(index.indexedCols, index.includedCols),
+          IndexLogEntry.schemaString(indexSchema),
+          200,
+          Map(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true"))),
+      Content(Directory.fromDirectory(indexRootPath, new FileIdTracker)),
+      Source(SparkPlan(sourcePlanProperties)),
+      Map())
+    entry.state = Constants.States.ACTIVE
+    entry
+  }
+}
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/MockSignatureProvider.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/MockSignatureProvider.scala
new file mode 100644
index 000000000..a18067e12
--- /dev/null
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/MockSignatureProvider.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.goldstandard
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
+
+import com.microsoft.hyperspace.HyperspaceException
+import com.microsoft.hyperspace.index.LogicalPlanSignatureProvider
+
+/**
+ * MockSignatureProvider is used in the TPCDS_Hyperspace plan stability suite. It returns the
+ * name of the source TPCDS table on which the index is created.
+ */
+class MockSignatureProvider extends LogicalPlanSignatureProvider {
+  override def signature(logicalPlan: LogicalPlan): Option[String] = {
+    val leafPlans = logicalPlan.collectLeaves()
+    if (leafPlans.size != 1) return None
+
+    leafPlans.head match {
+      case LogicalRelation(
+          HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
+          _,
+          _,
+          _) =>
+        Some(location.rootPaths.head.getName)
+      case _ => throw HyperspaceException("Unexpected logical plan found.")
+    }
+  }
+}
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala
index 8136bf467..1d954f4a1 100644
--- a/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala
@@ -5,7 +5,6 @@
  *
  * The below license was copied from: https://github.com/FasterXML/jackson-module-scala/blob/2.10/src/main/resources/META-INF/LICENSE
  */
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -100,7 +99,7 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
   private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
     val file = new File(dir, "simplified.txt")
     val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
-    expected == actualSimplifiedPlan
+    expected.replaceAll("[\\r\\n]+", "") == actualSimplifiedPlan.replaceAll("[\\r\\n]+", "")
   }
 
   /**
@@ -272,10 +271,10 @@ class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite {
   override val goldenFilePath: String =
     new File(baseResourcePath, "spark-2.4/approved-plans-v1_4").getAbsolutePath
 
-  // Enable cross join because some queries fail during query optimization phase.
-  withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
-    tpcdsQueries.foreach { q =>
-      test(s"check simplified (tpcds-v1.4/$q)") {
+  tpcdsQueries.foreach { q =>
+    test(s"check simplified (tpcds-v1.4/$q)") {
+      // Enable cross join because some queries fail during query optimization phase.
+      withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
         testQuery("tpcds/queries", q)
       }
     }
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala
index 8b5139191..601a30c83 100644
--- a/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala
@@ -38,7 +38,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
 
   // The TPCDS queries below are based on v1.4.
   // TODO: Fix the build pipeline for q49 and re-enable q49.
-  val tpcdsQueries = Seq("q1")
+  val tpcdsQueries = Seq("q0")
 
   private val tableColumns = Map(
     "store_sales" ->
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDS_Hyperspace.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDS_Hyperspace.scala
new file mode 100644
index 000000000..3b6938db4
--- /dev/null
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDS_Hyperspace.scala
@@ -0,0 +1,86 @@
+/*
+ * Copyright (2020) The Hyperspace Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.goldstandard
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+
+import com.microsoft.hyperspace._
+import com.microsoft.hyperspace.goldstandard.IndexLogEntryCreator.createIndex
+import com.microsoft.hyperspace.index.IndexConstants.INDEX_SYSTEM_PATH
+import com.microsoft.hyperspace.util.FileUtils
+
+class TPCDS_Hyperspace extends PlanStabilitySuite {
+
+  override val goldenFilePath: String =
+    new File(baseResourcePath, "hyperspace/approved-plans-v1_4").getAbsolutePath
+
+  val indexSystemPath = new File(baseResourcePath, "hyperspace/indexes").toString
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(INDEX_SYSTEM_PATH, indexSystemPath)
+    spark.enableHyperspace()
+
+    val indexDefinitions = Seq(
+      "dtindex;date_dim;d_date_sk;d_year",
+      "ssIndex;store_sales;ss_sold_date_sk;ss_customer_sk")
+    indexDefinitions.foreach(i => createIndex(IndexDefinition.fromString(i), spark))
+  }
+
+  override def afterAll(): Unit = {
+    FileUtils.delete(new Path(indexSystemPath))
+    super.afterAll()
+  }
+
+  tpcdsQueries.foreach { q =>
+    test(s"check simplified (tpcds-v1.4/$q)") {
+      // Enable cross join because some queries fail during query optimization phase.
+      withSQLConf(
+        ("spark.sql.crossJoin.enabled" -> "true"),
+        ("spark.sql.autoBroadcastJoinThreshold" -> "-1")) {
+        testQuery("tpcds/queries", q)
+      }
+    }
+  }
+}
+
+case class IndexDefinition(
+    name: String,
+    indexedCols: Seq[String],
+    includedCols: Seq[String],
+    tableName: String)
+
+object IndexDefinition {
+
+  /**
+   * Index definitions from conf files should be provided in the following format:
+   * "index-name;table-name;comma-separated-indexed-cols;comma-separated-included-cols"
+   *
+   * @param definition Index definition in the string format described above.
+   * @return The parsed IndexDefinition.
+   */
+  def fromString(definition: String): IndexDefinition = {
+    val splits = definition.split(";")
+    val indexName = splits(0)
+    val tableName = splits(1)
+    val indexCols = splits(2).split(",").toSeq
+    val includedCols = splits(3).split(",").toSeq
+    IndexDefinition(indexName, indexCols, includedCols, tableName)
+  }
+}
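For reference, a minimal sketch of how the pieces in this patch fit together: a semicolon-delimited definition string is parsed by IndexDefinition.fromString and materialized as a mock index log entry by IndexLogEntryCreator.createIndex, mirroring what TPCDS_Hyperspace.beforeAll does above. The local SparkSession, the object wrapper, and the /tmp index-system path below are illustrative assumptions rather than part of the patch, and createIndex requires the named source table to already be resolvable via spark.table.

// Sketch only: assumes the TPC-DS tables (e.g. date_dim) are already registered
// in the session catalog, and uses a hypothetical /tmp index-system path.
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.goldstandard.{IndexDefinition, IndexLogEntryCreator}
import com.microsoft.hyperspace.index.IndexConstants.INDEX_SYSTEM_PATH

object IndexDefinitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    spark.conf.set(INDEX_SYSTEM_PATH, "/tmp/hyperspace/indexes") // hypothetical location
    spark.enableHyperspace()

    // "index-name;table-name;comma-separated-indexed-cols;comma-separated-included-cols"
    val definition = IndexDefinition.fromString("dtindex;date_dim;d_date_sk;d_year")
    IndexLogEntryCreator.createIndex(definition, spark)
  }
}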