This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

[Gold Standard] Initial Code showing Hyperspace Indexes with a sample query #385

Open
Wants to merge 36 commits into base: master
Commits (36)
208ea5e
gold standard initial commit
apoorvedave1 Feb 19, 2021
cc14991
fix q32
apoorvedave1 Feb 19, 2021
7c8ee78
Merge branch 'master' of github.com:apoorvedave1/hyperspace-1 into gs
apoorvedave1 Feb 19, 2021
fa9bd4a
Merge branch 'master' of github.com:apoorvedave1/hyperspace-1 into gs
apoorvedave1 Mar 10, 2021
4b007a5
keep only tpcds v1.4 and remove others
apoorvedave1 Mar 10, 2021
59207f6
Merge remote-tracking branch 'upstream/master' into gs_initial
apoorvedave1 Mar 10, 2021
5c0cee9
Trigger Build
apoorvedave1 Mar 10, 2021
900539d
build error: test with sequential run
apoorvedave1 Mar 10, 2021
530dfa7
revert previous commit
apoorvedave1 Mar 10, 2021
add01f1
update plans
apoorvedave1 Mar 10, 2021
8b58b6b
update plan
apoorvedave1 Mar 10, 2021
97e8441
added sorting for fixing build pipeline
apoorvedave1 Mar 11, 2021
411450f
updated plans with sorting
apoorvedave1 Mar 11, 2021
b70f00b
fix q49
apoorvedave1 Mar 11, 2021
7880f40
updated instructions on how to run tests
apoorvedave1 Mar 11, 2021
3dcd2d5
test with updated plans for q47, q49
apoorvedave1 Mar 12, 2021
a7a1149
update q47, 49 plans
apoorvedave1 Mar 12, 2021
dd295a9
remove rogue query
apoorvedave1 Mar 12, 2021
f47fbd3
remove q49
apoorvedave1 Mar 12, 2021
d15a4e8
fix scalastyle
apoorvedave1 Mar 12, 2021
4c511fb
add query files for q49.sql
apoorvedave1 Mar 12, 2021
e48c713
restructuring
apoorvedave1 Mar 12, 2021
6c5a2f7
cleanup before rebase
apoorvedave1 Mar 13, 2021
25cb361
Merge remote-tracking branch 'upstream/master' into gs_initial
apoorvedave1 Mar 13, 2021
0b40853
updated plans based on the plan stability suite
apoorvedave1 Mar 13, 2021
32f5899
normalize location: fix
apoorvedave1 Mar 15, 2021
352f2f5
Merge branch 'master' of github.com:apoorvedave1/hyperspace-1 into gs…
apoorvedave1 Mar 16, 2021
d5fbec8
query plans for all queries
apoorvedave1 Mar 16, 2021
3b51533
sample query q0 with sample indexes
apoorvedave1 Mar 18, 2021
fada1d6
revert changes from a dependency PR and depend directly on master
apoorvedave1 Mar 18, 2021
1fb6f4b
cleanup
apoorvedave1 Mar 18, 2021
e4ad365
code cleanup
apoorvedave1 Mar 19, 2021
cb211fb
diffs added wrt spark
apoorvedave1 Mar 22, 2021
659d65c
review comments and refactor
apoorvedave1 Mar 22, 2021
40864ae
remove CR and LF chars when comparing plans
apoorvedave1 Mar 22, 2021
ad399a3
review comments
apoorvedave1 Mar 23, 2021
@@ -0,0 +1,10 @@
== Physical Plan ==
CollectLimit 100
+- *(3) Project [d_year#1, ss_customer_sk#2]
+- *(3) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
:- *(1) Project [d_date_sk#3, d_year#1]
: +- *(1) Filter isnotnull(d_date_sk#3)
: +- *(1) FileScan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>, SelectedBucketsCount: 200 out of 200
+- *(2) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+- *(2) Filter isnotnull(ss_sold_date_sk#4)
+- *(2) FileScan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>, SelectedBucketsCount: 200 out of 200
@@ -0,0 +1,14 @@
CollectLimit
WholeStageCodegen
Project [d_year,ss_customer_sk]
SortMergeJoin [d_date_sk,ss_sold_date_sk]
InputAdapter
WholeStageCodegen
Project [d_date_sk,d_year]
Filter [d_date_sk]
Scan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
InputAdapter
WholeStageCodegen
Project [ss_customer_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Scan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
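The plans above scan two covering indexes, dtindex and ssIndex. The plan-stability suite in this PR fabricates the index metadata directly (see IndexLogEntryCreator further down), but for orientation, a rough sketch of how equivalent indexes would be declared through the public Hyperspace API follows; the setup below is illustrative and not part of this change.

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

val spark = SparkSession.builder().appName("hyperspace-q0").getOrCreate()
val hs = new Hyperspace(spark)

// Covering indexes matching the two "FileScan Hyperspace(...)" nodes above:
// dtindex: indexed on date_dim.d_date_sk, including d_year
// ssIndex: indexed on store_sales.ss_sold_date_sk, including ss_customer_sk
hs.createIndex(spark.table("date_dim"), IndexConfig("dtindex", Seq("d_date_sk"), Seq("d_year")))
hs.createIndex(spark.table("store_sales"), IndexConfig("ssIndex", Seq("ss_sold_date_sk"), Seq("ss_customer_sk")))

// With both join inputs bucketed on the join key (200 buckets each side),
// the optimizer can drop the Exchange/Sort stages that the plain-parquet plan needs.
spark.enableHyperspace()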
28 changes: 28 additions & 0 deletions src/test/resources/tpcds/hyperspace/diffsWithSpark/q0/explain.txt
@@ -0,0 +1,28 @@
1,14c1,10
< == Physical Plan ==
< CollectLimit 100
< +- *(5) Project [d_year#1, ss_customer_sk#2]
< +- *(5) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
< :- *(2) Sort [d_date_sk#3 ASC NULLS FIRST], false, 0
< : +- Exchange hashpartitioning(d_date_sk#3, 200)
< : +- *(1) Project [d_date_sk#3, d_year#1]
< : +- *(1) Filter isnotnull(d_date_sk#3)
< : +- *(1) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
< +- *(4) Sort [ss_sold_date_sk#4 ASC NULLS FIRST], false, 0
< +- Exchange hashpartitioning(ss_sold_date_sk#4, 200)
< +- *(3) Project [ss_sold_date_sk#4, ss_customer_sk#2]
< +- *(3) Filter isnotnull(ss_sold_date_sk#4)
< +- *(3) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
\ No newline at end of file
---
> == Physical Plan ==
> CollectLimit 100
> +- *(3) Project [d_year#1, ss_customer_sk#2]
> +- *(3) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
> :- *(1) Project [d_date_sk#3, d_year#1]
> : +- *(1) Filter isnotnull(d_date_sk#3)
> : +- *(1) FileScan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>, SelectedBucketsCount: 200 out of 200
> +- *(2) Project [ss_sold_date_sk#4, ss_customer_sk#2]
> +- *(2) Filter isnotnull(ss_sold_date_sk#4)
> +- *(2) FileScan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>, SelectedBucketsCount: 200 out of 200
\ No newline at end of file
@@ -0,0 +1,38 @@
1,22c1,14
< CollectLimit
< WholeStageCodegen
< Project [d_year,ss_customer_sk]
< SortMergeJoin [d_date_sk,ss_sold_date_sk]
< InputAdapter
< WholeStageCodegen
< Sort [d_date_sk]
< InputAdapter
< Exchange [d_date_sk] #1
< WholeStageCodegen
< Project [d_date_sk,d_year]
< Filter [d_date_sk]
< Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
< InputAdapter
< WholeStageCodegen
< Sort [ss_sold_date_sk]
< InputAdapter
< Exchange [ss_sold_date_sk] #2
< WholeStageCodegen
< Project [ss_customer_sk,ss_sold_date_sk]
< Filter [ss_sold_date_sk]
< Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
---
> CollectLimit
> WholeStageCodegen
> Project [d_year,ss_customer_sk]
> SortMergeJoin [d_date_sk,ss_sold_date_sk]
> InputAdapter
> WholeStageCodegen
> Project [d_date_sk,d_year]
> Filter [d_date_sk]
> Scan Hyperspace(Type: CI, Name: dtindex, LogVersion: 0) default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
> InputAdapter
> WholeStageCodegen
> Project [ss_customer_sk,ss_sold_date_sk]
> Filter [ss_sold_date_sk]
> Scan Hyperspace(Type: CI, Name: ssIndex, LogVersion: 0) default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
@@ -0,0 +1,14 @@
== Physical Plan ==
CollectLimit 100
+- *(5) Project [d_year#1, ss_customer_sk#2]
+- *(5) SortMergeJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner
:- *(2) Sort [d_date_sk#3 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(d_date_sk#3, 200)
: +- *(1) Project [d_date_sk#3, d_year#1]
: +- *(1) Filter isnotnull(d_date_sk#3)
: +- *(1) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+- *(4) Sort [ss_sold_date_sk#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ss_sold_date_sk#4, 200)
+- *(3) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+- *(3) Filter isnotnull(ss_sold_date_sk#4)
+- *(3) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
@@ -0,0 +1,22 @@
CollectLimit
WholeStageCodegen
Project [d_year,ss_customer_sk]
SortMergeJoin [d_date_sk,ss_sold_date_sk]
InputAdapter
WholeStageCodegen
Sort [d_date_sk]
InputAdapter
Exchange [d_date_sk] #1
WholeStageCodegen
Project [d_date_sk,d_year]
Filter [d_date_sk]
Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
InputAdapter
WholeStageCodegen
Sort [ss_sold_date_sk]
InputAdapter
Exchange [ss_sold_date_sk] #2
WholeStageCodegen
Project [ss_customer_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
6 changes: 6 additions & 0 deletions src/test/resources/tpcds/queries/q0.sql
@@ -0,0 +1,6 @@
SELECT
date_dim.d_year,
store_sales.ss_customer_sk
FROM date_dim, store_sales
WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk
LIMIT 100
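As a usage sketch (not part of this diff), q0 can be run with and without Hyperspace to reproduce the two families of plans shown above; it assumes the date_dim and store_sales tables plus the dtindex/ssIndex indexes from the earlier sketch already exist, and reuses that spark session.

val q0 = """
  SELECT date_dim.d_year, store_sales.ss_customer_sk
  FROM date_dim, store_sales
  WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk
  LIMIT 100"""

spark.disableHyperspace()
spark.sql(q0).explain()   // baseline Spark plan over plain parquet scans

spark.enableHyperspace()
spark.sql(q0).explain()   // rewritten to scan the dtindex / ssIndex covering indexes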
@@ -0,0 +1,11 @@
== Physical Plan ==
CollectLimit 100
+- *(2) Project [d_year#1, ss_customer_sk#2]
+- *(2) BroadcastHashJoin [d_date_sk#3], [ss_sold_date_sk#4], Inner, BuildRight
:- *(2) Project [d_date_sk#3, d_year#1]
: +- *(2) Filter isnotnull(d_date_sk#3)
: +- *(2) FileScan parquet default.date_dim[d_date_sk#3,d_year#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:int,d_year:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(1) Project [ss_sold_date_sk#4, ss_customer_sk#2]
+- *(1) Filter isnotnull(ss_sold_date_sk#4)
+- *(1) FileScan parquet default.store_sales[ss_sold_date_sk#4,ss_customer_sk#2] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ss_sold_date_sk)], ReadSchema: struct<ss_sold_date_sk:int,ss_customer_sk:int>
@@ -0,0 +1,13 @@
CollectLimit
WholeStageCodegen
Project [d_year,ss_customer_sk]
BroadcastHashJoin [d_date_sk,ss_sold_date_sk]
Project [d_date_sk,d_year]
Filter [d_date_sk]
Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year]
InputAdapter
BroadcastExchange #1
WholeStageCodegen
Project [ss_customer_sk,ss_sold_date_sk]
Filter [ss_sold_date_sk]
Scan parquet default.store_sales [ss_customer_sk,ss_sold_date_sk] [ss_customer_sk,ss_sold_date_sk]
@@ -0,0 +1,97 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.goldstandard

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.PathUtils

object IndexLogEntryCreator {
def createIndex(index: IndexDefinition, spark: SparkSession): Unit = {
val indexPath =
PathUtils.makeAbsolute(s"${spark.conf.get(IndexConstants.INDEX_SYSTEM_PATH)}/${index.name}")
val entry = getIndexLogEntry(index, indexPath, spark)
assert(new IndexLogManagerImpl(indexPath).writeLog(0, entry))
}

private def toRelation(sourceDf: DataFrame): Relation = {
val leafPlans = sourceDf.queryExecution.optimizedPlan.collectLeaves()
assert(leafPlans.size == 1)
leafPlans.head match {
case LogicalRelation(
HadoopFsRelation(location: PartitioningAwareFileIndex, _, dataSchema, _, _, _),
_,
_,
_) =>
val sourceDataProperties =
Hdfs.Properties(Content.fromDirectory(location.rootPaths.head, new FileIdTracker))

Relation(
location.rootPaths.map(_.toString),
Hdfs(sourceDataProperties),
dataSchema.json,
"parquet",
Map.empty)
case _ => throw HyperspaceException("Unexpected relation found.")
}
}

private def getIndexLogEntry(
index: IndexDefinition,
indexPath: Path,
spark: SparkSession): IndexLogEntry = {
val indexRootPath = new Path(indexPath, "v__=0")
val sourceDf = spark.table(index.tableName)
val indexSchema = {
val allCols = index.indexedCols ++ index.includedCols
StructType(sourceDf.schema.filter(f => allCols.contains(f.name)))
}
val relation = toRelation(sourceDf)

val sourcePlanProperties = SparkPlan.Properties(
Seq(relation),
null,
null,
LogicalPlanFingerprint(
LogicalPlanFingerprint.Properties(
Seq(
Signature(
"com.microsoft.hyperspace.goldstandard.MockSignatureProvider",
index.tableName)))))

val entry = IndexLogEntry(
index.name,
CoveringIndex(
CoveringIndex.Properties(
CoveringIndex.Properties
.Columns(index.indexedCols, index.includedCols),
IndexLogEntry.schemaString(indexSchema),
200,
Map(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY -> "true"))),
Content(Directory.fromDirectory(indexRootPath, new FileIdTracker)),
Source(SparkPlan(sourcePlanProperties)),
Map())
entry.state = Constants.States.ACTIVE
entry
}
}
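A rough idea of how the suite might invoke this helper. IndexDefinition is defined elsewhere in the PR, so the constructor shape below is an assumption inferred from the field accesses above (name, tableName, indexedCols, includedCols), and the index system path is a hypothetical location.

// Hypothetical invocation; field names inferred from createIndex/getIndexLogEntry above.
val dtIndex = IndexDefinition(
  name = "dtindex",
  tableName = "date_dim",
  indexedCols = Seq("d_date_sk"),
  includedCols = Seq("d_year"))

// The index system path must be set before writing the log entry.
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, "target/hyperspace/indexes") // assumed path
IndexLogEntryCreator.createIndex(dtIndex, spark) // writes log version 0 with state ACTIVE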
@@ -0,0 +1,44 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.goldstandard

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.LogicalPlanSignatureProvider

/**
* MockSignatureProvider is used in TPCDS_Hyperspace plan stability suite. It returns the
* table name of the source TPCDS table on which the index is created.
*/
class MockSignatureProvider extends LogicalPlanSignatureProvider {
override def signature(logicalPlan: LogicalPlan): Option[String] = {
val leafPlans = logicalPlan.collectLeaves()
if (leafPlans.size != 1) return None

leafPlans.head match {
case LogicalRelation(
HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _),
_,
_,
_) =>
Some(location.rootPaths.head.getName)
case _ => throw HyperspaceException("Unexpected logical plan found.")
}
}
}
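For illustration only, assuming each TPC-DS table is backed by a single directory named after the table: the mock signature reduces to that directory name, so an index and a query plan match whenever they read the same table, regardless of file-level changes.

val provider = new MockSignatureProvider()
val plan = spark.table("date_dim").queryExecution.optimizedPlan
// rootPaths.head.getName is the table's directory name, e.g. ".../warehouse/date_dim" -> "date_dim"
assert(provider.signature(plan).contains("date_dim"))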
@@ -5,7 +5,6 @@
*
* The below license was copied from: https://github.com/FasterXML/jackson-module-scala/blob/2.10/src/main/resources/META-INF/LICENSE
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -100,7 +99,7 @@ trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
val file = new File(dir, "simplified.txt")
val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
- expected == actualSimplifiedPlan
+ expected.replaceAll("[\\r\\n]+", "") == actualSimplifiedPlan.replaceAll("[\\r\\n]+", "")
}

/**
@@ -272,10 +271,10 @@ class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite {
override val goldenFilePath: String =
new File(baseResourcePath, "spark-2.4/approved-plans-v1_4").getAbsolutePath

- // Enable cross join because some queries fail during query optimization phase.
- withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
- tpcdsQueries.foreach { q =>
- test(s"check simplified (tpcds-v1.4/$q)") {
+ tpcdsQueries.foreach { q =>
+ test(s"check simplified (tpcds-v1.4/$q)") {
+ // Enable cross join because some queries fail during query optimization phase.
+ withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
testQuery("tpcds/queries", q)
}
}
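A small illustration of why isApproved now strips CR/LF before comparing: golden files may be checked out with platform-specific line endings while freshly generated plans use \n. This is a sketch of the behavior, not code from the PR.

def normalized(plan: String): String = plan.replaceAll("[\\r\\n]+", "")

val approved  = "CollectLimit\r\nWholeStageCodegen\r\n"  // golden file checked out with CRLF
val generated = "CollectLimit\nWholeStageCodegen\n"      // plan produced at test time
assert(approved != generated)                            // byte-for-byte comparison fails
assert(normalized(approved) == normalized(generated))    // normalized comparison passes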
@@ -38,7 +38,7 @@ trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {

// The TPCDS queries below are based on v1.4.
// TODO: Fix build pipeline for q49 and reenable q49.
val tpcdsQueries = Seq("q1")
val tpcdsQueries = Seq("q0")

private val tableColumns = Map(
"store_sales" ->