Skip to content

Commit

Permalink
[JBPM-10209] Switching to CMT
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Nov 14, 2023
1 parent f897fcc commit 0b419a1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,18 @@

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.NoSuchObjectLocalException;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.RollbackException;
import javax.transaction.UserTransaction;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.drools.core.time.JobHandle;
import org.drools.core.time.impl.TimerJobInstance;
Expand All @@ -58,8 +55,6 @@

@Singleton
@Startup
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
@TransactionManagement(TransactionManagementType.BEAN)
@Lock(LockType.READ)
public class EJBTimerScheduler {

Expand All @@ -77,10 +72,10 @@ public class EJBTimerScheduler {

@Resource
protected javax.ejb.TimerService timerService;

@Resource
protected UserTransaction utx;

protected SessionContext ctx;
public void setUseLocalCache(boolean useLocalCache) {
this.useLocalCache = useLocalCache;
}
Expand Down Expand Up @@ -117,7 +112,7 @@ public void executeTimerJob(Timer timer) {
}
}
try {
transaction(this::executeTimerJobInstance, timerJobInstance);
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
} catch (SessionNotFoundException e) {
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
removeUnrecoverableTimer(timerJob, timer);
Expand All @@ -127,11 +122,7 @@ public void executeTimerJob(Timer timer) {
}

private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
try {
((Callable<?>) timerJobInstance).call();
} catch (Exception e) {
throw e;
}
((Callable<?>) timerJobInstance).call();
}

private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
Expand All @@ -141,7 +132,7 @@ private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
invokeTransaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
}
Expand Down Expand Up @@ -169,7 +160,7 @@ private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Excep
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
invokeTransaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
}
Expand Down Expand Up @@ -198,7 +189,7 @@ private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Excep
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
};
try {
transaction(operation, ejbTimerJob.getTimerJobInstance());
invokeTransaction (operation, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
}
Expand All @@ -218,30 +209,16 @@ private boolean isSessionNotFound(Exception e) {

@FunctionalInterface
private interface Transaction<I> {

void doWork(I item) throws Exception;
}

private <I> void transaction(Transaction<I> operation, I item) throws Exception {
try {
utx.begin();
operation.doWork(item);
utx.commit();
} catch(RollbackException e) {
logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus());
if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
utx.rollback();
}
throw new RuntimeException("jbpm timer has been rolledback", e);
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception re) {
logger.error("transaction could not be rolled back", re);
}

throw e;
}
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
}

public void internalSchedule(TimerJobInstance timerJobInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,34 +99,12 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {

@Override
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(((EjbGlobalJobHandle) jobHandle).getUuid()));
if (TRANSACTIONAL && ejbTimer == null) {
// this situation needs to be avoided as it should not happen
return false;
}
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager();
try {
tm.registerTransactionSynchronization(new TransactionSynchronization() {
@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int status) {
if (status == TransactionManager.STATUS_COMMITTED) {
logger.debug("remove job {} after commited", jobHandle);
scheduler.removeJob(jobHandle, ejbTimer);
}
}

});
logger.debug("register tx to remove job {}", jobHandle);
return true;
} catch (Exception e) {
logger.debug("remove job {} outside tx", jobHandle);
return scheduler.removeJob(jobHandle, ejbTimer);
}
return scheduler.removeJob(jobHandle, ejbTimer);
}

private TimerJobInstance getTimerJobInstance (String uuid) {
Expand Down

0 comments on commit 0b419a1

Please sign in to comment.