From f1d802906b3649a8025bae38c30814cdaa976912 Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Tue, 24 Mar 2020 00:51:58 +0700
Subject: [PATCH 1/6] add failing "isin" test for a non-rowkey column

---
 .../apache/spark/sql/DefaultSourceSuite.scala | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index 1093a119..02e3d707 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -153,8 +153,29 @@ class DefaultSourceSuite extends SHC with Logging {
     assert(c == 256)
   }
 
+  test("IN filter") {
+    val df = withCatalog(catalog)
+    df.show()
+    println(df.count())
+    val s = df.filter($"col4" isin (4, 5, 6)).select("col0")
+    s.explain(true)
+    s.show
+    assert(s.count() == 3)
+  }
+
+  test("IN filter rowkey") {
+    val df = withCatalog(catalog)
+    df.show()
+    println(df.count())
+    val s = df.filter($"col0" isin ("row005", "row001", "row002")).select("col0")
+    s.explain(true)
+    s.show
+    assert(s.count() == 3)
+  }
+
   test("IN and Not IN filter1") {
     val df = withCatalog(catalog)
+    df.show()
     val s = df.filter(($"col0" isin ("row005", "row001", "row002")) and !($"col0" isin ("row001", "row002")))
       .select("col0")
     s.explain(true)
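Note on PATCH 1/6 and 2/6: the new "IN filter" test fails before PATCH 2 because the `In` case in HBaseFilter collected only the per-value scan ranges and always returned `TypedFilter.empty`, so an `isin` on a non-rowkey column was effectively dropped even though `handled = true` told Spark not to re-apply it. PATCH 2 keeps each per-value `TypedFilter` and folds them together with `TypedFilter.or`. Semantically this treats `key in (x1, x2, ...)` as a disjunction of equalities; a minimal sketch of that rewrite against Spark's public `org.apache.spark.sql.sources` filter API (`expandIn` is an illustrative helper, not part of SHC):

    import org.apache.spark.sql.sources.{EqualTo, Filter, In, Or}

    // Rewrite In(attr, [x1, x2, ...]) into EqualTo(attr, x1) OR EqualTo(attr, x2) OR ...,
    // mirroring the per-value expansion buildFilter performs in the In case.
    // Assumes values is non-empty; reduceLeft would throw on an empty array.
    def expandIn(in: In): Filter =
      in.values.map(v => EqualTo(in.attribute, v): Filter).reduceLeft(Or(_, _))

    // expandIn(In("col4", Array(4, 5, 6))) ==
    //   Or(Or(EqualTo("col4", 4), EqualTo("col4", 5)), EqualTo("col4", 6))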
From 36f70aba9fcdf6aa60b4b4cfce70e5555703c2f8 Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Tue, 24 Mar 2020 00:53:12 +0700
Subject: [PATCH 2/6] enable column filters

---
 .../sql/execution/datasources/hbase/HBaseFilter.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
index 8ee281bb..bcbce90a 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
@@ -369,13 +369,21 @@ object HBaseFilter extends Logging{
     case In(attribute: String, values: Array[Any]) =>
       //converting a "key in (x1, x2, x3..) filter to (key == x1) or (key == x2) or ...
       val ranges = new ArrayBuffer[ScanRange[Array[Byte]]]()
+      val typedFilters = ArrayBuffer[TypedFilter]()
       values.foreach{
         value =>
           val sparkFilter = EqualTo(attribute, value)
           val hbaseFilter = buildFilter(sparkFilter, relation)
           ranges ++= hbaseFilter.ranges
+          typedFilters += hbaseFilter.tf
       }
-      HRF[Array[Byte]](ranges.toArray, TypedFilter.empty, handled = true)
+      val resultingTypedFilter = typedFilters.foldLeft(TypedFilter.empty){
+        (acc, tf) => acc match {
+          case TypedFilter(None, FilterType.Und) => tf
+          case _ => TypedFilter.or(acc, tf)
+        }
+      }
+      HRF[Array[Byte]](ranges.toArray, resultingTypedFilter, handled = true)
     case Not(In(attribute: String, values: Array[Any])) =>
       //converting a "not(key in (x1, x2, x3..)) filter to (key != x1) and (key != x2) and ..
       val hrf = values.map{v => buildFilter(Not(EqualTo(attribute, v)),relation)}

From 2f76f1f489c745abb988e0d457a65b537e58b124 Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Tue, 24 Mar 2020 02:06:14 +0700
Subject: [PATCH 3/6] fix scan ranges for column IN filters

---
 .../execution/datasources/hbase/HBaseFilter.scala |  5 +++--
 .../org/apache/spark/sql/DefaultSourceSuite.scala | 12 ++----------
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
index bcbce90a..7b9e2530 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala
@@ -374,16 +374,17 @@ object HBaseFilter extends Logging{
         value =>
           val sparkFilter = EqualTo(attribute, value)
           val hbaseFilter = buildFilter(sparkFilter, relation)
-          ranges ++= hbaseFilter.ranges
+          ranges ++= hbaseFilter.ranges.filter(_ != ScanRange.empty[Array[Byte]])
           typedFilters += hbaseFilter.tf
       }
       val resultingTypedFilter = typedFilters.foldLeft(TypedFilter.empty){
         (acc, tf) => acc match {
           case TypedFilter(None, FilterType.Und) => tf
           case _ => TypedFilter.or(acc, tf)
         }
       }
-      HRF[Array[Byte]](ranges.toArray, resultingTypedFilter, handled = true)
+      val resultingRanges = if (ranges.isEmpty) Array(ScanRange.empty[Array[Byte]]) else ranges.toArray
+      HRF[Array[Byte]](resultingRanges, resultingTypedFilter, handled = true)
     case Not(In(attribute: String, values: Array[Any])) =>
       //converting a "not(key in (x1, x2, x3..)) filter to (key != x1) and (key != x2) and ..
       val hrf = values.map{v => buildFilter(Not(EqualTo(attribute, v)),relation)}

diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index 02e3d707..b65dc4f6 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -153,23 +153,15 @@ class DefaultSourceSuite extends SHC with Logging {
     assert(c == 256)
   }
 
-  test("IN filter") {
+  test("IN filter for column") {
     val df = withCatalog(catalog)
-    df.show()
-    println(df.count())
     val s = df.filter($"col4" isin (4, 5, 6)).select("col0")
-    s.explain(true)
-    s.show
     assert(s.count() == 3)
   }
 
-  test("IN filter rowkey") {
+  test("IN filter for rowkey") {
     val df = withCatalog(catalog)
-    df.show()
-    println(df.count())
     val s = df.filter($"col0" isin ("row005", "row001", "row002")).select("col0")
-    s.explain(true)
-    s.show
     assert(s.count() == 3)
   }
 

From 3385f01f6b913f4dfd1c97f473d8fdcc45dd129b Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Tue, 24 Mar 2020 02:09:41 +0700
Subject: [PATCH 4/6] remove unnecessary show()

---
 .../src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index b65dc4f6..938273d1 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -167,7 +167,6 @@ class DefaultSourceSuite extends SHC with Logging {
 
   test("IN and Not IN filter1") {
     val df = withCatalog(catalog)
-    df.show()
     val s = df.filter(($"col0" isin ("row005", "row001", "row002")) and !($"col0" isin ("row001", "row002")))
       .select("col0")
     s.explain(true)
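Note on PATCH 3/6: for an `EqualTo` on a non-rowkey column, SHC can only contribute the full-table scan range plus a column-level `TypedFilter`, so the loop from PATCH 2 accumulated one full-table range per IN value and the same rows were scanned once per value. PATCH 3 therefore drops the empty (full) ranges from the union and restores a single one only when no rowkey range survived. A self-contained sketch of that normalization, with `KeyRange` as a simplified stand-in for SHC's byte-array-based `ScanRange`:

    // `full` plays the role of ScanRange.empty[Array[Byte]]: an unbounded
    // range meaning "scan the whole table".
    case class KeyRange(start: Option[String], stop: Option[String])
    val full = KeyRange(None, None)

    def normalizeRanges(perValueRanges: Seq[KeyRange]): Seq[KeyRange] = {
      // Drop full-table ranges contributed by non-rowkey equality filters...
      val keyed = perValueRanges.filterNot(_ == full)
      // ...but if no rowkey range survived, one full scan is still needed;
      // the ORed TypedFilter then does the row-level filtering.
      if (keyed.isEmpty) Seq(full) else keyed
    }

    // normalizeRanges(Seq(full, full, full)) == Seq(full)  -- column IN: one scan, not three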
From 89e85cd1a7ff1e531e700dedcd208a82d54aa71c Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Tue, 24 Mar 2020 02:24:24 +0700
Subject: [PATCH 5/6] add stack overflow test for column filter

---
 .../org/apache/spark/sql/DefaultSourceSuite.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index 938273d1..3496f51b 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -183,7 +183,7 @@ class DefaultSourceSuite extends SHC with Logging {
     assert(s.count() == 1)
   }
 
-  test("IN filter stack overflow") {
+  test("IN filter rowkey stack overflow") {
     val df = withCatalog(catalog)
     val items = (0 to 2000).map{i => s"xaz$i"}
     val filterInItems = Seq("row001") ++: items
@@ -194,6 +194,17 @@ class DefaultSourceSuite extends SHC with Logging {
     assert(s.count() == 1)
   }
 
+  test("IN filter column stack overflow") {
+    val df = withCatalog(catalog)
+    val items = (0 to 2000).map(_ + df.count() + 1)
+    val filterInItems = Seq(1) ++: items
+
+    val s = df.filter($"col4" isin(filterInItems:_*)).select("col0")
+    s.explain(true)
+    s.show()
+    assert(s.count() == 1)
+  }
+
   test("NOT IN filter stack overflow") {
     val df = withCatalog(catalog)
     val items = (0 to 2000).map{i => s"xaz$i"}

From f2ee39d6f112ce891de25d524e61ef856b477931 Mon Sep 17 00:00:00 2001
From: Dmitriy Chernykh
Date: Wed, 25 Mar 2020 12:03:11 +0700
Subject: [PATCH 6/6] avoid 2000 dataframe size recalculations

---
 .../test/scala/org/apache/spark/sql/DefaultSourceSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index 3496f51b..1b1b62f7 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -196,7 +196,8 @@ class DefaultSourceSuite extends SHC with Logging {
   test("IN filter column stack overflow") {
     val df = withCatalog(catalog)
-    val items = (0 to 2000).map(_ + df.count() + 1)
+    val dfSize = df.count()
+    val items = (0 to 2000).map(_ + dfSize + 1)
     val filterInItems = Seq(1) ++: items
 
     val s = df.filter($"col4" isin(filterInItems:_*)).select("col0")
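Note on PATCH 6/6: `DataFrame.count()` is a Spark action, so calling it inside the closure of a local `map` over 2001 elements launches a full count job for every element; hoisting it out runs the job once. A minimal sketch of the difference, assuming `df` is any DataFrame:

    // Slow: the closure re-runs the count() action for every element,
    // triggering roughly 2001 separate Spark jobs:
    //   val items = (0 to 2000).map(_ + df.count() + 1)

    // Fast: evaluate the action once, then do pure local arithmetic.
    val dfSize = df.count()
    val items = (0 to 2000).map(_ + dfSize + 1)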