Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed deadlock in SMPPSession/AbstractSession close methods #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,32 @@ public DataSmResult dataShortMessage(String serviceType,
}

public void close() {
logger.info("AbstractSession.close() called");
SessionContext ctx = sessionContext();
if (!ctx.getSessionState().equals(SessionState.CLOSED)) {
ctx.close();
try {
connection().close();
} catch (IOException e) {
}
}

logger.info("AbstractSession.close() called");
SessionContext ctx = sessionContext();
if (!ctx.getSessionState().equals(SessionState.CLOSED)) {
ctx.close();
try {
if(enquireLinkSender != null) {
enquireLinkSender.join();
}
} catch (InterruptedException e) {
logger.warn("interrupted while waiting for enquireLinkSender thread to exit");
}
connection().close();
} catch (IOException e) {
}
}

// Make sure the enquireLinkThread doesn't wait for itself
if (Thread.currentThread() != enquireLinkSender)
{
if (enquireLinkSender != null)
{
try {
enquireLinkSender.join();
} catch (InterruptedException e) {
logger.warn("interrupted while waiting for enquireLinkSender thread to exit");
}
}
}

logger.info("AbstractSession.close() done");
}

/**
* Validate the response, the command_status should be 0 otherwise will
* throw {@link NegativeResponseException}.
Expand Down Expand Up @@ -262,11 +269,21 @@ protected Command executeSendCommand(SendCommandTask task, long timeout)
pendingResponse.put(seqNum, pendingResp);
try {
task.executeTask(connection().getOutputStream(), seqNum);
} catch (IOException e) {
logger.error("Failed sending " + task.getCommandName() + " command", e);
}
catch (IOException e)
{
logger.error("Failed sending " + task.getCommandName() + " command", e);

if(task.getCommandName().equals("enquire_link"))
{
logger.info("Tomas: Ignore failure of sending enquire_link, wait to see if connection is restored");
}
else
{
pendingResponse.remove(seqNum);
close();
throw e;
}
}

try {
Expand Down Expand Up @@ -334,6 +351,8 @@ private void unbind() throws ResponseTimeoutException,
}

public void unbindAndClose() {

logger.info("unbindAndClose() called");
if (sessionContext().getSessionState().isBound()) {
try {
unbind();
Expand Down Expand Up @@ -418,11 +437,14 @@ public void run() {
try {
sendEnquireLink();
} catch (ResponseTimeoutException e) {
logger.error("EnquireLinkSender.run() ResponseTimeoutException", e);
close();
} catch (InvalidResponseException e) {
logger.error("EnquireLinkSender.run() InvalidResponseException", e);
// lets unbind gracefully
unbindAndClose();
} catch (IOException e) {
logger.error("EnquireLinkSender.run() IOException", e);
close();
}
}
Expand Down
36 changes: 20 additions & 16 deletions jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class SMPPSession extends AbstractSession implements ClientSession {
private MessageReceiverListener messageReceiverListener;
private BoundSessionStateListener sessionStateListener = new BoundSessionStateListener();
private SMPPSessionContext sessionContext = new SMPPSessionContext(this, sessionStateListener);

/**
* Default constructor of {@link SMPPSession}. The next action might be
* connect and bind to a destination message center.
Expand Down Expand Up @@ -269,7 +269,7 @@ public String connectAndBind(String host, int port,
* @param bindType is the bind type.
* @param systemId is the system id.
* @param password is the password.
* @param systemTypeis the system type.
* @param systemType is the system type.
* @param interfaceVersion is the interface version.
* @param addrTon is the address TON.
* @param addrNpi is the address NPI.
Expand Down Expand Up @@ -454,22 +454,26 @@ protected GenericMessageReceiverListener messageReceiverListener() {
@Override
public void close()
{
super.close();
super.close();

if(Thread.currentThread() != pduReaderWorker) {
try {
if(pduReaderWorker != null) {
pduReaderWorker.join();
}
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for pduReaderWorker thread to exit");
}
}
// Moved all cleanup handling to superclass. This code may cause a deadlock because
// PDUReaderWorker waits for EnquireLinkSender and visa versa
// if(Thread.currentThread() != pduReaderWorker) {
// try {
// if(pduReaderWorker != null) {
// logger.trace("Try to join pduReaderWorker thread");
// pduReaderWorker.join();
// logger.trace("Joined");
// }
// } catch (InterruptedException e) {
// logger.warn("Interrupted while waiting for pduReaderWorker thread to exit");
// }
// }
}

@Override
protected void finalize() throws Throwable {
close();
close();
}

private void fireAcceptDeliverSm(DeliverSm deliverSm) throws ProcessRequestException {
Expand Down Expand Up @@ -611,8 +615,8 @@ private void readPDU() {
try {
Command pduHeader = null;
byte[] pdu = null;
pduHeader = pduReader.readPDUHeader(in);

pduHeader = pduReader.readPDUHeader(in);
pdu = pduReader.readPDU(in, pduHeader);

/*
Expand All @@ -624,7 +628,7 @@ private void readPDU() {
sessionContext, responseHandler,
sessionContext, onIOExceptionTask);
executorService.execute(task);

} catch (InvalidCommandLengthException e) {
logger.warn("Receive invalid command length", e);
try {
Expand Down