From 2072a20d9317f5d2ca6dd3cb949991e70f3419e4 Mon Sep 17 00:00:00 2001 From: Alejandro Revilla Date: Wed, 15 Jan 2025 17:04:54 -0300 Subject: [PATCH] use java.util.concurrent locks and counters --- .../jpos/transaction/TransactionManager.java | 96 +++++++------------ 1 file changed, 32 insertions(+), 64 deletions(-) diff --git a/jpos/src/main/java/org/jpos/transaction/TransactionManager.java b/jpos/src/main/java/org/jpos/transaction/TransactionManager.java index 8878ec9c39..06750a9502 100644 --- a/jpos/src/main/java/org/jpos/transaction/TransactionManager.java +++ b/jpos/src/main/java/org/jpos/transaction/TransactionManager.java @@ -52,6 +52,9 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.jpos.iso.ISOUtil; import org.jpos.util.Metrics; @@ -63,12 +66,9 @@ public class TransactionManager extends QBeanSupport implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider { - public static final String HEAD = "$HEAD"; - public static final String TAIL = "$TAIL"; public static final String CONTEXT = "$CONTEXT."; public static final String STATE = "$STATE."; public static final String GROUPS = "$GROUPS."; - public static final String TAILLOCK = "$TAILLOCK"; public static final String RETRY_QUEUE = "$RETRY_QUEUE"; public static final Integer PREPARING = 0; public static final Integer COMMITTING = 1; @@ -89,7 +89,7 @@ public class TransactionManager private Space isp; // real input space private Space iisp; // internal input space private String queue; - private String tailLock; + private Lock tailLock = new ReentrantLock(); private final List statusListeners = new ArrayList<>(); private boolean hasStatusListeners; private boolean doRecover; @@ -102,7 +102,9 @@ public class TransactionManager private final AtomicInteger activeSessions = new AtomicInteger(); private final AtomicInteger pausedSessions = new AtomicInteger(); - private volatile long head, tail; + private final AtomicLong head = new AtomicLong(); + private final AtomicLong tail = new AtomicLong(); + private long retryInterval = 5000L; private long retryTimeout = 60000L; private long pauseTimeout = 60000L; @@ -128,10 +130,8 @@ public void initService () throws ConfigurationException { psp = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString())); doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace); - tail = initCounter (TAIL, cfg.getLong ("initial-tail", 1)); - head = Math.max (initCounter (HEAD, tail), tail); - initTailLock (); - + tail.set(cfg.getLong ("initial-tail", 1)); + head.set(tail.get()); groups = new HashMap<>(); initParticipants (getPersist()); initStatusListeners (getPersist()); @@ -257,7 +257,7 @@ private void runTransaction (Serializable context, int session) { evt = null; thread.setName (getName() + "-" + session + ":idle"); int action = -1; - id = nextId (); + id = head.incrementAndGet (); TMEvent tme = new TMEvent(getName(), id); Txn txn = new Txn(getName(), id); @@ -300,7 +300,7 @@ private void runTransaction (Serializable context, int session) { break; } snapshot (id, null, DONE); - if (id == tail) { + if (id == tail.get()) { checkTail (); } else { purge (id, false); @@ -353,16 +353,16 @@ private void runTransaction (Serializable context, int session) { @Override public long getTail () { - return tail; + return tail.get(); } @Override public long getHead () { - return head; + return head.get(); } public long getInTransit () { - return head - tail; + return head.get() - tail.get(); } @Override @@ -802,14 +802,6 @@ protected String getKey (String prefix, long id) { sb.append (id); return sb.toString (); } - protected long initCounter (String name, long defValue) { - Long L = (Long) psp.rdp (name); - if (L == null) { - L = defValue; - psp.out (name, L); - } - return L; - } protected void commitOff (Space sp) { if (sp instanceof JDBMSpace jsp) { jsp.setAutoCommit(false); @@ -821,46 +813,24 @@ protected void commitOn (Space sp) { jsp.setAutoCommit(true); } } - protected void syncTail () { - synchronized (psp) { - commitOff (psp); - psp.inp (TAIL); - psp.out (TAIL, tail); - commitOn (psp); - } - } - protected void initTailLock () { - tailLock = TAILLOCK + "." + this.hashCode(); - sp.put (tailLock, TAILLOCK); - } protected void checkTail () { - Object lock = sp.in (tailLock); - while (tailDone()) { - tail++; - Thread.yield(); + tailLock.lock(); + try { + while (tailDone()) { + tail.incrementAndGet(); + } + } finally { + tailLock.unlock(); } - syncTail (); - sp.out(tailLock, lock); } protected boolean tailDone () { - String stateKey = getKey(STATE, tail); + String stateKey = getKey(STATE, tail.get()); if (DONE.equals (psp.rdp (stateKey))) { - purge (tail, true); + purge (tail.get(), true); return true; } return false; } - protected long nextId () { - long h; - synchronized (psp) { - commitOff (psp); - psp.in (HEAD); - h = head; - psp.out (HEAD, ++head); - commitOn (psp); - } - return h; - } protected void snapshot (long id, Serializable context) { snapshot (id, context, null); } @@ -913,17 +883,15 @@ protected void purge (long id, boolean full) { protected void recover () { if (doRecover) { - if (tail < head) { - getLog().info ("recover - tail=" +tail+", head="+head); + if (tail.get() < head.get()) { + getLog().info ("recover - tail=" +tail.get()+", head="+head.get()); } - while (tail < head) { - recover (0, tail++); + while (tail.get() < head.get()) { + recover (tail.getAndIncrement()); } - } else - tail = head; - syncTail (); + } } - protected void recover (int session, long id) { + protected void recover (long id) { LogEvent evt = getLog().createLogEvent ("recover"); Profiler prof = new Profiler(); evt.addMessage ("" + id + ""); @@ -943,9 +911,9 @@ protected void recover (int session, long id) { if (DONE.equals (state)) { evt.addMessage (""); } else if (COMMITTING.equals (state)) { - commit (session, id, context, getParticipants (id), true, evt, prof); + commit (0, id, context, getParticipants (id), true, evt, prof); } else if (PREPARING.equals (state)) { - abort (session, id, context, getParticipants (id), true, evt, prof); + abort (0, id, context, getParticipants (id), true, evt, prof); } purge (id, true); } finally { @@ -1094,7 +1062,7 @@ private ParticipantParams getParams (TransactionParticipant p) { private String tmInfo() { return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s", - getInTransit(), head, tail, pausedSessions.get(), getOutstandingTransactions(), + getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(), getActiveSessions(), maxSessions, (tps != null ? ", " + tps : "") );