Skip to content

Commit

Permalink
use java.util.concurrent locks and counters
Browse files Browse the repository at this point in the history
  • Loading branch information
ar committed Jan 15, 2025
1 parent a721d79 commit 2072a20
Showing 1 changed file with 32 additions and 64 deletions.
96 changes: 32 additions & 64 deletions jpos/src/main/java/org/jpos/transaction/TransactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.jpos.iso.ISOUtil;
import org.jpos.util.Metrics;
Expand All @@ -63,12 +66,9 @@ public class TransactionManager
extends QBeanSupport
implements Runnable, TransactionConstants, TransactionManagerMBean, Loggeable, MetricsProvider
{
public static final String HEAD = "$HEAD";
public static final String TAIL = "$TAIL";
public static final String CONTEXT = "$CONTEXT.";
public static final String STATE = "$STATE.";
public static final String GROUPS = "$GROUPS.";
public static final String TAILLOCK = "$TAILLOCK";
public static final String RETRY_QUEUE = "$RETRY_QUEUE";
public static final Integer PREPARING = 0;
public static final Integer COMMITTING = 1;
Expand All @@ -89,7 +89,7 @@ public class TransactionManager
private Space<String,Object> isp; // real input space
private Space<String,Object> iisp; // internal input space
private String queue;
private String tailLock;
private Lock tailLock = new ReentrantLock();
private final List<TransactionStatusListener> statusListeners = new ArrayList<>();
private boolean hasStatusListeners;
private boolean doRecover;
Expand All @@ -102,7 +102,9 @@ public class TransactionManager
private final AtomicInteger activeSessions = new AtomicInteger();
private final AtomicInteger pausedSessions = new AtomicInteger();

private volatile long head, tail;
private final AtomicLong head = new AtomicLong();
private final AtomicLong tail = new AtomicLong();

private long retryInterval = 5000L;
private long retryTimeout = 60000L;
private long pauseTimeout = 60000L;
Expand All @@ -128,10 +130,8 @@ public void initService () throws ConfigurationException {
psp = SpaceFactory.getSpace (cfg.get ("persistent-space", this.toString()));
doRecover = cfg.getBoolean ("recover", psp instanceof PersistentSpace);

tail = initCounter (TAIL, cfg.getLong ("initial-tail", 1));
head = Math.max (initCounter (HEAD, tail), tail);
initTailLock ();

tail.set(cfg.getLong ("initial-tail", 1));
head.set(tail.get());
groups = new HashMap<>();
initParticipants (getPersist());
initStatusListeners (getPersist());
Expand Down Expand Up @@ -257,7 +257,7 @@ private void runTransaction (Serializable context, int session) {
evt = null;
thread.setName (getName() + "-" + session + ":idle");
int action = -1;
id = nextId ();
id = head.incrementAndGet ();
TMEvent tme = new TMEvent(getName(), id);
Txn txn = new Txn(getName(), id);

Expand Down Expand Up @@ -300,7 +300,7 @@ private void runTransaction (Serializable context, int session) {
break;
}
snapshot (id, null, DONE);
if (id == tail) {
if (id == tail.get()) {
checkTail ();
} else {
purge (id, false);
Expand Down Expand Up @@ -353,16 +353,16 @@ private void runTransaction (Serializable context, int session) {

@Override
public long getTail () {
return tail;
return tail.get();
}

@Override
public long getHead () {
return head;
return head.get();
}

public long getInTransit () {
return head - tail;
return head.get() - tail.get();
}

@Override
Expand Down Expand Up @@ -802,14 +802,6 @@ protected String getKey (String prefix, long id) {
sb.append (id);
return sb.toString ();
}
protected long initCounter (String name, long defValue) {
Long L = (Long) psp.rdp (name);
if (L == null) {
L = defValue;
psp.out (name, L);
}
return L;
}
protected void commitOff (Space sp) {
if (sp instanceof JDBMSpace jsp) {
jsp.setAutoCommit(false);
Expand All @@ -821,46 +813,24 @@ protected void commitOn (Space sp) {
jsp.setAutoCommit(true);
}
}
protected void syncTail () {
synchronized (psp) {
commitOff (psp);
psp.inp (TAIL);
psp.out (TAIL, tail);
commitOn (psp);
}
}
protected void initTailLock () {
tailLock = TAILLOCK + "." + this.hashCode();
sp.put (tailLock, TAILLOCK);
}
protected void checkTail () {
Object lock = sp.in (tailLock);
while (tailDone()) {
tail++;
Thread.yield();
tailLock.lock();
try {
while (tailDone()) {
tail.incrementAndGet();
}
} finally {
tailLock.unlock();
}
syncTail ();
sp.out(tailLock, lock);
}
protected boolean tailDone () {
String stateKey = getKey(STATE, tail);
String stateKey = getKey(STATE, tail.get());
if (DONE.equals (psp.rdp (stateKey))) {
purge (tail, true);
purge (tail.get(), true);
return true;
}
return false;
}
protected long nextId () {
long h;
synchronized (psp) {
commitOff (psp);
psp.in (HEAD);
h = head;
psp.out (HEAD, ++head);
commitOn (psp);
}
return h;
}
protected void snapshot (long id, Serializable context) {
snapshot (id, context, null);
}
Expand Down Expand Up @@ -913,17 +883,15 @@ protected void purge (long id, boolean full) {

protected void recover () {
if (doRecover) {
if (tail < head) {
getLog().info ("recover - tail=" +tail+", head="+head);
if (tail.get() < head.get()) {
getLog().info ("recover - tail=" +tail.get()+", head="+head.get());
}
while (tail < head) {
recover (0, tail++);
while (tail.get() < head.get()) {
recover (tail.getAndIncrement());
}
} else
tail = head;
syncTail ();
}
}
protected void recover (int session, long id) {
protected void recover (long id) {
LogEvent evt = getLog().createLogEvent ("recover");
Profiler prof = new Profiler();
evt.addMessage ("<id>" + id + "</id>");
Expand All @@ -943,9 +911,9 @@ protected void recover (int session, long id) {
if (DONE.equals (state)) {
evt.addMessage ("<done/>");
} else if (COMMITTING.equals (state)) {
commit (session, id, context, getParticipants (id), true, evt, prof);
commit (0, id, context, getParticipants (id), true, evt, prof);
} else if (PREPARING.equals (state)) {
abort (session, id, context, getParticipants (id), true, evt, prof);
abort (0, id, context, getParticipants (id), true, evt, prof);
}
purge (id, true);
} finally {
Expand Down Expand Up @@ -1094,7 +1062,7 @@ private ParticipantParams getParams (TransactionParticipant p) {

private String tmInfo() {
return String.format ("in-transit=%d, head=%d, tail=%d, paused=%d, outstanding=%d, active-sessions=%d/%d%s",
getInTransit(), head, tail, pausedSessions.get(), getOutstandingTransactions(),
getInTransit(), head.get(), tail.get(), pausedSessions.get(), getOutstandingTransactions(),
getActiveSessions(), maxSessions,
(tps != null ? ", " + tps : "")
);
Expand Down

0 comments on commit 2072a20

Please sign in to comment.