Skip to content

Commit

Permalink
Implement queue capacity with default 100 include throttle response, …
Browse files Browse the repository at this point in the history
…handle bound state inside the handler of the bind request.
  • Loading branch information
pmoerenhout committed Jun 10, 2020
1 parent 3ce86d4 commit 1a8e98f
Show file tree
Hide file tree
Showing 30 changed files with 687 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ public static void main(String[] args) {
"test".equals(request.getPassword())) {

// accepting request and send bind response immediately
LOGGER.info("Accepting bind request, interface version is " + request.getInterfaceVersion());
LOGGER.info("Accepting bind request, interface version is {}", request.getInterfaceVersion());
request.accept("sys");

try { Thread.sleep(20000); } catch (InterruptedException e) {}
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
LOGGER.info("Rejecting bind request");
request.reject(SMPPConstant.STAT_ESME_RINVPASWD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class OpenAndBindExample {
public static void main(String[] args) throws Exception {

boolean useSsl = true;

String host = "localhost";
int port = 8056;
/*
Expand All @@ -49,7 +48,7 @@ public static void main(String[] args) throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
LOGGER.debug("Interrupted");
Thread.currentThread().interrupt();
}
} catch (IOException e) {
// Failed connect and bind to SMSC
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package org.jsmpp.examples;

Expand Down Expand Up @@ -112,7 +112,12 @@ public void onAcceptReplaceSm(ReplaceSm replaceSm,
LOGGER.info("Accepting bind request");
request.accept("sys");

try { Thread.sleep(20000); } catch (InterruptedException e) {}
try {
Thread.sleep(20000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
LOGGER.info("Rejecting bind request");
request.reject(SMPPConstant.STAT_ESME_RINVPASWD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.jsmpp.PDUStringException;
Expand Down Expand Up @@ -103,18 +107,33 @@ public void run() {
LOGGER.info("Accepting connection for session {}", serverSession.getSessionId());
serverSession.setMessageReceiverListener(this);
serverSession.setResponseDeliveryListener(this);
execService.execute(new WaitBindTask(serverSession, systemId, password));
Future<Boolean> bindResult = execService.submit(new WaitBindTask(serverSession, systemId, password));
try {
boolean bound = bindResult.get(60000, TimeUnit.MILLISECONDS);
if (bound) {
// Could start deliver_sm to ESME
LOGGER.info("The session is now in state {}", serverSession.getSessionState());
}
} catch (InterruptedException e){
LOGGER.info("Interrupted WaitBind task: {}", e.getMessage());
} catch (ExecutionException e){
LOGGER.info("Exception on execute WaitBind task: {}", e.getMessage());
} catch (TimeoutException e){
LOGGER.info("Timeout on bind result: {}", e.getMessage());
}
}
} catch (IOException e) {
LOGGER.error("IO error occurred", e);
}
}

@Override
public QuerySmResult onAcceptQuerySm(QuerySm querySm, SMPPServerSession source) throws ProcessRequestException {
LOGGER.info("QuerySm not implemented");
throw new ProcessRequestException(QUERYSM_NOT_IMPLEMENTED, SMPPConstant.STAT_ESME_RINVCMDID);
}

@Override
public MessageId onAcceptSubmitSm(SubmitSm submitSm,
SMPPServerSession source) throws ProcessRequestException {
MessageId messageId = messageIDGenerator.newMessageId();
Expand All @@ -130,6 +149,7 @@ public void onSubmitSmRespSent(MessageId messageId,
LOGGER.debug("submit_sm_resp with message_id {} has been sent", messageId);
}

@Override
public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, SMPPServerSession source)
throws ProcessRequestException {
MessageId messageId = messageIDGenerator.newMessageId();
Expand All @@ -143,25 +163,28 @@ public SubmitMultiResult onAcceptSubmitMulti(SubmitMulti submitMulti, SMPPServer
return new SubmitMultiResult(messageId.getValue(), new UnsuccessDelivery[0]);
}

@Override
public DataSmResult onAcceptDataSm(DataSm dataSm, Session source)
throws ProcessRequestException {
LOGGER.info("Accepting DataSm, but not implemented");
throw new ProcessRequestException(DATASM_NOT_IMPLEMENTED, SMPPConstant.STAT_ESME_RSYSERR);
}

@Override
public void onAcceptCancelSm(CancelSm cancelSm, SMPPServerSession source)
throws ProcessRequestException {
LOGGER.info("Accepting CancelSm, but not implemented");
throw new ProcessRequestException(CANCELSM_NOT_IMPLEMENTED, SMPPConstant.STAT_ESME_RCANCELFAIL);
}

@Override
public void onAcceptReplaceSm(ReplaceSm replaceSm, SMPPServerSession source)
throws ProcessRequestException {
LOGGER.info("AcceptingReplaceSm, but not implemented");
throw new ProcessRequestException(REPLACESM_NOT_IMPLEMENTED, SMPPConstant.STAT_ESME_RREPLACEFAIL);
}

private static class WaitBindTask implements Runnable {
private static class WaitBindTask implements Callable<Boolean> {
private final SMPPServerSession serverSession;
private String systemId;
private String password;
Expand All @@ -172,7 +195,8 @@ public WaitBindTask(SMPPServerSession serverSession, String systemId, String pas
this.password = password;
}

public void run() {
@Override
public Boolean call() {
try {
BindRequest bindRequest = serverSession.waitForBind(5000);
try {
Expand All @@ -182,6 +206,7 @@ public void run() {
LOGGER.info("Accepting bind for session {}, interface version {}", serverSession.getSessionId(), bindRequest.getInterfaceVersion());
// The systemId identifies the SMSC to the ESME.
bindRequest.accept(SMSC_SYSTEMID, InterfaceVersion.IF_34);
return true;
} else {
LOGGER.info("Rejecting bind for session {}, interface version {}, invalid password", serverSession.getSessionId(), bindRequest.getInterfaceVersion());
bindRequest.reject(SMPPConstant.STAT_ESME_RINVPASWD);
Expand All @@ -206,6 +231,7 @@ public void run() {
} catch (IOException e) {
LOGGER.error("Failed accepting bind request for session {}", serverSession.getSessionId());
}
return false;
}
}

Expand Down Expand Up @@ -267,6 +293,7 @@ public DeliveryReceiptTask(SMPPServerSession session,
shortMessage = submitMulti.getShortMessage();
}

@Override
public void run() {
try {
Thread.sleep(1000);
Expand Down
3 changes: 3 additions & 0 deletions jsmpp/src/main/java/org/jsmpp/DefaultPDUReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class DefaultPDUReader implements PDUReader {
/* (non-Javadoc)
* @see org.jsmpp.PDUReader#readPDUHeader(java.io.DataInputStream)
*/
@Override
public Command readPDUHeader(DataInputStream in)
throws InvalidCommandLengthException, IOException {
Command header = new Command();
Expand All @@ -59,6 +60,7 @@ public Command readPDUHeader(DataInputStream in)
/* (non-Javadoc)
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream, org.jsmpp.bean.Command)
*/
@Override
public byte[] readPDU(DataInputStream in, Command pduHeader) throws IOException {
return readPDU(in, pduHeader.getCommandLength(), pduHeader
.getCommandId(), pduHeader.getCommandStatus(), pduHeader
Expand All @@ -68,6 +70,7 @@ public byte[] readPDU(DataInputStream in, Command pduHeader) throws IOException
/* (non-Javadoc)
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream, int, int, int, int)
*/
@Override
public byte[] readPDU(DataInputStream in, int commandLength, int commandId,
int commandStatus, int sequenceNumber) throws IOException {

Expand Down
6 changes: 3 additions & 3 deletions jsmpp/src/main/java/org/jsmpp/bean/EnquireLink.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class EnquireLink extends Command {

public EnquireLink(int sequenceNumber) {
super();
commandLength = 16;
commandId = SMPPConstant.CID_ENQUIRE_LINK;
commandStatus = 0;
this.commandLength = SMPPConstant.PDU_HEADER_LENGTH;
this.commandId = SMPPConstant.CID_ENQUIRE_LINK;
this.commandStatus = SMPPConstant.STAT_ESME_ROK;
this.sequenceNumber = sequenceNumber;
}

Expand Down
1 change: 0 additions & 1 deletion jsmpp/src/main/java/org/jsmpp/extra/PendingResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public void waitDone() throws ResponseTimeoutException,
condition.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted");
}
}

Expand Down
2 changes: 1 addition & 1 deletion jsmpp/src/main/java/org/jsmpp/extra/QueueException.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* @since 1.0
*
*/
public class QueueException extends Exception {
public class QueueException extends RuntimeException {
private static final long serialVersionUID = -8946319349013591134L;

/**
Expand Down
32 changes: 30 additions & 2 deletions jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ public abstract class AbstractSession implements Session {
private final Sequence sequence = new Sequence(1);
private final PDUSender pduSender;
private int pduProcessorDegree = 3;
private int queueCapacity = 100;

private String sessionId = generateSessionId();
private int enquireLinkTimer = 5000;
private int enquireLinkTimer = 60000;
private long transactionTimer = 2000;

protected EnquireLinkSender enquireLinkSender;
Expand All @@ -83,10 +84,12 @@ protected PendingResponse<Command> removePendingResponse(int sequenceNumber) {
return pendingResponse.remove(sequenceNumber);
}

@Override
public String getSessionId() {
return sessionId;
}

@Override
public void setEnquireLinkTimer(int enquireLinkTimer) {
if (sessionContext().getSessionState().isBound()) {
try {
Expand All @@ -98,18 +101,22 @@ public void setEnquireLinkTimer(int enquireLinkTimer) {
this.enquireLinkTimer = enquireLinkTimer;
}

@Override
public int getEnquireLinkTimer() {
return enquireLinkTimer;
}

@Override
public void setTransactionTimer(long transactionTimer) {
this.transactionTimer = transactionTimer;
}

@Override
public long getTransactionTimer() {
return transactionTimer;
}

@Override
public SessionState getSessionState() {
return sessionContext().getSessionState();
}
Expand All @@ -119,16 +126,19 @@ protected synchronized boolean isReadPdu() {
return sessionState.isBound() || sessionState.equals(SessionState.OPEN) || sessionState.equals(SessionState.OUTBOUND);
}

@Override
public void addSessionStateListener(SessionStateListener listener) {
if (listener != null) {
sessionContext().addSessionStateListener(listener);
}
}

@Override
public void removeSessionStateListener(SessionStateListener listener) {
sessionContext().removeSessionStateListener(listener);
}

@Override
public long getLastActivityTimestamp() {
return sessionContext().getLastActivityTimestamp();
}
Expand Down Expand Up @@ -159,6 +169,22 @@ public int getPduProcessorDegree() {
return pduProcessorDegree;
}

/**
* Get the capacity of the working queue for PDU processing.
*
* @return the ThreadPoolExecutor queue capacity.
*/
public int getQueueCapacity() {
return queueCapacity;
}

/**
* Set the capacity of the working queue for PDU processing.
*/
public void setQueueCapacity(final int queueCapacity) {
this.queueCapacity = queueCapacity;
}

/**
* Send the data_sm command.
*
Expand Down Expand Up @@ -200,6 +226,7 @@ public DataSmResult dataShortMessage(String serviceType,
return new DataSmResult(resp.getMessageId(), resp.getOptionalParameters());
}

@Override
public void close() {
logger.debug("Close session {} in state {}", sessionId, getSessionState());
SessionContext ctx = sessionContext();
Expand Down Expand Up @@ -384,6 +411,7 @@ public void unbind() throws ResponseTimeoutException,
}
}

@Override
public void unbindAndClose() {
logger.debug("Unbind and close session {}", sessionId);
if (sessionContext().getSessionState().isBound()) {
Expand Down Expand Up @@ -449,7 +477,7 @@ protected class EnquireLinkSender extends Thread {

public EnquireLinkSender()
{
super("EnquireLinkSender: " + AbstractSession.this);
super("EnquireLinkSender-" + sessionId);
}

@Override
Expand Down
Loading

0 comments on commit 1a8e98f

Please sign in to comment.