Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-13772 Calcite, NPE on join. #8518

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public RightJoin(ExecutionContext<Row> ctx, RelDataType rowType, Predicate<Row>
}
}

if (waitingLeft == NOT_WAITING && requested > 0 && !rightNotMatchedIndexes.isEmpty()) {
if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) {
assert lastPushedInd >= 0;

inLoop = true;
Expand Down Expand Up @@ -657,7 +657,7 @@ public FullOuterJoin(ExecutionContext<Row> ctx, RelDataType rowType, Predicate<R
}
}

if (waitingLeft == NOT_WAITING && requested > 0 && !rightNotMatchedIndexes.isEmpty()) {
if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) {
assert lastPushedInd >= 0;

inLoop = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@

package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.google.common.collect.ImmutableMap;
Expand All @@ -38,12 +46,16 @@
import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;

Expand All @@ -69,6 +81,22 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
/** */
protected int nodesCnt = 3;

/** */
@Parameterized.Parameters(name = "Execution direction = {0}")
public static List<Object[]> parameters() {
ArrayList<Object[]> params = new ArrayList<>();

params.add(new Object[]{true});
params.add(new Object[]{false});
params.add(new Object[]{null});

return params;
}

/** Execution direction. */
@Parameterized.Parameter
public static Boolean execDir;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be enum usage will be more appropriate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can`t use enum here, need to be iterable as i understand, does i missed something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zstan please see my suggestion here: gridgain#259.

Also introducing params in the abstract class breaks ContinuousExecutionTest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks ! done.


/** */
@Before
public void setup() throws Exception {
Expand All @@ -85,7 +113,7 @@ public void setup() throws Exception {
GridTestKernalContext kernal = newContext();

QueryTaskExecutorImpl taskExecutor = new QueryTaskExecutorImpl(kernal);
taskExecutor.stripedThreadPoolExecutor(new IgniteStripedThreadPoolExecutor(
taskExecutor.stripedThreadPoolExecutor(new IgniteRandomStripedThreadPoolExecutor(
kernal.config().getQueryThreadPoolSize(),
kernal.igniteInstanceName(),
"calciteQry",
Expand Down Expand Up @@ -114,6 +142,59 @@ public void setup() throws Exception {
}
}

/** Task reordering executor. */
private static class IgniteRandomStripedThreadPoolExecutor extends IgniteStripedThreadPoolExecutor {
/** */
final Deque<T2<Runnable, Integer>> tasks = new ArrayDeque<>();

/** Internal stop flag. */
AtomicBoolean stop = new AtomicBoolean();

/** Inner execution service. */
ExecutorService exec = Executors.newWorkStealingPool();

/** {@inheritDoc} */
public IgniteRandomStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix, Thread.UncaughtExceptionHandler eHnd, boolean allowCoreThreadTimeOut, long keepAliveTime) {
super(concurrentLvl, igniteInstanceName, threadNamePrefix, eHnd, allowCoreThreadTimeOut, keepAliveTime);

GridTestUtils.runAsync(() -> {
while (!stop.get()) {
synchronized (tasks) {
while (!tasks.isEmpty()) {
T2<Runnable, Integer> r = execDir != null ? (execDir ? tasks.pollLast() : tasks.pollFirst()) :
ThreadLocalRandom.current().nextBoolean() ? tasks.pollLast() : tasks.pollFirst();

exec.execute(() -> super.execute(r.getKey(), r.getValue()));
}
}

LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(1_000, 10_000));
}
});
}

/** {@inheritDoc} */
@Override public void execute(Runnable task, int idx) {
synchronized (tasks) {
tasks.add(new T2<>(task, idx));
}
}

/** {@inheritDoc} */
@Override public void shutdown() {
stop.set(true);

super.shutdown();
}

/** {@inheritDoc} */
@Override public List<Runnable> shutdownNow() {
stop.set(true);

return super.shutdownNow();
}
}

/** */
@After
public void tearDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.calcite.rel.core.JoinRelType.ANTI;
import static org.apache.calcite.rel.core.JoinRelType.FULL;
Expand All @@ -66,6 +68,7 @@
*/
@SuppressWarnings("TypeMayBeWeakened")
@WithSystemProperty(key = "calcite.debug", value = "true")
@RunWith(Parameterized.class)
public class ExecutionTest extends AbstractExecutionTest {
/**
* @throws Exception If failed.
Expand Down