Skip to content

Commit

Permalink
more listeners for schedulers and code uniformity
Browse files Browse the repository at this point in the history
  • Loading branch information
lmajano committed May 29, 2024
1 parent a76e3a5 commit 62575fb
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 19 deletions.
66 changes: 47 additions & 19 deletions src/main/java/ortus/boxlang/runtime/async/tasks/ScheduledTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ortus.boxlang.runtime.BoxRuntime;
import ortus.boxlang.runtime.async.executors.ExecutorRecord;
import ortus.boxlang.runtime.async.time.DateTimeHelper;
import ortus.boxlang.runtime.events.BoxEvent;
import ortus.boxlang.runtime.interop.DynamicObject;
import ortus.boxlang.runtime.scopes.Key;
import ortus.boxlang.runtime.services.InterceptorService;
import ortus.boxlang.runtime.types.IStruct;
import ortus.boxlang.runtime.types.Struct;
import ortus.boxlang.runtime.util.Timer;
Expand Down Expand Up @@ -249,6 +252,11 @@ public class ScheduledTask implements Runnable {
*/
private final Timer timer = new Timer();

/**
* Interceptor Service
*/
private InterceptorService interceptorService = BoxRuntime.getInstance().getInterceptorService();

/**
* --------------------------------------------------------------------------
* Constructors
Expand Down Expand Up @@ -378,9 +386,13 @@ public void run( Boolean force ) {
}

// Mark the task as it will run now for the first time
stats.put( "neverRun", false );
this.stats.put( "neverRun", false );
try {
// Before Interceptors
// Before Interceptors : From global to local
this.interceptorService.announce(
BoxEvent.SCHEDULER_BEFORE_ANY_TASK,
Struct.of( "task", this )
);
if ( hasScheduler() ) {
getScheduler().beforeAnyTask( this );
}
Expand All @@ -390,60 +402,76 @@ public void run( Boolean force ) {

// Execution by type
if ( task instanceof DynamicObject castedTask ) {
stats.put( "lastResult", Optional.ofNullable( castedTask.invoke( method ) ) );
this.stats.put( "lastResult", Optional.ofNullable( castedTask.invoke( method ) ) );
} else if ( task instanceof Callable<?> castedTask ) {
stats.put( "lastResult", Optional.ofNullable( castedTask.call() ) );
this.stats.put( "lastResult", Optional.ofNullable( castedTask.call() ) );
} else if ( task instanceof Runnable castedTask ) {
castedTask.run();
stats.put( "lastResult", Optional.empty() );
this.stats.put( "lastResult", Optional.empty() );
} else {
throw new IllegalArgumentException( "Task is not a DynamicObject or a Callable or a Runnable" );
}

// Get the last result
var result = ( Optional<?> ) stats.get( "lastResult" );
var result = ( Optional<?> ) this.stats.get( "lastResult" );

// After Interceptors
// After Interceptors : From local to global
if ( afterTask != null ) {
afterTask.accept( this, result );
}
if ( hasScheduler() ) {
getScheduler().afterAnyTask( this, result );
}
this.interceptorService.announce(
BoxEvent.SCHEDULER_AFTER_ANY_TASK,
Struct.of( "task", this, "result", result )
);

// Store successes and call success interceptor
( ( AtomicInteger ) stats.get( "totalSuccess" ) ).incrementAndGet();
// Store successes and call success interceptor : From global to local
( ( AtomicInteger ) this.stats.get( "totalSuccess" ) ).incrementAndGet();
if ( onTaskSuccess != null ) {
onTaskSuccess.accept( this, result );
}
if ( hasScheduler() ) {
getScheduler().onAnyTaskSuccess( this, result );
}
this.interceptorService.announce(
BoxEvent.SCHEDULER_ON_ANY_TASK_SUCCESS,
Struct.of( "task", this, "result", result )
);

} catch ( Exception e ) {
// store failures
( ( AtomicInteger ) stats.get( "totalFailures" ) ).incrementAndGet();
( ( AtomicInteger ) this.stats.get( "totalFailures" ) ).incrementAndGet();
logger.error( "Error running task ({}) failed: {}", name, e.getMessage() );
logger.error( "Stacktrace for ({}) : {}", name, e.getStackTrace() );

// Try to execute the error handlers. Try try try just in case.
try {
// Life Cycle onTaskFailure call
// Life Cycle onTaskFailure call : From global to local
if ( onTaskFailure != null ) {
onTaskFailure.accept( this, e );
}
// If we have a scheduler attached, called the schedulers life-cycle
if ( hasScheduler() ) {
getScheduler().onAnyTaskError( this, e );
}
this.interceptorService.announce(
BoxEvent.SCHEDULER_ON_ANY_TASK_ERROR,
Struct.of( "task", this, "exception", e )
);

// After Tasks Interceptor with the exception as the last result
// After Tasks Interceptor with the exception as the last result : From global to local
if ( afterTask != null ) {
afterTask.accept( this, Optional.of( e ) );
}
if ( hasScheduler() ) {
getScheduler().afterAnyTask( this, Optional.of( e ) );
}
this.interceptorService.announce(
BoxEvent.SCHEDULER_AFTER_ANY_TASK,
Struct.of( "task", this, "result", Optional.of( e ) )
);
} catch ( Exception afterException ) {
// Log it, so it doesn't go to ether and executor doesn't die.
logger.error(
Expand All @@ -459,9 +487,9 @@ public void run( Boolean force ) {
}
} finally {
// Store finalization stats
stats.put( "lastRun", getNow() );
( ( AtomicLong ) stats.get( "lastExecutionTime" ) ).set( timer.stopAndGetMillis( timerLabel ) );
( ( AtomicInteger ) stats.get( "totalRuns" ) ).incrementAndGet();
this.stats.put( "lastRun", getNow() );
( ( AtomicLong ) this.stats.get( "lastExecutionTime" ) ).set( timer.stopAndGetMillis( timerLabel ) );
( ( AtomicInteger ) this.stats.get( "totalRuns" ) ).incrementAndGet();
// Call internal cleanups event
cleanupTaskRun();
// set next run time based on timeUnit and period
Expand All @@ -475,7 +503,7 @@ public void run( Boolean force ) {
* @return The last result of the task as an Optional
*/
public Optional<?> getLastResult() {
return ( Optional<?> ) stats.get( "lastResult" );
return ( Optional<?> ) this.stats.get( "lastResult" );
}

/**
Expand All @@ -501,7 +529,7 @@ public ScheduledFuture<?> start() {
if ( this.timeUnit != TimeUnit.SECONDS ) {
this.initialDelay = 0;
// reset the initial nextRunTime
stats.put( "nextRun", null );
this.stats.put( "nextRun", null );
} else {
this.initialDelay = DateTimeHelper.timeUnitToSeconds( this.initialDelay, this.initialDelayTimeUnit );
}
Expand Down Expand Up @@ -1767,15 +1795,15 @@ else if ( initialNextRun != null ) {
// Which is what the task operates on, either one.
var amount = this.spacedDelay != 0 ? this.spacedDelay : this.period;
// if overlaps are allowed task is immediately scheduled
if ( this.spacedDelay == 0 && ( ( AtomicLong ) stats.get( "lastExecutionTime" ) ).get() / 1000 > this.period ) {
if ( this.spacedDelay == 0 && ( ( AtomicLong ) this.stats.get( "lastExecutionTime" ) ).get() / 1000 > this.period ) {
amount = 0;
}
nextRun = DateTimeHelper.dateTimeAdd( nextRun, amount, this.timeUnit );
}

// Store it
debugLog( "setNextRunTime-end", Struct.of( "nextRun", nextRun ) );
stats.put( "nextRun", nextRun );
this.stats.put( "nextRun", nextRun );
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/ortus/boxlang/runtime/events/BoxEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public enum BoxEvent {
ON_SCHEDULER_STARTUP( "onSchedulerStartup" ),
ON_SCHEDULER_SHUTDOWN( "onSchedulerShutdown" ),
ON_SCHEDULER_RESTART( "onSchedulerRestart" ),
SCHEDULER_BEFORE_ANY_TASK( "schedulerBeforeAnyTask" ),
SCHEDULER_AFTER_ANY_TASK( "schedulerAfterAnyTask" ),
SCHEDULER_ON_ANY_TASK_SUCCESS( "schedulerOnAnyTaskSuccess" ),
SCHEDULER_ON_ANY_TASK_ERROR( "schedulerOnAnyTaskError" ),

/**
* Scheduler Service Events
Expand Down

0 comments on commit 62575fb

Please sign in to comment.