From c3484094032fe00badff356603ea3922d89c2b64 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 30 Nov 2020 14:27:27 +0300 Subject: [PATCH 1/5] ignite-13772 Calcite. NPE with join cases. --- .../calcite/exec/rel/NestedLoopJoinNode.java | 4 +- .../exec/rel/AbstractExecutionTest.java | 66 ++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index b634ac21c2284..b26649a3aac2f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -513,7 +513,7 @@ public RightJoin(ExecutionContext ctx, RelDataType rowType, Predicate } } - if (waitingLeft == NOT_WAITING && requested > 0 && !rightNotMatchedIndexes.isEmpty()) { + if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) { assert lastPushedInd >= 0; inLoop = true; @@ -657,7 +657,7 @@ public FullOuterJoin(ExecutionContext ctx, RelDataType rowType, Predicate 0 && !rightNotMatchedIndexes.isEmpty()) { + if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) { assert lastPushedInd >= 0; inLoop = true; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 6d4bcb7b6d21c..c1bfa027bc1df 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -17,10 +17,17 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; @@ -38,7 +45,9 @@ import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager; import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription; import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; @@ -85,7 +94,7 @@ public void setup() throws Exception { GridTestKernalContext kernal = newContext(); QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal); - taskExecutor.stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor( + taskExecutor.stripedThreadPoolExecutor(new IgniteRandomStripedThreadPoolExecutor( kernal.config().getQueryThreadPoolSize(), kernal.igniteInstanceName(), "calciteQry", @@ -114,6 +123,61 @@ public void setup() throws Exception { } } + /** Task reordering executor. */ + private static class IgniteRandomStripedThreadPoolExecutor extends IgniteStripedThreadPoolExecutor { + /** */ + final Deque> tasks = new ArrayDeque<>(); + + /** Internal stop flag. */ + AtomicBoolean stop = new AtomicBoolean(); + + /** Inner execution service. */ + ExecutorService exec = Executors.newWorkStealingPool(); + + /** {@inheritDoc} */ + public IgniteRandomStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, boolean allowCoreThreadTimeOut, long keepAliveTime) { + super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime); + + GridTestUtils.runAsync(() -> { + while (!stop.get()) { + synchronized (tasks) { + T2 r = tasks.pollLast(); + while (r != null) { + T2 r0 = r; + + exec.execute(() -> super.execute(r0.getKey(), r0.getValue())); + + r = tasks.pollLast(); + } + } + + LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(1_000, 10_000)); + } + }); + } + + /** {@inheritDoc} */ + @Override public void execute(Runnable task, int idx) { + synchronized (tasks) { + tasks.add(new T2<>(task, idx)); + } + } + + /** {@inheritDoc} */ + @Override public void shutdown() { + stop.set(true); + + super.shutdown(); + } + + /** {@inheritDoc} */ + @Override public List shutdownNow() { + stop.set(true); + + return super.shutdownNow(); + } + } + /** */ @After public void tearDown() { From 5aef0ce2bb580861681d6773bed7ce467f9bb6c4 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 30 Nov 2020 22:16:26 +0300 Subject: [PATCH 2/5] parameterized --- .../exec/rel/AbstractExecutionTest.java | 29 +++++++++++++++---- .../query/calcite/exec/rel/ExecutionTest.java | 3 ++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index c1bfa027bc1df..4092734406947 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -53,6 +54,8 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.junit.After; import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME; @@ -78,6 +81,22 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { /** */ protected int nodesCnt = 3; + /** */ + @Parameterized.Parameters(name = "Execution direction = {0}") + public static List parameters() { + ArrayList params = new ArrayList<>(); + + params.add(new Object[]{true}); + params.add(new Object[]{false}); + params.add(new Object[]{null}); + + return params; + } + + /** Execution direction. */ + @Parameterized.Parameter + public static Boolean execDir; + /** */ @Before public void setup() throws Exception { @@ -141,13 +160,11 @@ public IgniteRandomStripedThreadPoolExecutor(int concurrentLvl, String igniteIns GridTestUtils.runAsync(() -> { while (!stop.get()) { synchronized (tasks) { - T2 r = tasks.pollLast(); - while (r != null) { - T2 r0 = r; - - exec.execute(() -> super.execute(r0.getKey(), r0.getValue())); + while (!tasks.isEmpty()) { + T2 r = execDir != null ? (execDir ? tasks.pollLast() : tasks.pollFirst()) : + ThreadLocalRandom.current().nextBoolean() ? tasks.pollLast() : tasks.pollFirst(); - r = tasks.pollLast(); + exec.execute(() -> super.execute(r.getKey(), r.getValue())); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java index fa4f711cb3cf3..1920a3197c338 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java @@ -50,6 +50,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.calcite.rel.core.JoinRelType.ANTI; import static org.apache.calcite.rel.core.JoinRelType.FULL; @@ -66,6 +68,7 @@ */ @SuppressWarnings("TypeMayBeWeakened") @WithSystemProperty(key = "calcite.debug", value = "true") +@RunWith(Parameterized.class) public class ExecutionTest extends AbstractExecutionTest { /** * @throws Exception If failed. From 8600ce5e607872cdf1fa83960884a30fadd4eb9c Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 10 Dec 2020 12:21:01 +0300 Subject: [PATCH 3/5] fix after review --- .../query/calcite/prepare/IgniteSqlValidator.java | 3 +-- .../query/calcite/exec/rel/AbstractExecutionTest.java | 8 ++++---- .../processors/query/calcite/exec/rel/ExecutionTest.java | 3 --- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java index b593eb9f79b8b..0be98cf5900db 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java @@ -145,8 +145,7 @@ public IgniteSqlValidator(SqlOperatorTable opTab, CalciteCatalogReader catalogRe super.validateSelect(select, targetRowType); } - - + /** * @param n Node to check limit. * @param nodeName Node name. diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 4092734406947..f10f91f529502 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -51,7 +51,6 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; @@ -62,6 +61,7 @@ /** * */ +@RunWith(Parameterized.class) public class AbstractExecutionTest extends GridCommonAbstractTest { /** */ private Throwable lastE; @@ -113,7 +113,7 @@ public void setup() throws Exception { GridTestKernalContext kernal = newContext(); QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal); - taskExecutor.stripedThreadPoolExecutor(new IgniteRandomStripedThreadPoolExecutor( + taskExecutor.stripedThreadPoolExecutor(new IgniteTestStripedThreadPoolExecutor( kernal.config().getQueryThreadPoolSize(), kernal.igniteInstanceName(), "calciteQry", @@ -143,7 +143,7 @@ public void setup() throws Exception { } /** Task reordering executor. */ - private static class IgniteRandomStripedThreadPoolExecutor extends IgniteStripedThreadPoolExecutor { + private static class IgniteTestStripedThreadPoolExecutor extends org.apache.ignite.thread.IgniteStripedThreadPoolExecutor { /** */ final Deque> tasks = new ArrayDeque<>(); @@ -154,7 +154,7 @@ private static class IgniteRandomStripedThreadPoolExecutor extends IgniteStriped ExecutorService exec = Executors.newWorkStealingPool(); /** {@inheritDoc} */ - public IgniteRandomStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, boolean allowCoreThreadTimeOut, long keepAliveTime) { + public IgniteTestStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, boolean allowCoreThreadTimeOut, long keepAliveTime) { super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime); GridTestUtils.runAsync(() -> { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java index 1920a3197c338..fa4f711cb3cf3 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java @@ -50,8 +50,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import static org.apache.calcite.rel.core.JoinRelType.ANTI; import static org.apache.calcite.rel.core.JoinRelType.FULL; @@ -68,7 +66,6 @@ */ @SuppressWarnings("TypeMayBeWeakened") @WithSystemProperty(key = "calcite.debug", value = "true") -@RunWith(Parameterized.class) public class ExecutionTest extends AbstractExecutionTest { /** * @throws Exception If failed. From 207131326d5bd1d696c33cb57862fb23a7948604 Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 10 Dec 2020 16:00:15 +0300 Subject: [PATCH 4/5] merge squash ignite-13772-p1 --- .../exec/rel/AbstractExecutionTest.java | 50 +++++++++++++++---- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index f10f91f529502..6f71c884220ea 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -31,6 +30,8 @@ import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; + import com.google.common.collect.ImmutableMap; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler; @@ -82,20 +83,48 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { protected int nodesCnt = 3; /** */ - @Parameterized.Parameters(name = "Execution direction = {0}") - public static List parameters() { - ArrayList params = new ArrayList<>(); + enum ExecutionStrategy { + /** */ + FIFO { + @Override public T2 nextTask(Deque> tasks) { + return tasks.pollFirst(); + } + }, - params.add(new Object[]{true}); - params.add(new Object[]{false}); - params.add(new Object[]{null}); + /** */ + LIFO { + @Override public T2 nextTask(Deque> tasks) { + return tasks.pollLast(); + } + }, - return params; + /** */ + RANDOM { + @Override public T2 nextTask(Deque> tasks) { + return ThreadLocalRandom.current().nextBoolean() ? tasks.pollLast() : tasks.pollFirst(); + } + }; + + /** + * Returns a next task according to the strategy. + * + * @param tasks Task list. + * @return Next task. + */ + public T2 nextTask(Deque> tasks) { + throw new UnsupportedOperationException(); + } + } + + /** */ + @Parameterized.Parameters(name = "Execution strategy = {0}") + public static List parameters() { + return Stream.of(ExecutionStrategy.values()).map(s -> new Object[]{s}).collect(Collectors.toList()); } /** Execution direction. */ @Parameterized.Parameter - public static Boolean execDir; + public static ExecutionStrategy execStgy; /** */ @Before @@ -161,8 +190,7 @@ public IgniteTestStripedThreadPoolExecutor(int concurrentLvl, String igniteInsta while (!stop.get()) { synchronized (tasks) { while (!tasks.isEmpty()) { - T2 r = execDir != null ? (execDir ? tasks.pollLast() : tasks.pollFirst()) : - ThreadLocalRandom.current().nextBoolean() ? tasks.pollLast() : tasks.pollFirst(); + T2 r = execStgy.nextTask(tasks); exec.execute(() -> super.execute(r.getKey(), r.getValue())); } From 5f6482118bc896a8758c2e4878e17e3deba08f87 Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 10 Dec 2020 17:26:54 +0300 Subject: [PATCH 5/5] ContinuousExecutionTest fixed --- .../exec/rel/ContinuousExecutionTest.java | 46 ++++++++++++------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java index e3724f2adf195..8a5145a75ec93 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ContinuousExecutionTest.java @@ -17,11 +17,14 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; import java.util.UUID; +import java.util.stream.Stream; import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; @@ -31,7 +34,6 @@ import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; @@ -39,30 +41,42 @@ * */ @SuppressWarnings("TypeMayBeWeakened") -@RunWith(Parameterized.class) public class ContinuousExecutionTest extends AbstractExecutionTest { /** */ - @Parameter() + @Parameter(1) public int rowsCnt; /** */ - @Parameter(1) + @Parameter(2) public int remoteFragmentsCnt; /** */ - @Parameterized.Parameters(name = "rowsCount={0}, remoteFragmentsCount={1}") - public static List parameters() { - return ImmutableList.of( - new Object[]{10, 1}, - new Object[]{10, 5}, - new Object[]{10, 10}, - new Object[]{100, 1}, - new Object[]{100, 5}, - new Object[]{100, 10}, - new Object[]{100_000, 1}, - new Object[]{100_000, 5}, - new Object[]{100_000, 10} + @Parameterized.Parameters(name = "rowsCount={1}, remoteFragmentsCount={2}") + public static List data() { + List extraParams = new ArrayList<>(); + + ImmutableList newParams = ImmutableList.of( + new Object[] {10, 1}, + new Object[] {10, 5}, + new Object[] {10, 10}, + new Object[] {100, 1}, + new Object[] {100, 5}, + new Object[] {100, 10}, + new Object[] {100_000, 1}, + new Object[] {100_000, 5}, + new Object[] {100_000, 10} ); + + for (Object[] newParam : newParams) { + for (Object[] inheritedParam : AbstractExecutionTest.parameters()) { + Object[] both = Stream.concat(Arrays.stream(inheritedParam), Arrays.stream(newParam)) + .toArray(Object[]::new); + + extraParams.add(both); + } + } + + return extraParams; } /**