Skip to content

Commit

Permalink
Finishing out thread component functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
bdw429s committed Mar 27, 2024
1 parent 8dc5eff commit 69b8f86
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package ortus.boxlang.runtime.components.threading;

import java.util.List;
import java.util.Set;

import ortus.boxlang.runtime.components.Attribute;
Expand All @@ -30,6 +31,8 @@
import ortus.boxlang.runtime.types.IStruct;
import ortus.boxlang.runtime.types.exceptions.AbortException;
import ortus.boxlang.runtime.types.exceptions.BoxRuntimeException;
import ortus.boxlang.runtime.types.exceptions.BoxValidationException;
import ortus.boxlang.runtime.types.util.ListUtil;
import ortus.boxlang.runtime.util.RequestThreadManager;
import ortus.boxlang.runtime.validation.Validator;

Expand Down Expand Up @@ -146,13 +149,41 @@ private void run( IBoxContext context, String name, String priority, IStruct att
}

private void join( IBoxContext context, String name, Integer timeout ) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException( "Unimplemented method 'join'" );
if ( name == null || name.isEmpty() ) {
throw new BoxValidationException( "Thread name is required for join" );
}
timeout = timeout == null ? 0 : timeout;
int timeoutMSLeft = timeout;
long start = System.currentTimeMillis();
RequestThreadManager threadManager = context.getParentOfType( RequestBoxContext.class ).getThreadManager();
List<String> threadNames = ListUtil.asList( name, "," ).stream()
.map( item -> String.valueOf( item ) )
.map( String::trim )
.toList();

for ( String threadName : threadNames ) {
try {
( ( ThreadBoxContext ) threadManager.getThreadData( Key.of( threadName ) ).get( Key.context ) ).getThread().join( timeoutMSLeft );
} catch ( InterruptedException e ) {
throw new BoxRuntimeException( "Thread join interrupted", e );
}
// If we have a timeout, we need to check if we're out of time
// a timeout of zero means we do this forever
if ( timeout > 0 ) {
// Decrement how much time is left from the original timeout.
timeoutMSLeft = timeout - ( int ) ( System.currentTimeMillis() - start );
// If we're out of time, bail. Doesn't matter how many thread are left, we ran out of time
if ( timeoutMSLeft <= 0 ) {
return;
}
}
}
}

private void terminate( IBoxContext context, String name ) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException( "Unimplemented method 'terminate'" );
RequestThreadManager threadManager = context.getParentOfType( RequestBoxContext.class ).getThreadManager();
// Thread.stop() is deprecated in the JVM. We can use interrupt(), but it may not do anything if the thread is not in a blocking state.
( ( ThreadBoxContext ) threadManager.getThreadData( Key.of( name ) ).get( Key.context ) ).getThread().stop();
}

private void sleep( IBoxContext context, Integer duration ) {
Expand Down
29 changes: 25 additions & 4 deletions src/main/java/ortus/boxlang/runtime/util/RequestThreadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package ortus.boxlang.runtime.util;

import java.lang.Thread.State;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import ortus.boxlang.runtime.context.ThreadBoxContext;
import ortus.boxlang.runtime.scopes.IScope;
Expand All @@ -28,6 +30,7 @@
import ortus.boxlang.runtime.types.DateTime;
import ortus.boxlang.runtime.types.IStruct;
import ortus.boxlang.runtime.types.Struct;
import ortus.boxlang.runtime.types.exceptions.BoxRuntimeException;

/**
* I manage the threads for a request. Used by the bx:thread component and thread scopes
Expand Down Expand Up @@ -92,13 +95,12 @@ Key.startTime, new DateTime(),
}

/**
* Gets the thread data for a thread. Returns null if none found by the provided name
* Do not cache the return of this method, or the thread state, execution time, and error data
* may be out of sync.
* Gets just the thread meta data for a thread. This is a subset of the metadata.
* Returns null if not found.
*
* @param name The name of the thread
*
* @return The thread data
* @return The thread meta data
*/
public IStruct getThreadMeta( Key name ) {
IStruct threadData = threads.get( name );
Expand Down Expand Up @@ -144,6 +146,25 @@ public IStruct getThreadMeta( Key name ) {
return threadMeta;
}

/**
* Gets the thread data for a thread. Throws exception if not found.
*
* @param name The name of the thread
*
* @return The thread data
*/
public IStruct getThreadData( Key name ) {
IStruct threadData = threads.get( name );
if ( threadData == null ) {
throw new BoxRuntimeException( "No thread with name [" + name.getName() + "] not found. Valid names are ["
+ Arrays.stream( getThreadNames() )
.map( Key::getName )
.collect( Collectors.joining( ", " ) )
+ "]." );
}
return threadData;
}

/**
* Marks a thread as complete
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,86 @@ public void testHasTheadScope() {

}

@DisplayName( "It can join thread no timeout" )
@Test
public void testCanJoinThreadNoTimeout() {

instance.executeSource(
"""
thread name="myThread" {
sleep( 2000 )
}
thread name="myThread" action="join";
result = myThread;
""",
context, BoxSourceType.CFSCRIPT );

assertThat( variables.getAsStruct( result ).get( Key.status ) ).isEqualTo( "COMPLETED" );
}

@DisplayName( "It can join thread zero timeout" )
@Test
public void testCanJoinThreadZeroTimeout() {

instance.executeSource(
"""
thread name="myThread" {
sleep( 2000 )
}
thread name="myThread" action="join" timeout=0;
result = myThread;
""",
context, BoxSourceType.CFSCRIPT );

assertThat( variables.getAsStruct( result ).get( Key.status ) ).isEqualTo( "COMPLETED" );
}

@DisplayName( "It can join thread postive timeout" )
@Test
public void testCanJoinThreadPositiveTimeout() {

instance.executeSource(
"""
start = getTickCount()
thread name="myThread" {
sleep( 2000 )
}
thread name="myThread2" {
sleep( 2000 )
}
thread name="myThread3" {
sleep( 2000 )
}
thread name="myThread,myThread2,myThread3" action="join" timeout=1000;
result = myThread;
totalTime = getTickCount() - start
""",
context, BoxSourceType.CFSCRIPT );

assertThat( variables.getAsStruct( result ).get( Key.status ) ).isEqualTo( "WAITING" );
assertThat( variables.getAsDouble( Key.of( "totalTime" ) ) > 1000 ).isTrue();
assertThat( variables.getAsDouble( Key.of( "totalTime" ) ) < 2000 ).isTrue();
}

@DisplayName( "It can stop thread" )
@Test
public void testCanStopThread() {

instance.executeSource(
"""
start = getTickCount()
thread name="myThread" {
sleep( 2000 )
}
thread name="myThread" action="terminate";
thread name="myThread" action="join" timeout=1000;
result = myThread;
totalTime = getTickCount() - start
""",
context, BoxSourceType.CFSCRIPT );

assertThat( variables.getAsStruct( result ).get( Key.status ) ).isEqualTo( "TERMINATED" );
assertThat( variables.getAsDouble( Key.of( "totalTime" ) ) < 1000 ).isTrue();
}

}

0 comments on commit 69b8f86

Please sign in to comment.