From b240e1be8e7a76f0175889df15b3456c1d0bb25d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 21 Nov 2023 20:15:17 +0100 Subject: [PATCH] [JBPM-10209] Switching to CMT (#2362) (#2364) * [JBPM-10209] Switching to CMT * [JBPM-10209] A bit of refactor * [JBPM-10209] Gonzalos comments * [JBPM-10209] Disposable handling Co-authored-by: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> --- .../process/core/DisposableRuntimeEngine.java | 24 +++ .../core/timer/impl/GlobalTimerService.java | 6 +- .../timer/GlobalJpaTimerJobInstance.java | 20 +- .../manager/impl/RuntimeEngineImpl.java | 6 +- .../services/ejb/timer/EJBTimerScheduler.java | 185 +++++++----------- .../ejb/timer/EjbSchedulerService.java | 28 +-- 6 files changed, 115 insertions(+), 154 deletions(-) create mode 100644 jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java diff --git a/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java b/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java new file mode 100644 index 0000000000..2196fc5502 --- /dev/null +++ b/jbpm-flow/src/main/java/org/jbpm/process/core/DisposableRuntimeEngine.java @@ -0,0 +1,24 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jbpm.process.core; + +import org.kie.internal.runtime.manager.Disposable; +import org.kie.internal.runtime.manager.InternalRuntimeEngine; + +public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable { + boolean isDisposed(); +} diff --git a/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java b/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java index 2daa6b2cfd..2c013940f3 100644 --- a/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java +++ b/jbpm-flow/src/main/java/org/jbpm/process/core/timer/impl/GlobalTimerService.java @@ -37,6 +37,7 @@ import org.drools.core.time.impl.DefaultJobHandle; import org.drools.core.time.impl.TimerJobFactoryManager; import org.drools.core.time.impl.TimerJobInstance; +import org.jbpm.process.core.DisposableRuntimeEngine; import org.jbpm.process.core.timer.GlobalSchedulerService; import org.jbpm.process.core.timer.NamedJobContext; import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext; @@ -420,7 +421,10 @@ public Environment getEnvironment() { return runtime.getKieSession().getEnvironment(); } - + + public boolean isDisposed() { + return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed(); + } } public List getListeners() { diff --git a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java index c19a959172..16b7de1cfb 100644 --- a/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java +++ b/jbpm-persistence/jbpm-persistence-jpa/src/main/java/org/jbpm/persistence/timer/GlobalJpaTimerJobInstance.java @@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger, @Override public Void call() throws Exception { AsyncExecutionMarker.markAsync(); - ExecutableRunner runner = null; + ExecutableRunner runner = null; TransactionManager jtaTm = null; boolean success = false; try { @@ -88,16 +88,16 @@ public Void call() throws Exception { success = true; return null; } catch( Exception e ) { - e.printStackTrace(); + logger.error("Exception executing timer", e); success = false; throw e; } finally { AsyncExecutionMarker.reset(); - if (runner != null && runner instanceof DisposableCommandService) { - if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) { - logger.debug("Allowed to dispose command service from global timer job instance"); - ((DisposableCommandService) runner).dispose(); - } + if (runner != null && runner instanceof DisposableCommandService) { + if (allowedToDispose(((DisposableCommandService) runner))) { + logger.debug("Allowed to dispose command service from global timer job instance"); + ((DisposableCommandService) runner).dispose(); + } } closeTansactionIfNeeded(jtaTm, success); } @@ -123,7 +123,11 @@ public String toString() { return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId + ", getJobHandle()=" + getJobHandle() + "]"; } - + + private boolean allowedToDispose(DisposableCommandService disposableCommandService) { + return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment()); + } + protected boolean allowedToDispose(Environment environment) { if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) { return true; diff --git a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java index 043b40bb39..1eda7e9f36 100644 --- a/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java +++ b/jbpm-runtime-manager/src/main/java/org/jbpm/runtime/manager/impl/RuntimeEngineImpl.java @@ -19,14 +19,13 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.jbpm.process.audit.JPAAuditLogService; +import org.jbpm.process.core.DisposableRuntimeEngine; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.manager.Context; import org.kie.api.runtime.manager.RuntimeManager; import org.kie.api.runtime.manager.audit.AuditService; import org.kie.api.task.TaskService; -import org.kie.internal.runtime.manager.Disposable; import org.kie.internal.runtime.manager.DisposeListener; -import org.kie.internal.runtime.manager.InternalRuntimeEngine; import org.kie.internal.runtime.manager.InternalRuntimeManager; import org.kie.internal.runtime.manager.SessionNotFoundException; @@ -36,7 +35,7 @@ * and work item handlers might be interested in receiving notification when the runtime engine is disposed of, * in order deactivate themselves too and not receive any other events. */ -public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable { +public class RuntimeEngineImpl implements DisposableRuntimeEngine { private RuntimeEngineInitlializer initializer; private Context context; @@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) { this.manager = manager; } + @Override public boolean isDisposed() { return disposed; } diff --git a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java index 10f7da7613..1a574fad68 100644 --- a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java +++ b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EJBTimerScheduler.java @@ -32,21 +32,18 @@ import javax.annotation.PostConstruct; import javax.annotation.Resource; -import javax.ejb.ConcurrencyManagement; -import javax.ejb.ConcurrencyManagementType; import javax.ejb.Lock; import javax.ejb.LockType; import javax.ejb.NoSuchObjectLocalException; +import javax.ejb.SessionContext; import javax.ejb.Singleton; import javax.ejb.Startup; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerHandle; -import javax.ejb.TransactionManagement; -import javax.ejb.TransactionManagementType; -import javax.transaction.RollbackException; -import javax.transaction.UserTransaction; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; import org.drools.core.time.JobHandle; import org.drools.core.time.impl.TimerJobInstance; @@ -58,8 +55,6 @@ @Singleton @Startup -@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER) -@TransactionManagement(TransactionManagementType.BEAN) @Lock(LockType.READ) public class EJBTimerScheduler { @@ -70,6 +65,8 @@ public class EJBTimerScheduler { private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3")); private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000")); + + private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200")); private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false")); @@ -77,10 +74,10 @@ public class EJBTimerScheduler { @Resource protected javax.ejb.TimerService timerService; - + @Resource - protected UserTransaction utx; - + protected SessionContext ctx; + public void setUseLocalCache(boolean useLocalCache) { this.useLocalCache = useLocalCache; } @@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) { EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo(); TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance(); logger.debug("About to execute timer for job {}", timerJob); - - String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId(); - // handle overdue timers as ejb timer service might start before all deployments are ready long time = 0; - while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) { - logger.debug("waiting for timer service to be available, elapsed time {} ms", time); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - time += 500; - - if (time > OVERDUE_WAIT_TIME) { - logger.debug("No timer service found after waiting {} ms", time); - break; + try { + while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) { + logger.debug("waiting for timer service to be available, elapsed time {} ms", time); + Thread.sleep(OVERDUE_CHECK_TIME); + time += OVERDUE_CHECK_TIME; + if (time > OVERDUE_WAIT_TIME) { + logger.debug("No timer service found after waiting {} ms", time); + break; + } } - } + } catch (InterruptedException e) { + logger.warn("Thread has been interrupted", e); + Thread.currentThread().interrupt(); + } try { - transaction(this::executeTimerJobInstance, timerJobInstance); - } catch (SessionNotFoundException e) { - logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e); - removeUnrecoverableTimer(timerJob, timer); + invokeTransaction(this::executeTimerJobInstance, timerJobInstance); } catch (Exception e) { recoverTimerJobInstance(timerJob, timer, e); } } private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception { - try { - ((Callable) timerJobInstance).call(); - } catch (Exception e) { - throw e; - } + ((Callable) timerJobInstance).call(); } - private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) { - try { - Transaction tx = timerJobInstance -> { - if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) { - logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance()); - } - }; - transaction(tx, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1); - } - } - - private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) { - if (isSessionNotFound(e)) { - logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e); + private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) { + Transaction tx; + if (isSessionNotFound(cause)) { // if session is not found means the process has already finished. In this case we just need to remove // the timer and avoid any recovery as it should not trigger any more timers. - removeUnrecoverableTimer(ejbTimerJob, timer); - return; + tx = timerJobInstance -> { + logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause); + if (!removeJob(timerJobInstance.getJobHandle(), timer)) { + logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance); + } + }; } - - if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) { + else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) { // this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost // because of the transaction, so we need to do this here. - try { - - logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance()); - Transaction tx = timerJobInstance -> { - if (this.removeJob(timerJobInstance.getJobHandle(), null)) { - this.internalSchedule(timerJobInstance); - } else { - logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance()); - } - }; - transaction(tx, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1); - } - return; + tx = timerJobInstance -> { + logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance); + if (removeJob(timerJobInstance.getJobHandle(), null)) { + internalSchedule(timerJobInstance); + } else { + logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance); + } + }; + } + else { + // if there is not next date to be fired, we need to apply policy otherwise will be lost + tx = timerJobInstance -> { + logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance); + ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS); + EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance); + if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) { + logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT); + } else { + TimerConfig config = new TimerConfig(info, true); + Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config); + ((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle()); + ((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer)); + } + }; } - - // if there is not next date to be fired, we need to apply policy otherwise will be lost - - logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance()); - Transaction operation = (instance) -> { - ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS); - EjbTimerJobRetry info = null; - if(ejbTimerJob instanceof EjbTimerJobRetry) { - info = ((EjbTimerJobRetry) ejbTimerJob).next(); - } else { - info = new EjbTimerJobRetry(instance); - } - if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) { - logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT); - return; - } - TimerConfig config = new TimerConfig(info, true); - Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config); - TimerHandle handler = newTimer.getHandle(); - ((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler); - ((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer)); - }; try { - transaction(operation, ejbTimerJob.getTimerJobInstance()); - } catch (Exception e1) { - logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1); + invokeTransaction (tx, ejbTimerJob.getTimerJobInstance()); + } catch (Exception e) { + logger.error("Failed to executed timer recovery", e); } - } private boolean isSessionNotFound(Exception e) { @@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) { @FunctionalInterface private interface Transaction { - void doWork(I item) throws Exception; } - private void transaction(Transaction operation, I item) throws Exception { - try { - utx.begin(); - operation.doWork(item); - utx.commit(); - } catch(RollbackException e) { - logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus()); - if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) { - utx.rollback(); - } - throw new RuntimeException("jbpm timer has been rolledback", e); - } catch (Exception e) { - try { - utx.rollback(); - } catch (Exception re) { - logger.error("transaction could not be rolled back", re); - } - - throw e; - } + @TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW) + public void transaction(Transaction operation, I item) throws Exception { + operation.doWork(item); + } + + private void invokeTransaction (Transaction operation, I item) throws Exception { + ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item); } public void internalSchedule(TimerJobInstance timerJobInstance) { diff --git a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java index a6bc947c2b..ebcf8183a4 100644 --- a/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java +++ b/jbpm-services/jbpm-services-ejb/jbpm-services-ejb-timer/src/main/java/org/jbpm/services/ejb/timer/EjbSchedulerService.java @@ -39,7 +39,6 @@ import org.drools.core.time.impl.TimerJobInstance; import org.drools.persistence.api.TransactionManager; import org.drools.persistence.api.TransactionManagerFactory; -import org.drools.persistence.api.TransactionSynchronization; import org.drools.persistence.jta.JtaTransactionManager; import org.jbpm.process.core.timer.GlobalSchedulerService; import org.jbpm.process.core.timer.JobNameHelper; @@ -103,31 +102,12 @@ public boolean removeJob(JobHandle jobHandle) { String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid(); final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid)); if (TRANSACTIONAL && ejbTimer == null) { - // this situation needs to be avoided as it should not happen + logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid); return false; } - JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager(); - try { - tm.registerTransactionSynchronization(new TransactionSynchronization() { - @Override - public void beforeCompletion() { - } - - @Override - public void afterCompletion(int status) { - if (status == TransactionManager.STATUS_COMMITTED) { - logger.debug("remove job {} after commited", jobHandle); - scheduler.removeJob(jobHandle, ejbTimer); - } - } - - }); - logger.debug("register tx to remove job {}", jobHandle); - return true; - } catch (Exception e) { - logger.debug("remove job {} outside tx", jobHandle); - return scheduler.removeJob(jobHandle, ejbTimer); - } + boolean result = scheduler.removeJob(jobHandle, ejbTimer); + logger.debug("Remove job returned {}", result); + return result; } private TimerJobInstance getTimerJobInstance (String uuid) {