Skip to content

Commit

Permalink
[PEx] Implements multi-threaded PEx version (#768)
Browse files Browse the repository at this point in the history
* [PEx] Move globals to PExGlobal, adds --nproc CLI option

Moves state cache, timelines, num tasks data structures to PEx global class

Adds CLI option --nproc to provide number of threads

* [PEx] Adds dedicated logger for each thread

Moves global results reporting to global class

* [PEx] parallel: adds todos, cleans up global class

* [PEx] parallel: fix IR, move scheduler specific fields from globals

* [PEx] parallel: update IR so that main machine and monitors are created by PEx runtime

* [PEx] parallel: make search statistics thread safe

* [PEx] parallel: completes first version

Several updates: support multi-thread executor, result tracking, exceptions tracking, compiling results, terminating threads, resolving data conflicts, implements lock/release for shared constructs

* [PEx] parallel: minor change

* [PEx] parallel: update GitHub CI to run with 3 threads

* [PEx] parallel: minor corrections

Make global machine/monitor instance ids thread safe

Add hash/name to PContinuation object for thread-safe state caching
  • Loading branch information
aman-goel authored Sep 5, 2024
1 parent 8da2326 commit ed1b209
Show file tree
Hide file tree
Showing 29 changed files with 1,268 additions and 964 deletions.
19 changes: 5 additions & 14 deletions Src/PCompiler/CompilerCore/Backend/PEx/PExCodeGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,6 @@ private CompiledFile GenerateSource(CompilationContext context, Scope globalScop
context.WriteLine(source.Stream, "public void setTestDriver(PTestDriver input) { testDriver = input; }");
context.WriteLine(source.Stream);

context.WriteLine(source.Stream, "@Generated");
context.WriteLine(source.Stream, "public PMachine getStart() { return testDriver.getStart(); }");
context.WriteLine(source.Stream, "@Generated");
context.WriteLine(source.Stream, "public List<PMonitor> getMonitors() { return testDriver.getMonitors(); }");
context.WriteLine(source.Stream, "@Generated");
context.WriteLine(source.Stream,
"public Map<PEvent, List<PMonitor>> getListeners() { return testDriver.getListeners(); }");
context.WriteLine(source.Stream);

WriteSourceEpilogue(context, source.Stream);

return source;
Expand All @@ -155,7 +146,7 @@ private void WriteDriverConfigure(CompilationContext context, StringWriter outpu
context.WriteLine(output, "@Generated");
context.WriteLine(output, "public void configure() {");

context.WriteLine(output, $" mainMachine = new {startMachine}(0);");
context.WriteLine(output, $" mainMachine = {startMachine}.class;");

context.WriteLine(output);
context.WriteLine(output, " interfaceMap.clear();");
Expand All @@ -175,13 +166,13 @@ private void WriteDriverConfigure(CompilationContext context, StringWriter outpu
{
context.WriteLine(output);
var declName = context.GetNameForDecl(machine);
context.WriteLine(output, $" PMonitor instance_{declName} = new {declName}(0);");
context.WriteLine(output, $" monitorList.add(instance_{declName});");
context.WriteLine(output, $" Class<? extends PMachine> cls_{declName} = {declName}.class;");
context.WriteLine(output, $" monitorList.add(cls_{declName});");
foreach (var pEvent in machine.Observes.Events)
{
context.WriteLine(output, $" if(!observerMap.containsKey({pEvent.Name}))");
context.WriteLine(output, $" observerMap.put({pEvent.Name}, new ArrayList<>());");
context.WriteLine(output, $" observerMap.get({pEvent.Name}).add(instance_{declName});");
context.WriteLine(output, $" observerMap.get({pEvent.Name}).add(cls_{declName});");
}
}

Expand Down Expand Up @@ -732,7 +723,7 @@ private bool WriteStmt(Function function, CompilationContext context, StringWrit
break;

case PrintStmt printStmt:
context.Write(output, "PExLogger.logModel(");
context.Write(output, $"{CompilationContext.SchedulerVar}.getLogger().logModel(");
WriteExpr(context, output, printStmt.Message);
context.WriteLine(output, ".toString());");
break;
Expand Down
2 changes: 1 addition & 1 deletion Src/PRuntimes/PExRuntime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j2.configurationFile>${project.basedir}/src/main/resources/log4j2.xml</log4j2.configurationFile>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<revision>0.0.1</revision>
<revision>0.1.0</revision>
</properties>

</project>
149 changes: 102 additions & 47 deletions Src/PRuntimes/PExRuntime/src/main/java/pex/RuntimeExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pex.runtime.logger.StatWriter;
import pex.runtime.scheduler.Schedule;
import pex.runtime.scheduler.explicit.ExplicitSearchScheduler;
import pex.runtime.scheduler.explicit.SearchStatistics;
import pex.runtime.scheduler.explicit.strategy.SearchStrategyMode;
import pex.runtime.scheduler.explicit.strategy.SearchTask;
import pex.runtime.scheduler.replay.ReplayScheduler;
Expand All @@ -19,42 +18,96 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

/**
* Represents the runtime executor that executes the analysis engine
*/
public class RuntimeExecutor {
private static ExecutorService executor;
private static Future<Integer> future;
private static ExplicitSearchScheduler scheduler;
private static List<Future<Integer>> futures = new ArrayList<>();

private static void runWithTimeout(long timeLimit)
throws TimeoutException,
InterruptedException,
RuntimeException {
try {
private static void cancelAllThreads() {
for (int i = 0; i < futures.size(); i++) {
Future<Integer> f = futures.get(i);
if (!f.isDone() && !f.isCancelled()) {
f.cancel(true);
}
}
}

private static void runWithTimeout() throws Exception {
PExGlobal.setResult("incomplete");
PExGlobal.printProgressHeader();

double timeLimit = PExGlobal.getConfig().getTimeLimit();
Set<Integer> done = new HashSet<>();
Exception resultException = null;

PExGlobal.getSearchSchedulers().get(1).getSearchStrategy().createFirstTask();

for (int i = 0; i < PExGlobal.getConfig().getNumThreads(); i++) {
TimedCall timedCall = new TimedCall(PExGlobal.getSearchSchedulers().get(i + 1));
Future<Integer> f = executor.submit(timedCall);
futures.add(f);
}

while (true) {
if (timeLimit > 0) {
future.get(timeLimit, TimeUnit.SECONDS);
} else {
future.get();
double elapsedTime = TimeMonitor.getRuntime();
if (elapsedTime > timeLimit) {
cancelAllThreads();
resultException = new TimeoutException(String.format("Max time limit reached. Runtime: %.1f seconds", elapsedTime));
}
}

for (int i = 0; i < futures.size(); i++) {
if (!done.contains(i)) {
Future<Integer> f = futures.get(i);
if (f.isDone() || f.isCancelled()) {
done.add(i);
try {
f.get();
} catch (InterruptedException | CancellationException e) {
cancelAllThreads();
} catch (OutOfMemoryError e) {
cancelAllThreads();
resultException = new MemoutException(e.getMessage(), MemoryMonitor.getMemSpent(), e);
} catch (ExecutionException e) {
if (e.getCause() instanceof MemoutException) {
cancelAllThreads();
resultException = (MemoutException) e.getCause();
} else if (e.getCause() instanceof BugFoundException) {
cancelAllThreads();
resultException = (BugFoundException) e.getCause();
} else if (e.getCause() instanceof TimeoutException) {
cancelAllThreads();
resultException = (TimeoutException) e.getCause();
} else {
cancelAllThreads();
resultException = new RuntimeException("RuntimeException", e);
}
}
}
}
}
} catch (TimeoutException | BugFoundException e) {
throw e;
} catch (OutOfMemoryError e) {
throw new MemoutException(e.getMessage(), MemoryMonitor.getMemSpent(), e);
} catch (ExecutionException e) {
if (e.getCause() instanceof MemoutException) {
throw (MemoutException) e.getCause();
} else if (e.getCause() instanceof BugFoundException) {
throw (BugFoundException) e.getCause();
} else if (e.getCause() instanceof TimeoutException) {
throw (TimeoutException) e.getCause();
} else {
throw new RuntimeException("RuntimeException", e);

if (done.size() == PExGlobal.getConfig().getNumThreads()) {
break;
}
} catch (InterruptedException e) {
throw e;

TimeUnit.MILLISECONDS.sleep(100);
PExGlobal.printProgress(false);
}
PExGlobal.printProgress(true);
PExGlobal.printProgressFooter();

if (resultException != null) {
throw resultException;
}
}

Expand All @@ -67,8 +120,8 @@ private static void printStats() {
StatWriter.log("memory-current-MB", String.format("%.1f", memoryUsed));

if (PExGlobal.getConfig().getSearchStrategyMode() != SearchStrategyMode.Replay) {
StatWriter.log("max-depth-explored", String.format("%d", SearchStatistics.maxSteps));
scheduler.recordStats();
StatWriter.log("max-depth-explored", String.format("%d", PExGlobal.getMaxSteps()));
PExGlobal.recordStats();
if (PExGlobal.getResult().equals("correct for any depth")) {
PExGlobal.setStatus(STATUS.VERIFIED);
} else if (PExGlobal.getResult().startsWith("correct up to step")) {
Expand All @@ -82,8 +135,8 @@ private static void printStats() {

private static void preprocess() {
PExLogger.logInfo(String.format(".. Test case :: " + PExGlobal.getConfig().getTestDriver()));
PExLogger.logInfo(String.format("... Checker is using '%s' strategy (seed:%s)",
PExGlobal.getConfig().getSearchStrategyMode(), PExGlobal.getConfig().getRandomSeed()));
PExLogger.logInfo(String.format("... Checker is using '%s' strategy with %d threads (seed:%s)",
PExGlobal.getConfig().getSearchStrategyMode(), PExGlobal.getConfig().getNumThreads(), PExGlobal.getConfig().getRandomSeed()));

PExGlobal.setResult("error");

Expand All @@ -94,11 +147,8 @@ private static void preprocess() {
}

private static void process(boolean resume) throws Exception {
executor = Executors.newSingleThreadExecutor();
try {
TimedCall timedCall = new TimedCall(scheduler, resume);
future = executor.submit(timedCall);
runWithTimeout((long) PExGlobal.getConfig().getTimeLimit());
runWithTimeout();
} catch (TimeoutException e) {
PExGlobal.setStatus(STATUS.TIMEOUT);
throw new Exception("TIMEOUT", e);
Expand All @@ -107,15 +157,15 @@ private static void process(boolean resume) throws Exception {
throw new Exception("MEMOUT", e);
} catch (BugFoundException e) {
PExGlobal.setStatus(STATUS.BUG_FOUND);
PExGlobal.setResult(String.format("found cex of length %d", scheduler.getStepNumber()));
PExLogger.logStackTrace(e);
PExGlobal.setResult(String.format("found cex of length %d", e.getScheduler().getStepNumber()));
e.getScheduler().getLogger().logStackTrace(e);

String schFile = PExGlobal.getConfig().getOutputFolder() + "/" + PExGlobal.getConfig().getProjectName() + "_0_0.schedule";
PExLogger.logInfo(String.format("Writing buggy trace in %s", schFile));
scheduler.getSchedule().writeToFile(schFile);
e.getScheduler().getSchedule().writeToFile(schFile);

ReplayScheduler replayer = new ReplayScheduler(scheduler.getSchedule());
PExGlobal.setScheduler(replayer);
ReplayScheduler replayer = new ReplayScheduler(e.getScheduler().getSchedule());
PExGlobal.setReplayScheduler(replayer);
try {
replayer.run();
} catch (NullPointerException | StackOverflowError | ClassCastException replayException) {
Expand All @@ -136,11 +186,11 @@ private static void process(boolean resume) throws Exception {
PExGlobal.setStatus(STATUS.ERROR);
throw new Exception("ERROR", e);
} finally {
future.cancel(true);
cancelAllThreads();
executor.shutdownNow();
scheduler.updateResult();
PExGlobal.updateResult();
printStats();
PExLogger.logEndOfRun(scheduler, Duration.between(TimeMonitor.getStart(), Instant.now()).getSeconds());
PExLogger.logEndOfRun(Duration.between(TimeMonitor.getStart(), Instant.now()).getSeconds());
SearchTask.Cleanup();
}
}
Expand All @@ -149,18 +199,23 @@ public static void runSearch() throws Exception {
SearchTask.Initialize();
ScratchLogger.Initialize();

scheduler = new ExplicitSearchScheduler();
PExGlobal.setScheduler(scheduler);

preprocess();

executor = Executors.newFixedThreadPool(PExGlobal.getConfig().getNumThreads());

for (int i = 0; i < PExGlobal.getConfig().getNumThreads(); i++) {
ExplicitSearchScheduler scheduler = new ExplicitSearchScheduler(i + 1);
PExGlobal.addSearchScheduler(scheduler);
}

process(false);
}

private static void replaySchedule(String fileName) throws Exception {
PExLogger.logInfo(String.format("... Reading buggy trace from %s", fileName));

ReplayScheduler replayer = new ReplayScheduler(Schedule.readFromFile(fileName));
PExGlobal.setScheduler(replayer);
PExGlobal.setReplayScheduler(replayer);
try {
replayer.run();
} catch (NullPointerException | StackOverflowError | ClassCastException replayException) {
Expand All @@ -178,7 +233,7 @@ private static void replaySchedule(String fileName) throws Exception {
throw new Exception("Error when replaying the bug", replayException);
} finally {
printStats();
PExLogger.logEndOfRun(null, Duration.between(TimeMonitor.getStart(), Instant.now()).getSeconds());
PExLogger.logEndOfRun(Duration.between(TimeMonitor.getStart(), Instant.now()).getSeconds());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class PExConfig {
// name of the output folder
@Setter
String outputFolder = "output";
// number of threads
@Setter
int numThreads = 1;
// time limit in seconds (0 means infinite)
@Setter
double timeLimit = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ public class PExOptions {
.build();
addOption(outputDir);

// number of threads
Option numThreads =
Option.builder("n")
.longOpt("nproc")
.desc("Number of threads (default: 1)")
.numberOfArgs(1)
.hasArg()
.argName("No. of Threads (integer)")
.build();
addOption(numThreads);

// time limit
Option timeLimit =
Option.builder("t")
Expand Down Expand Up @@ -288,6 +299,19 @@ public static PExConfig ParseCommandlineArgs(String[] args) {
case "outdir":
config.setOutputFolder(option.getValue());
break;
case "n":
case "nproc":
try {
config.setNumThreads(Integer.parseInt(option.getValue()));
if (config.getNumThreads() < 1) {
optionError(
option, String.format("Expected a positive integer value, got %s", option.getValue()));
}
} catch (NumberFormatException ex) {
optionError(
option, String.format("Expected an integer value, got %s", option.getValue()));
}
break;
case "t":
case "timeout":
try {
Expand Down Expand Up @@ -473,10 +497,12 @@ public static PExConfig ParseCommandlineArgs(String[] args) {

if (config.getSearchStrategyMode() == SearchStrategyMode.DepthFirst) {
config.setMaxSchedulesPerTask(0);
config.setNumThreads(1);
}

if (config.getReplayFile() != "") {
config.setSearchStrategyMode(SearchStrategyMode.Replay);
config.setNumThreads(1);
if (config.getVerbosity() == 0) {
config.setVerbosity(1);
}
Expand Down
Loading

0 comments on commit ed1b209

Please sign in to comment.