diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java index 77cbebb75..a8f369643 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java @@ -29,6 +29,8 @@ import org.apache.synapse.message.store.InMemoryMessageStore; import org.apache.synapse.message.store.MessageStore; import org.apache.axis2.util.JavaUtils; +import org.apache.synapse.util.xpath.SynapseXPath; +import org.jaxen.JaxenException; import javax.xml.XMLConstants; @@ -56,6 +58,7 @@ public class MessageStoreFactory { public static final QName CLASS_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "class"); public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name"); public static final QName SEQUENCE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence"); + public static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression"); public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "parameter"); @@ -98,7 +101,6 @@ public static MessageStore createMessageStore(OMElement elem, Properties propert messageStore.setParameters(getParameters(elem)); - log.info("Successfully created Message Store: " + nameAtt.getAttributeValue()); return messageStore; } @@ -118,6 +120,7 @@ private static Map getParameters(OMElement elem) { if (paramValue != null) { parameters.put(paramName.getAttributeValue(), paramValue); } + processExpressionIfExist(prop, parameters); } else { handleException("Invalid MessageStore parameter - Parameter must have a name "); } @@ -126,6 +129,45 @@ private static Map getParameters(OMElement elem) { return parameters; } + /** + * If an expression is defined in property, it'll be extracted and populated + * + * @param prop xml element read from the synapse parameter. This should be a != null value. + * @param params list of processed params from the XML. + * @return true if an expression is processed + */ + private static boolean processExpressionIfExist(OMElement prop, Map params) { + try { + OMAttribute expression = prop.getAttribute(EXPRESSION_Q); + if (null != expression) { + SynapseXPath synapseXPath = SynapseXPathFactory.getSynapseXPath(prop, EXPRESSION_Q); + registerParameter(params, prop.getAttribute(NAME_Q), synapseXPath); + return true; + } + } catch (JaxenException e) { + handleException("Error while extracting parameter : " + e.getMessage()); + } + return false; + } + + /** + * Register the extracted parameter in the list. + * + * @param parameters the list of parameters which should be registered. + * @param paramName the name of the parameter. + * @param paramValue the value of the parameter. + * @param the type of the parameter. + */ + private static void registerParameter(Map parameters, + OMAttribute paramName, + T paramValue) { + if (paramName != null && paramValue != null) { + parameters.put(paramName.getAttributeValue(), paramValue); + } else { + handleException("Invalid MessageStore parameter - Parameter must have a name "); + } + } + private static void handleException(String msg) { log.error(msg); throw new SynapseException(msg); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java index 1a1a1b436..153ee8265 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreSerializer.java @@ -29,9 +29,12 @@ import org.apache.synapse.SynapseException; import org.apache.synapse.message.store.InMemoryMessageStore; import org.apache.synapse.message.store.MessageStore; +import org.apache.synapse.util.xpath.SynapseXPath; import javax.xml.namespace.QName; import java.util.Iterator; +import java.util.Map; +import java.util.Set; /** * Serialize an instance of the given Message Store, and sets properties on it. @@ -74,16 +77,11 @@ public static OMElement serializeMessageStore(OMElement parent, MessageStore mes Iterator iter = messageStore.getParameters().keySet().iterator(); while (iter.hasNext()) { String name = (String) iter.next(); - String value = (String) messageStore.getParameters().get(name); - OMElement property = fac.createOMElement("parameter", synNS); - property.addAttribute(fac.createOMAttribute( - "name", nullNS, name)); - property.setText(value.trim()); + OMElement property = getParameter(messageStore, name); store.addChild(property); } } - if (getSerializedDescription(messageStore) != null) { store.addChild(getSerializedDescription(messageStore)); } @@ -94,6 +92,39 @@ public static OMElement serializeMessageStore(OMElement parent, MessageStore mes return store; } + + /** + * Will get the parameter OMElement. + * + * @param messageStore the message store definition metadata. + * @param name the parameter key. + * @return the parameter OMElement. + */ + private static OMElement getParameter(MessageStore messageStore, String name) { + Object paramValue = messageStore.getParameters().get(name); + OMElement property = null; + if (paramValue instanceof String) { + String value = (String) paramValue; + property = fac.createOMElement("parameter", synNS); + property.addAttribute(fac.createOMAttribute("name", nullNS, name)); + property.setText(value.trim()); + } else if (paramValue instanceof SynapseXPath) { + SynapseXPath value = (SynapseXPath) paramValue; + String expression = value.toString(); + Map namespaces = value.getNamespaces(); + property = fac.createOMElement("parameter", synNS); + property.addAttribute(fac.createOMAttribute("name", nullNS, name)); + property.addAttribute(fac.createOMAttribute("expression", nullNS, expression)); + Set> nameSpaceAttributes = namespaces.entrySet(); + for (Map.Entry nameSpaceElement : nameSpaceAttributes) { + String prefix = nameSpaceElement.getKey(); + String uri = nameSpaceElement.getValue(); + property.declareNamespace(uri, prefix); + } + } + return property; + } + private static OMElement getSerializedDescription(MessageStore messageStore) { OMElement descriptionElem = fac.createOMElement( new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description")); diff --git a/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java new file mode 100644 index 000000000..b6f986d39 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/MessageConsumer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message; + +import org.apache.synapse.MessageContext; + +public interface MessageConsumer { + /** + * Receives the next message from the store. + * + * @return Synapse message context of the last message received from the store. + */ + MessageContext receive(); + + /** + * Acknowledges the last message received so that it will be removed from the store. + * + * @return {@code true} if the acknowledgement is successful. {@code false} otherwise. + */ + boolean ack(); + + /** + * Cleans up this message consumer + * + * @return {@code true} if cleanup is successful, {@code false} otherwise. + */ + boolean cleanup(); + + /** + * Check availability of connectivity with the message store + * + * @return {@code true} if connection available, {@code false} otherwise. + */ + boolean isAlive(); + + /** + * Sets the ID of this message consumer. + * + * @param i ID + */ + public void setId(int i); + + /** + * Returns the ID of this Message consumer. + * + * @return ID + */ + public String getId(); +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/MessageProducer.java b/modules/core/src/main/java/org/apache/synapse/message/MessageProducer.java new file mode 100644 index 000000000..662c160e5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/MessageProducer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message; + +import org.apache.synapse.MessageContext; + +public interface MessageProducer { + /** + * Stores the given message to the store associated with this message consumer. + * + * @param synCtx Message to be saved. + * @return {@code true} if storing of the message is successful, {@code false} otherwise. + */ + boolean storeMessage(MessageContext synCtx); + + /** + * Cleans up this message consumer + * + * @return {@code true} if clean up is successful, {@code false} otherwise. + */ + boolean cleanup(); + + /** + * Sets the ID of this message consumer. + * + * @param id ID + */ + public void setId(int id); + + /** + * Returns the ID of this message consumer. + * + * @return ID + */ + public String getId(); +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java b/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java index 179f2c84f..428239e31 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java @@ -19,6 +19,8 @@ package org.apache.synapse.message.store; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.synapse.commons.jmx.MBeanRegistrar; import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.core.SynapseEnvironment; @@ -26,6 +28,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -59,7 +63,7 @@ public abstract class AbstractMessageStore implements MessageStore { /** * Message store parameters */ - protected Map parameters; + protected Map parameters; /** * Message Store description @@ -71,6 +75,30 @@ public abstract class AbstractMessageStore implements MessageStore { */ protected String fileName; + /** + * Identify whether a given message is enqueued from store + */ + private AtomicLong enqueued = new AtomicLong(0); + + /** + * Identify whether a given message is dequeued from store + */ + private AtomicLong dequeued = new AtomicLong(0); + + private final Object messageCountLock = new Object(); + + private static final long maxEnDequeuable = Long.MAX_VALUE; + + private AtomicInteger producerId = new AtomicInteger(0); + /** + * Message consumer id + */ + private AtomicInteger consumerId = new AtomicInteger(0); + + private int maxProducerId = Integer.MAX_VALUE; + + private static final Log log = LogFactory.getLog(AbstractMessageStore.class); + /** * List that holds the MessageStore observers registered with the Message Store */ @@ -80,7 +108,6 @@ public abstract class AbstractMessageStore implements MessageStore { protected Lock lock = new ReentrantLock(); - @Override public void init(SynapseEnvironment se) { this.synapseEnvironment = se; @@ -103,37 +130,69 @@ public void setName(String name) { @Override public void registerObserver(MessageStoreObserver observer) { - if(observer != null && !messageStoreObservers.contains(observer)) { + if (observer != null && !messageStoreObservers.contains(observer)) { messageStoreObservers.add(observer); } } @Override public void unregisterObserver(MessageStoreObserver observer) { - if(observer != null && messageStoreObservers.contains(observer)) { + if (observer != null && messageStoreObservers.contains(observer)) { messageStoreObservers.remove(observer); } } /** * Notify Message Addition to the observers + * * @param messageId of the Message added. */ protected void notifyMessageAddition(String messageId) { - for(MessageStoreObserver o : messageStoreObservers) { + for (MessageStoreObserver o : messageStoreObservers) { o.messageAdded(messageId); } } /** * Notify Message removal to the observers + * * @param messageId of the Message added */ protected void notifyMessageRemoval(String messageId) { - for(MessageStoreObserver o : messageStoreObservers) { + for (MessageStoreObserver o : messageStoreObservers) { o.messageRemoved(messageId); } } + + public int nextProducerId() { + int id = producerId.incrementAndGet(); + if (id == maxProducerId) { + log.info("Setting producer ID generator to 0..."); + producerId.set(0); + id = producerId.incrementAndGet(); + } + return id; + } + + public int nextConsumerId() { + int id = consumerId.incrementAndGet(); + return id; + } + + public void enqueued() { + synchronized (messageCountLock) { + enqueued.compareAndSet(maxEnDequeuable, 0); + enqueued.incrementAndGet(); + } + } + + public void dequeued() { + synchronized (messageCountLock) { + dequeued.compareAndSet(maxEnDequeuable, 0); + dequeued.incrementAndGet(); + } + } + @Override public int size() { return -1; diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java b/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java index 2203a5a9a..0203c1687 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.MessageContext; +import org.apache.synapse.message.MessageConsumer; +import org.apache.synapse.message.MessageProducer; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; @@ -35,11 +37,23 @@ public class InMemoryMessageStore extends AbstractMessageStore { private static final Log log = LogFactory.getLog(InMemoryMessageStore.class); - /** The map that keeps the stored messages */ + /** + * The map that keeps the stored messages + */ private Queue messageList = new ConcurrentLinkedQueue(); private Lock lock = new ReentrantLock(); + @Override + public MessageProducer getProducer() { + throw new UnsupportedOperationException(); + } + + @Override + public MessageConsumer getConsumer() { + throw new UnsupportedOperationException(); + } + @Override public boolean offer(MessageContext messageContext) { lock.lock(); @@ -77,7 +91,7 @@ public MessageContext poll() { @Override public MessageContext peek() { - return messageList.peek(); + return messageList.peek(); } @Override diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java b/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java index faad52280..72b39fc45 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java +++ b/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java @@ -24,6 +24,8 @@ import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.SynapseArtifact; import org.apache.synapse.Nameable; +import org.apache.synapse.message.MessageConsumer; +import org.apache.synapse.message.MessageProducer; import org.apache.synapse.message.processors.MessageProcessor; import java.util.List; @@ -35,8 +37,17 @@ * Message Store is used to store Messages. */ public interface MessageStore extends SynapseArtifact, Nameable, ManagedLifecycle { + /** + * Returns a Message Producer for this message store.
+ * @return A non-null message producer that can produce messages to this message store. + */ + MessageProducer getProducer(); - + /** + * Returns a Message Consumer for this message store.
+ * @return A non-null message consumer that can read messages from this message store.
+ */ + MessageConsumer getConsumer(); /** * Inserts the Message into this store if it is possible to do so immediately diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/Axis2Message.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/Axis2Message.java new file mode 100644 index 000000000..c31d052a5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/Axis2Message.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.commons; + +import javax.xml.namespace.QName; +import java.io.Serializable; +import java.util.HashMap; + +/** + * This class serves as a container for the Axis2 Message Context parameters/properties + * , and it will be saved as a message in the Store. + */ +public class Axis2Message implements Serializable { + private String messageID; + + private String operationAction; + + private QName operationName; + + private String action; + + private String service; + + private String relatesToMessageId; + + private String replyToAddress; + + private String faultToAddress; + + private String fromAddress; + + private String toAddress; + + private String transportInName; + + private String transportOutName; + + private boolean isDoingMTOM; + + private boolean isDoingSWA; + + private boolean isDoingPOX; + + private boolean isDoingGET; + + private String soapEnvelope; + + private byte[] jsonStream; + + private int FLOW; + + private HashMap properties = new HashMap(); + + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } + + public String getOperationAction() { + return operationAction; + } + + public void setOperationAction(String operationAction) { + this.operationAction = operationAction; + } + + public QName getOperationName() { + return operationName; + } + + public void setOperationName(QName operationName) { + this.operationName = operationName; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + public String getRelatesToMessageId() { + return relatesToMessageId; + } + + public void setRelatesToMessageId(String relatesToMessageId) { + this.relatesToMessageId = relatesToMessageId; + } + + public String getReplyToAddress() { + return replyToAddress; + } + + public void setReplyToAddress(String replyToAddress) { + this.replyToAddress = replyToAddress; + } + + public String getSoapEnvelope() { + return soapEnvelope; + } + + public void setJsonStream(byte[] jsonStream) { + this.jsonStream = jsonStream; + } + + public byte[] getJsonStream() { + return this.jsonStream; + } + + public void setSoapEnvelope(String soapEnvelope) { + this.soapEnvelope = soapEnvelope; + } + + public int getFLOW() { + return FLOW; + } + + public void setFLOW(int FLOW) { + this.FLOW = FLOW; + } + + public String getFaultToAddress() { + return faultToAddress; + } + + public void setFaultToAddress(String faultToAddress) { + this.faultToAddress = faultToAddress; + } + + public String getFromAddress() { + return fromAddress; + } + + public void setFromAddress(String fromAddress) { + this.fromAddress = fromAddress; + } + + public String getToAddress() { + return toAddress; + } + + public void setToAddress(String toAddress) { + this.toAddress = toAddress; + } + + public boolean isDoingMTOM() { + return isDoingMTOM; + } + + public void setDoingMTOM(boolean doingMTOM) { + isDoingMTOM = doingMTOM; + } + + public boolean isDoingSWA() { + return isDoingSWA; + } + + public void setDoingSWA(boolean doingSWA) { + isDoingSWA = doingSWA; + } + + public boolean isDoingPOX() { + return isDoingPOX; + } + + public void setDoingPOX(boolean doingPOX) { + isDoingPOX = doingPOX; + } + + public boolean isDoingGET() { + return isDoingGET; + } + + public void setDoingGET(boolean doingGET) { + isDoingGET = doingGET; + } + + public void addProperty(String name, Object obj) { + properties.put(name, obj); + } + + public HashMap getProperties() { + return properties; + } + + public String getTransportInName() { + return transportInName; + } + + public void setTransportInName(String transportInName) { + this.transportInName = transportInName; + } + + public String getTransportOutName() { + return transportOutName; + } + + public void setTransportOutName(String transportOutName) { + this.transportOutName = transportOutName; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/MessageConverter.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/MessageConverter.java new file mode 100644 index 000000000..6cecdd8f2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/MessageConverter.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.commons; + +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.impl.builder.StAXBuilder; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.axiom.om.util.StAXUtils; +import org.apache.axiom.soap.SOAP12Constants; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axiom.soap.impl.builder.StAXSOAPModelBuilder; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.addressing.RelatesTo; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.OperationContext; +import org.apache.axis2.context.ServiceContext; +import org.apache.axis2.context.ServiceGroupContext; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.util.UUIDGenerator; + +import java.io.ByteArrayInputStream; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +public final class MessageConverter { + private static final String ABSTRACT_MC_PROPERTIES = "ABSTRACT_MC_PROPERTIES"; + + private static final String JMS_PRIORITY = "JMS_PRIORITY"; + + //Prefix to identify a OMElemet type property + private static final String OM_ELEMENT_PREFIX = "OM_ELEMENT_PREFIX_"; + + private static final Log logger = LogFactory.getLog(MessageConverter.class.getName()); + + private MessageConverter() { + } + + /** + * Converts a message read from the message store to a Synapse Message Context object. + * + * @param message Message from the message store + * @param axis2Ctx Final Axis2 Message Context + * @param synCtx Final Synapse message Context + * @return Final Synapse Message Context + */ + public static MessageContext toMessageContext(StorableMessage message, + org.apache.axis2.context.MessageContext axis2Ctx, + MessageContext synCtx) { + if (message == null) { + logger.error("Cannot create Message Context. Message is null."); + return null; + } + + AxisConfiguration axisConfig = axis2Ctx.getConfigurationContext().getAxisConfiguration(); + if (axisConfig == null) { + logger.warn("Cannot create AxisConfiguration. AxisConfiguration is null."); + return null; + } + Axis2Message axis2Msg = message.getAxis2message(); + try { + SOAPEnvelope envelope = getSoapEnvelope(axis2Msg.getSoapEnvelope()); + axis2Ctx.setEnvelope(envelope); + // set the RMSMessageDto properties + axis2Ctx.getOptions().setAction(axis2Msg.getAction()); + if (axis2Msg.getRelatesToMessageId() != null) { + axis2Ctx.addRelatesTo(new RelatesTo(axis2Msg.getRelatesToMessageId())); + } + axis2Ctx.setMessageID(axis2Msg.getMessageID()); + axis2Ctx.getOptions().setAction(axis2Msg.getAction()); + axis2Ctx.setDoingREST(axis2Msg.isDoingPOX()); + axis2Ctx.setDoingMTOM(axis2Msg.isDoingMTOM()); + axis2Ctx.setDoingSwA(axis2Msg.isDoingSWA()); + AxisService axisService; + if (axis2Msg.getService() != null && + (axisService = axisConfig.getServiceForActivation(axis2Msg.getService())) != null) { + + AxisOperation axisOperation = axisService.getOperation(axis2Msg.getOperationName()); + axis2Ctx.setFLOW(axis2Msg.getFLOW()); + ArrayList executionChain = new ArrayList(); + if (axis2Msg.getFLOW() == org.apache.axis2.context.MessageContext.OUT_FLOW) { + executionChain.addAll(axisOperation.getPhasesOutFlow()); + executionChain.addAll(axisConfig.getOutFlowPhases()); + } else if (axis2Msg.getFLOW() == org.apache.axis2.context.MessageContext.OUT_FAULT_FLOW) { + executionChain.addAll(axisOperation.getPhasesOutFaultFlow()); + executionChain.addAll(axisConfig.getOutFlowPhases()); + } + axis2Ctx.setExecutionChain(executionChain); + ConfigurationContext configurationContext = axis2Ctx.getConfigurationContext(); + axis2Ctx.setAxisService(axisService); + ServiceGroupContext serviceGroupContext = configurationContext + .createServiceGroupContext(axisService.getAxisServiceGroup()); + ServiceContext serviceContext = serviceGroupContext.getServiceContext(axisService); + OperationContext operationContext = serviceContext + .createOperationContext(axis2Msg.getOperationName()); + axis2Ctx.setServiceContext(serviceContext); + axis2Ctx.setOperationContext(operationContext); + axis2Ctx.setAxisService(axisService); + axis2Ctx.setAxisOperation(axisOperation); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Axis2 service is not available. Hence skipping execution of out flow handlers"); + } + } + if (axis2Msg.getReplyToAddress() != null) { + axis2Ctx.setReplyTo(new EndpointReference(axis2Msg.getReplyToAddress().trim())); + } + if (axis2Msg.getFaultToAddress() != null) { + axis2Ctx.setFaultTo(new EndpointReference(axis2Msg.getFaultToAddress().trim())); + } + if (axis2Msg.getFromAddress() != null) { + axis2Ctx.setFrom(new EndpointReference(axis2Msg.getFromAddress().trim())); + } + if (axis2Msg.getToAddress() != null) { + axis2Ctx.getOptions().setTo(new EndpointReference(axis2Msg.getToAddress().trim())); + } + Object map = axis2Msg.getProperties().get(ABSTRACT_MC_PROPERTIES); + axis2Msg.getProperties().remove(ABSTRACT_MC_PROPERTIES); + axis2Ctx.setProperties(axis2Msg.getProperties()); + axis2Ctx.setTransportIn( + axisConfig.getTransportIn(axis2Msg.getTransportInName())); + axis2Ctx.setTransportOut( + axisConfig.getTransportOut(axis2Msg.getTransportOutName())); + Object headers = axis2Msg.getProperties() + .get(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); + if (headers instanceof Map) { + setTransportHeaders(axis2Ctx, (Map) headers); + } + if (map instanceof Map) { + Map abstractMCProperties = (Map) map; + Iterator properties = abstractMCProperties.keySet().iterator(); + while (properties.hasNext()) { + String property = properties.next(); + Object value = abstractMCProperties.get(property); + axis2Ctx.setProperty(property, value); + } + } + // axis2Ctx.setEnvelope(envelope); + // XXX: always this section must come after the above step. ie. after applying Envelope. + // That is to get the existing headers into the new envelope. +// if (axis2Msg.getJsonStream() != null) { +// JsonUtil.newJsonPayload(axis2Ctx, +// new ByteArrayInputStream(axis2Msg.getJsonStream()), true, true); +// } + SynapseMessage synMsg = message.getSynapseMessage(); + synCtx.setTracingState(synMsg.getTracingState()); + Iterator properties = synMsg.getProperties().keySet().iterator(); + while (properties.hasNext()) { + String key = properties.next(); + Object value = synMsg.getProperties().get(key); + synCtx.setProperty(key, value); + } + Iterator propertyObjects = synMsg.getPropertyObjects().keySet().iterator(); + while (propertyObjects.hasNext()) { + String key = propertyObjects.next(); + Object value = synMsg.getPropertyObjects().get(key); + if (key.startsWith(OM_ELEMENT_PREFIX)) { + String originalKey = key.substring(OM_ELEMENT_PREFIX.length(), key.length()); + ByteArrayInputStream is = new ByteArrayInputStream((byte[]) value); + StAXOMBuilder builder = new StAXOMBuilder(is); + OMElement omElement = builder.getDocumentElement(); + synCtx.setProperty(originalKey, omElement); + } + } + + synCtx.setFaultResponse(synMsg.isFaultResponse()); + synCtx.setResponse(synMsg.isResponse()); + return synCtx; + } catch (Exception e) { + logger.error("Cannot create Message Context. Error:" + e.getLocalizedMessage(), e); + return null; + } + } + + /** + * Converts a Synapse Message Context to a representation that can be stored in the + * Message store queue. + * + * @param synCtx Source Synapse message context. + * @return Storable representation of the provided message context. + */ + public static StorableMessage toStorableMessage(MessageContext synCtx) { + StorableMessage message = new StorableMessage(); + Axis2Message axis2msg = new Axis2Message(); + SynapseMessage synMsg = new SynapseMessage(); + Axis2MessageContext axis2MessageContext; + + if (synCtx instanceof Axis2MessageContext) { + axis2MessageContext = (Axis2MessageContext) synCtx; + org.apache.axis2.context.MessageContext msgCtx = + axis2MessageContext.getAxis2MessageContext(); + axis2msg.setMessageID(UUIDGenerator.getUUID()); + if (msgCtx.getAxisOperation() != null) { + axis2msg.setOperationAction(msgCtx.getAxisOperation().getSoapAction()); + axis2msg.setOperationName(msgCtx.getAxisOperation().getName()); + } +// if (JsonUtil.hasAJsonPayload(msgCtx)) { +// axis2msg.setJsonStream(JsonUtil.jsonPayloadToByteArray(msgCtx)); +// } + axis2msg.setAction(msgCtx.getOptions().getAction()); + if (msgCtx.getAxisService() != null) { + axis2msg.setService(msgCtx.getAxisService().getName()); + } + if (msgCtx.getRelatesTo() != null) { + axis2msg.setRelatesToMessageId(msgCtx.getRelatesTo().getValue()); + } + if (msgCtx.getReplyTo() != null) { + axis2msg.setReplyToAddress(msgCtx.getReplyTo().getAddress()); + } + if (msgCtx.getFaultTo() != null) { + axis2msg.setFaultToAddress(msgCtx.getFaultTo().getAddress()); + } + if (msgCtx.getTo() != null) { + axis2msg.setToAddress(msgCtx.getTo().getAddress()); + } + axis2msg.setDoingPOX(msgCtx.isDoingREST()); + axis2msg.setDoingMTOM(msgCtx.isDoingMTOM()); + axis2msg.setDoingSWA(msgCtx.isDoingSwA()); + String soapEnvelope = msgCtx.getEnvelope().toString(); + axis2msg.setSoapEnvelope(soapEnvelope); + axis2msg.setFLOW(msgCtx.getFLOW()); + if (msgCtx.getTransportIn() != null) { + axis2msg.setTransportInName(msgCtx.getTransportIn().getName()); + } + if (msgCtx.getTransportOut() != null) { + axis2msg.setTransportOutName(msgCtx.getTransportOut().getName()); + } + Iterator abstractMCProperties = msgCtx.getPropertyNames(); + Map copy = new HashMap(msgCtx.getProperties().size()); + while (abstractMCProperties.hasNext()) { + String propertyName = abstractMCProperties.next(); + Object propertyValue = msgCtx.getProperty(propertyName); + if (propertyValue instanceof String + || propertyValue instanceof Boolean + || propertyValue instanceof Integer + || propertyValue instanceof Double + || propertyValue instanceof Character) { + copy.put(propertyName, propertyValue); + } + if (JMS_PRIORITY.equals(propertyName)) { + if (propertyValue instanceof Integer) { + message.setPriority((Integer) propertyValue); + } else if (propertyValue instanceof String) { + try { + int value = Integer.parseInt((String) propertyValue); + message.setPriority(value); + } catch (NumberFormatException e) { + } + } + } + } + axis2msg.addProperty(ABSTRACT_MC_PROPERTIES, copy); + Map transportHeaders = getTransportHeaders(msgCtx); + axis2msg.addProperty( + org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS, transportHeaders); + Iterator properties = msgCtx.getProperties().keySet().iterator(); + while (properties.hasNext()) { + String key = properties.next(); + Object value = msgCtx.getProperty(key); + if (value instanceof String) { + axis2msg.addProperty(key, value); + } + } + message.setAxis2message(axis2msg); + synMsg.setFaultResponse(synCtx.isFaultResponse()); + synMsg.setTracingState(synCtx.getTracingState()); + synMsg.setResponse(synCtx.isResponse()); + properties = synCtx.getPropertyKeySet().iterator(); + while (properties.hasNext()) { + String key = properties.next(); + Object value = synCtx.getProperty(key); + if (value instanceof String) { + synMsg.addProperty(key, (String) value); + } + if (value instanceof ArrayList && ((ArrayList) value).size() > 0 + && ((ArrayList) value).get(0) instanceof OMElement) { + OMElement elem = ((OMElement) ((ArrayList) value).get(0)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + elem.serialize(bos); + byte[] bytes = bos.toByteArray(); + synMsg.addPropertyObject(OM_ELEMENT_PREFIX + key, bytes); + } catch (XMLStreamException e) { + logger.error("Error while converting OMElement to byte array", e); + } + } + } + message.setSynapseMessage(synMsg); + } else { + throw new SynapseException("Cannot store message to store."); + } + return message; + } + + private static SOAPEnvelope getSoapEnvelope(String soapEnvelpe) { + try { + //This is a temporary fix for ESBJAVA-1157 for Andes based(QPID) Client libraries + //Thread.currentThread().setContextClassLoader(SynapseEnvironment.class.getClassLoader()); + XMLStreamReader xmlReader = StAXUtils + .createXMLStreamReader(new ByteArrayInputStream(getUTF8Bytes(soapEnvelpe))); + StAXBuilder builder = new StAXSOAPModelBuilder(xmlReader); + SOAPEnvelope soapEnvelope = (SOAPEnvelope) builder.getDocumentElement(); + soapEnvelope.build(); + String soapNamespace = soapEnvelope.getNamespace().getNamespaceURI(); + if (soapEnvelope.getHeader() == null) { + SOAPFactory soapFactory; + if (soapNamespace.equals(SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI)) { + soapFactory = OMAbstractFactory.getSOAP12Factory(); + } else { + soapFactory = OMAbstractFactory.getSOAP11Factory(); + } + soapFactory.createSOAPHeader(soapEnvelope); + } + return soapEnvelope; + } catch (XMLStreamException e) { + logger.error("Cannot create SOAP Envelop. Error:" + e.getLocalizedMessage(), e); + return null; + } + } + + private static byte[] getUTF8Bytes(String soapEnvelpe) { + byte[] bytes; + try { + bytes = soapEnvelpe.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + logger.error("Unable to extract bytes in UTF-8 encoding. " + + "Extracting bytes in the system default encoding" + + e.getMessage()); + bytes = soapEnvelpe.getBytes(); + } + return bytes; + } + + private static final class Replace { + public String CHAR; + public String STRING; + + public Replace(String c, String string) { + CHAR = c; + STRING = string; + } + } + + /** + * Replaced Strings + */ + private static final Replace RS_HYPHEN = new Replace("-", "__HYPHEN__"); + private static final Replace RS_EQUAL = new Replace("=", "__EQUAL__"); + private static final Replace RS_SLASH = new Replace("/", "__SLASH__"); + private static final Replace RS_COMMA = new Replace(",", "__COMMA__"); + private static final Replace RS_SPACE = new Replace(" ", "__SPACE__"); + private static final Replace RS_COLON = new Replace(":", "__COLON__"); + private static final Replace RS_SEMICOLON = new Replace(";", "__SEMICOLON__"); + + private static Map getTransportHeaders(org.apache.axis2.context.MessageContext messageContext) { + Object headers = messageContext.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); + if (!(headers instanceof Map)) { + return Collections.emptyMap(); + } + Map httpHeaders = new TreeMap(); + Iterator i = ((Map) headers).entrySet().iterator(); + while (i.hasNext()) { + Map.Entry headerStr = i.next(); + String fieldName = String.valueOf(headerStr.getKey()); + String fieldValue = String.valueOf(headerStr.getValue()); + fieldName = fieldName.replaceAll(RS_HYPHEN.CHAR, RS_HYPHEN.STRING); + fieldValue = fieldValue.replaceAll(RS_HYPHEN.CHAR, RS_HYPHEN.STRING); + fieldValue = fieldValue.replaceAll(RS_EQUAL.CHAR, RS_EQUAL.STRING); + fieldValue = fieldValue.replaceAll(RS_SLASH.CHAR, RS_SLASH.STRING); + fieldValue = fieldValue.replaceAll(RS_COMMA.CHAR, RS_COMMA.STRING); + fieldValue = fieldValue.replaceAll(RS_SPACE.CHAR, RS_SPACE.STRING); + fieldValue = fieldValue.replaceAll(RS_COLON.CHAR, RS_COLON.STRING); + fieldValue = fieldValue.replaceAll(RS_SEMICOLON.CHAR, RS_SEMICOLON.STRING); + httpHeaders.put(fieldName, fieldValue); + } + return httpHeaders; + } + + private static void setTransportHeaders(org.apache.axis2.context.MessageContext msgCtx, Map headers) { + if (headers == null || msgCtx == null) { + return; + } + Map httpHeaders = new TreeMap(); + Iterator i = headers.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry headerEntry = i.next(); + String fieldName = (String) headerEntry.getKey(); + fieldName = fieldName.replaceAll(RS_HYPHEN.STRING, RS_HYPHEN.CHAR); + String fieldValue = (String) headerEntry.getValue(); + fieldValue = fieldValue.replaceAll(RS_HYPHEN.STRING, RS_HYPHEN.CHAR); + fieldValue = fieldValue.replaceAll(RS_EQUAL.STRING, RS_EQUAL.CHAR); + fieldValue = fieldValue.replaceAll(RS_SLASH.STRING, RS_SLASH.CHAR); + fieldValue = fieldValue.replaceAll(RS_COMMA.STRING, RS_COMMA.CHAR); + fieldValue = fieldValue.replaceAll(RS_SPACE.STRING, RS_SPACE.CHAR); + fieldValue = fieldValue.replaceAll(RS_COLON.STRING, RS_COLON.CHAR); + fieldValue = fieldValue.replaceAll(RS_SEMICOLON.STRING, RS_SEMICOLON.CHAR); + httpHeaders.put(fieldName, fieldValue); + } + msgCtx.setProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS, httpHeaders); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/StorableMessage.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/StorableMessage.java new file mode 100644 index 000000000..e2679bcee --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/StorableMessage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.commons; + +import java.io.Serializable; + +/** + * This represents the final message that will be saved in the storage queue. + */ +public class StorableMessage implements Serializable { + private static final int PRIORITY_UNSET = -999; + + private Axis2Message axis2message; + + private SynapseMessage synapseMessage; + + private int priority = PRIORITY_UNSET; + + public Axis2Message getAxis2message() { + return axis2message; + } + + public void setAxis2message(Axis2Message axis2message) { + this.axis2message = axis2message; + } + + public SynapseMessage getSynapseMessage() { + return synapseMessage; + } + + public void setSynapseMessage(SynapseMessage synapseMessage) { + this.synapseMessage = synapseMessage; + } + + public int getPriority(int defaultValue) { + if (priority == PRIORITY_UNSET) { + return defaultValue; + } + return priority; + } + + /** + * @Depricated + * @return + */ + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/SynapseMessage.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/SynapseMessage.java new file mode 100644 index 000000000..2257f5047 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/commons/SynapseMessage.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.commons; + +import org.apache.synapse.SynapseConstants; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * This class serves as a container for the Synapse Message Context parameters/properties + * , and it will be saved as a message in the Store. + */ +public class SynapseMessage implements Serializable { + private ArrayList localEntries = new ArrayList(); + + private HashMap properties = new HashMap(); + + private HashMap propertyObjects = new HashMap(); + + private boolean response = false; + + private boolean faultResponse = false; + + private int tracingState = SynapseConstants.TRACING_UNSET; + + public boolean isResponse() { + return response; + } + + public void setResponse(boolean response) { + this.response = response; + } + + public boolean isFaultResponse() { + return faultResponse; + } + + public void setFaultResponse(boolean faultResponse) { + this.faultResponse = faultResponse; + } + + public int getTracingState() { + return tracingState; + } + + public void setTracingState(int tracingState) { + this.tracingState = tracingState; + } + + public List getLocalEntries() { + return localEntries; + } + + public HashMap getProperties() { + return properties; + } + + public HashMap getPropertyObjects() { + return propertyObjects; + } + + public void addProperty(String key,String value) { + properties.put(key,value); + } + + public void addPropertyObject(String key , byte[] value){ + propertyObjects.put(key, value); + } + + public void addLocalEntry(String key) { + localEntries.add(key); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java new file mode 100644 index 000000000..016020752 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCConsumer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.message.MessageConsumer; + +/** + * JDBC Store Consumer + */ +public class JDBCConsumer implements MessageConsumer { + + /** + * Logger for the class + */ + private static final Log logger = LogFactory.getLog(JDBCConsumer.class.getName()); + + /** + * Store for the consumer + */ + private JDBCMessageStore store; + + /** + * Id of the consumer + */ + private String consumerId; + + /** + * Store current message index processing + */ + private String currentMessageId; + + /** + * Initialize consumer + * + * @param store - JDBC message store + */ + public JDBCConsumer(JDBCMessageStore store) { + this.store = store; + } + + /** + * Select and return the first element in current table + * + * @return - Select and return the first element from the table + */ + @Override + public MessageContext receive() { + // Message will get peeked from the table + MessageContext msg = null; + try { + msg = store.peek(); + if (msg != null) { + currentMessageId = msg.getMessageID(); + } + } catch (SynapseException e) { + logger.error("Can't receive message ", e); + } + return msg; + } + + /** + * Ack on success message sending by processor + * + * @return Success of removing + */ + @Override + public boolean ack() { + // Message will be removed at this point + MessageContext msg = store.remove(currentMessageId); + if (msg != null) { + store.dequeued(); + return true; + } else { + return false; + } + } + + /** + * Cleanup the consumer + * + * @return Success of cleaning + */ + @Override + public boolean cleanup() { + currentMessageId = null; + return true; + } + + + /** + * Check JDBC consumer is alive + * + * @return consumer status + */ + @Override + public boolean isAlive() { + return true; //TODO need to implement proper way to check availability + } + + /** + * Set consumer id + * + * @param id ID + */ + @Override + public void setId(int id) { + consumerId = "[" + store.getName() + "-C-" + id + "]"; + } + + /** + * Get consumer id + * + * @return consumerId - Consumer identifier + */ + @Override + public String getId() { + if (consumerId == null) { + return "[unknown-consumer]"; + } + return consumerId; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStore.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStore.java new file mode 100644 index 000000000..1bd478979 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStore.java @@ -0,0 +1,733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.config.SynapseConfiguration; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.core.axis2.Axis2SynapseEnvironment; +import org.apache.synapse.message.MessageConsumer; +import org.apache.synapse.message.MessageProducer; +import org.apache.synapse.message.store.AbstractMessageStore; +import org.apache.synapse.message.store.impl.commons.MessageConverter; +import org.apache.synapse.message.store.impl.commons.StorableMessage; +import org.apache.synapse.message.store.impl.jdbc.util.JDBCConfiguration; +import org.apache.synapse.message.store.impl.jdbc.util.Statement; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * JDBC Store class + */ +public class JDBCMessageStore extends AbstractMessageStore { + /** + * Message Utility class used to provide utilities to do processing + */ + private JDBCConfiguration jdbcConfiguration; + + /** + * Logger for the class + */ + private static final Log logger = LogFactory.getLog(JDBCMessageStore.class.getName()); + + /** + * Locks for clearing the store + */ + private final ReentrantLock removeLock = new ReentrantLock(); + private final ReentrantLock cleanUpOfferLock = new ReentrantLock(); + private final AtomicBoolean cleaningFlag = new AtomicBoolean(false); + protected static final String MESSAGE_COLUMN_NAME = "message"; + + /** + * Initializes the JDBC Message Store + * + * @param synapseEnvironment SynapseEnvironment for the store + */ + @Override + public void init(SynapseEnvironment synapseEnvironment) { + if (logger.isDebugEnabled()) { + logger.debug("Initializing JDBC Message Store"); + } + super.init(synapseEnvironment); + + if (logger.isDebugEnabled()) { + logger.debug("Initializing Datasource and Properties"); + } + jdbcConfiguration = new JDBCConfiguration(); + jdbcConfiguration.buildDataSource(parameters); + +// JDBCMessageConverter.setSynapseEnvironment(synapseEnvironment); + } + + protected JDBCConfiguration getJdbcConfiguration() { + return jdbcConfiguration; + } + + /** + * @see org.apache.synapse.message.store.MessageStore#getProducer() + */ + @Override + public MessageProducer getProducer() { + JDBCProducer producer = new JDBCProducer(this); + producer.setId(nextProducerId()); + if (logger.isDebugEnabled()) { + logger.debug(getNameString() + " created a new JDBC Message Producer."); + } + return producer; + } + + /** + * @see org.apache.synapse.message.store.MessageStore#getConsumer() + */ + @Override + public MessageConsumer getConsumer() { + JDBCConsumer consumer = new JDBCConsumer(this); + consumer.setId(nextConsumerId()); + if (logger.isDebugEnabled()) { + logger.debug(getNameString() + " created a new JDBC Message Consumer."); + } + return consumer; + } + + @Override + public boolean offer(MessageContext messageContext) { + return this.getProducer().storeMessage(messageContext); + } + + @Override + public MessageContext poll() { + return this.remove(); + } + + /** + * Set JDBC store parameters + * + * @param parameters - List of parameters to set + */ + @Override + public void setParameters(Map parameters) { + super.setParameters(parameters); + + // Rebuild utils after setting new parameters + if (jdbcConfiguration != null) { + jdbcConfiguration.buildDataSource(parameters); + } + } + + /** + * Process a given SQL statement. + * + * @param statement - Statement to process. + * @return - MessageContext which will hold the content of the message. + */ + private MessageContext getResultMessageContextFromDatabase(Statement statement) throws SynapseException { + List processedRows = getProcessedRows(statement); + final int firstRowIndex = 0; + MessageContext messageContext = null; + if (processedRows.size() > 0) { + Map messageContentRow = processedRows.get(firstRowIndex); + messageContext = (MessageContext) messageContentRow.get(MESSAGE_COLUMN_NAME); + if (logger.isDebugEnabled()) { + logger.debug("Number of rows processed:" + processedRows + " calling the statement " + + statement.getStatement()); + logger.debug("Message content with mid:" + messageContext.getMessageID() + " will be returned"); + } + } + return messageContext; + } + + /** + * Will return the list of processed message rows. + * + * @param statement the statement executed in the DB. + * @return the rows which contains the column data wrapped inside a map. + */ + protected List getProcessedRows(Statement statement) { + Connection con = null; + ResultSet rs = null; + PreparedStatement ps = null; + List elements; + try { + con = jdbcConfiguration.getConnection(); + ps = con.prepareStatement(statement.getStatement()); + int index = 1; + for (Object param : statement.getParameters()) { + if (param instanceof String) { + ps.setString(index, (String) param); + } else if (param instanceof Long) { + ps.setLong(index, (Long) param); + } else if (param instanceof Integer) { + ps.setInt(index, (Integer) param); + } + index++; + } + rs = ps.executeQuery(); + elements = statement.getResult(rs); + } catch (SQLException e) { + throw new SynapseException("Processing Statement failed : " + statement.getStatement() + + " against DataSource : " + jdbcConfiguration.getDSName(), e); + } finally { + close(con, ps, rs); + } + return elements; + } + + /** + * Will convert the byte[] message to store-able message. + * + * @param msgObj serialized message read from the database. + * @return converted message context. + */ + protected MessageContext deserializeMessage(byte[] msgObj) { + MessageContext messageContext = null; + if (msgObj != null) { + ObjectInputStream ios = null; + try { + // Convert back to MessageContext and add to list + ios = new ObjectInputStream(new ByteArrayInputStream(msgObj)); + Object msg = ios.readObject(); + if (msg instanceof StorableMessage) { + StorableMessage jdbcMsg = (StorableMessage) msg; + org.apache.axis2.context.MessageContext axis2Mc = this.newAxis2Mc(); + MessageContext synapseMc = this.newSynapseMc(axis2Mc); + messageContext = MessageConverter.toMessageContext(jdbcMsg, axis2Mc, synapseMc); + } + } catch (IOException e) { + throw new SynapseException("Error reading object input stream", e); + } catch (ClassNotFoundException e) { + throw new SynapseException("Could not find the class", e); + } finally { + closeStream(ios); + } + } else { + throw new SynapseException("Retrieved Object is null"); + } + return messageContext; + } + + /** + * Closes the object stream after completing the DB operations. + * + * @param ios stream which should be closed. + */ + private void closeStream(ObjectInputStream ios) { + try { + ios.close(); + } catch (IOException e) { + logger.error("Error while closing object input stream", e); + } + } + + private org.apache.axis2.context.MessageContext newAxis2Mc() { + return ((Axis2SynapseEnvironment) synapseEnvironment) + .getAxis2ConfigurationContext().createMessageContext(); + } + + private org.apache.synapse.MessageContext newSynapseMc( + org.apache.axis2.context.MessageContext msgCtx) { + SynapseConfiguration configuration = synapseEnvironment.getSynapseConfiguration(); + return new Axis2MessageContext(msgCtx, configuration, synapseEnvironment); + } + + /** + * On database update failure tries to rollback + * + * @param connection database connection + * @param task explanation of the task done when the rollback was triggered + */ + private void rollback(Connection connection, String task) { + if (connection != null) { + try { + connection.rollback(); + } catch (SQLException e) { + logger.warn("Rollback failed on " + task, e); + } + } + } + + /** + * Process statements that do not give a ResultSet + * + * @param statements - Statement to process + * @return - Success or Failure of the process + */ + private boolean processNonResultingStatement(List statements) throws SynapseException { + Connection connection = null; + boolean result; + PreparedStatement preparedStatement = null; + try { + connection = jdbcConfiguration.getConnection(); + connection.setAutoCommit(false); + for (Statement statement : statements) { + preparedStatement = connection.prepareStatement(statement.getStatement()); + int index = 1; + for (Object param : statement.getParameters()) { + if (param instanceof String) { + preparedStatement.setString(index, (String) param); + } else if (param instanceof Long) { + preparedStatement.setLong(index, (Long) param); + } else if (param instanceof StorableMessage) { + //Serialize the object into byteArray and update the statement + preparedStatement.setBytes(index, serialize(param)); + } + index++; + } + if (logger.isDebugEnabled()) { + logger.debug("Executing statement:" + preparedStatement); + } + preparedStatement.execute(); + } + connection.commit(); + result = true; + } catch (SQLException | IOException e) { + rollback(connection, "deleting message"); + throw new SynapseException("Processing Statement failed against DataSource : " + + jdbcConfiguration.getDSName(), e); + } finally { + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error("Error while closing prepared statement", e); + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + logger.error("Error while closing connection", e); + } + } + } + return result; + } + + public byte[] serialize(Object obj) throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + ObjectOutputStream o = new ObjectOutputStream(b); + o.writeObject(obj); + return b.toByteArray(); + } + + /** + * Destroy Resources allocated + */ + @Override + public void destroy() { + super.destroy(); + jdbcConfiguration = null; + } + + /** + * Add a message to the end of the table. If fetching success return true else false + * + * @param messageContext message to insert + * @return - success/failure of fetching + */ + public boolean store(MessageContext messageContext) throws SynapseException { + if (messageContext == null) { + logger.error("Message is null, can't store into database"); + return false; + } + boolean cleaningState = false; + try { + if (cleaningFlag.get()) { + try { + cleanUpOfferLock.lock(); + cleaningState = true; + } catch (Exception e) { + logger.error("Message Cleanup lock released unexpectedly", e); + } + } + ArrayList statements = new ArrayList<>(); + Statement statement = getStoreMessageStatement(messageContext, null); + statements.add(statement); + return processNonResultingStatement(statements); + } catch (Exception e) { + throw new SynapseException("Error while creating StorableMessage", e); + } finally { + if (cleaningState) { + cleanUpOfferLock.unlock(); + } + } + } + + /** + *

+ * Generates the statement to store message in database. + *

+ *

+ * If the sequence id is specified the corresponding sequence id will be stored, sequence id will be specified if + * re-sequence message store is being used. In other times this value will be null. + *

+ * + * @param messageContext the content of the message. + * @param sequenceId the sequence id of the message (optional). + * @return SQL statement for insertion of value to store. + * @throws StoreException at an event there's an exception when generating the statement. + * @see org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore + */ + protected Statement getStoreMessageStatement(MessageContext messageContext, Long sequenceId) throws StoreException { + StorableMessage persistentMessage = MessageConverter.toStorableMessage(messageContext); + String msgId = persistentMessage.getAxis2message().getMessageID(); + Statement statement; + if (null == sequenceId) { + String insertMessageStatement = "INSERT INTO " + jdbcConfiguration.getTableName() + + " (msg_id,message) VALUES (?,?)"; + statement = new Statement(insertMessageStatement) { + @Override + public List getResult(ResultSet resultSet) { + throw new UnsupportedOperationException(); + } + }; + statement.addParameter(msgId); + statement.addParameter(persistentMessage); + } else { + String insertMessageWithSequenceIdStatement = + "INSERT INTO " + jdbcConfiguration.getTableName() + " (msg_id,seq_id,message) VALUES (?,?,?)"; + statement = new Statement(insertMessageWithSequenceIdStatement) { + @Override + public List getResult(ResultSet resultSet) { + throw new UnsupportedOperationException(); + } + }; + statement.addParameter(msgId); + statement.addParameter(sequenceId); + statement.addParameter(persistentMessage); + } + return statement; + } + + /** + * Select and return the first element in current table + * + * @return - Select and return the first element from the table + */ + @Override + public MessageContext peek() throws SynapseException { + MessageContext msg; + try { + Statement statement = new Statement("SELECT message FROM " + jdbcConfiguration.getTableName() + + " WHERE indexId=(SELECT min(indexId) from " + jdbcConfiguration.getTableName() + ")") { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + msg = getResultMessageContextFromDatabase(statement); + } catch (SynapseException se) { + throw new SynapseException("Error while peek the message", se); + } + return msg; + } + + /** + * Removes the first element from table + * + * @return MessageContext - first message context + * @throws NoSuchElementException if there was not element to be removed. + */ + @Override + public MessageContext remove() throws NoSuchElementException { + MessageContext messageContext = peek(); + messageContext = remove(messageContext.getMessageID()); + if (messageContext != null) { + return messageContext; + } else { + throw new NoSuchElementException("First element not found and remove failed !"); + } + } + + /** + * Remove the message with given msg_id + * + * @param msgId - message ID + * @return - removed message context + */ + @Override + public MessageContext remove(String msgId) throws SynapseException { + MessageContext result; + boolean cleaningState = false; + try { + if (cleaningFlag.get()) { + try { + removeLock.lock(); + cleaningState = true; + } catch (Exception ie) { + logger.error("Message Cleanup lock released unexpectedly", ie); + } + } + result = get(msgId); + List statements = removeMessageStatement(msgId); + processNonResultingStatement(statements); + } catch (Exception e) { + throw new SynapseException("Removing message with id = " + msgId + " failed !", e); + } finally { + if (cleaningState) { + removeLock.unlock(); + } + } + return result; + } + + /** + * Statement to remove the message once a response is received. + * + * @param msgId message id of the statement which should be removed. + * @return the sql remove message statement. + */ + protected List removeMessageStatement(String msgId) { + ArrayList statements = new ArrayList<>(); + Statement statement = new Statement("DELETE FROM " + jdbcConfiguration.getTableName() + " WHERE msg_id=?") { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } + }; + statement.addParameter(msgId); + statements.add(statement); + return statements; + } + + /** + * Delete all entries from table + */ + @Override + public void clear() { + try { + logger.warn(getNameString() + "deleting all entries"); + removeLock.lock(); + cleanUpOfferLock.lock(); + cleaningFlag.set(true); + Statement statement = new Statement("DELETE FROM " + jdbcConfiguration.getTableName()) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } + }; + List statements = new ArrayList<>(); + statements.add(statement); + processNonResultingStatement(statements); + } catch (Exception e) { + logger.error("Clearing store failed !", e); + } finally { + cleaningFlag.set(false); + removeLock.unlock(); + cleanUpOfferLock.unlock(); + } + } + + /** + * Get the message at given position + * Only can be done with MYSQL, and no use-case in current implementation + * + * @param position - position of the message , starting value is 0 + * @return Message Context of position th row or if failed return null + */ + @Override + public MessageContext get(int position) { + if (position < 0) { + throw new IllegalArgumentException("Index:" + position + " out of table bound"); + } + // Gets the minimum value of the sub-table which contains indexId values greater than given position + // ('position' has minimum of 0 while indexId has minimum of 1) + Statement statement = new Statement("SELECT message FROM " + jdbcConfiguration.getTableName() + + " ORDER BY indexId ASC LIMIT ?,1 ") { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + statement.addParameter(position); + return getResultMessageContextFromDatabase(statement); + } + + /** + * Get all messages in the table + * + * @return - List containing all message contexts in the store + */ + @Override + public List getAll() { + if (logger.isDebugEnabled()) { + logger.debug(getNameString() + " retrieving all messages from the store."); + } + Statement statement = new Statement("SELECT message FROM " + jdbcConfiguration.getTableName()) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + MessageContext result = getResultMessageContextFromDatabase(statement); + if (result != null) { + List msgs = new ArrayList<>(); + msgs.add(result); + return msgs; + } else { + return null; + } + } + + /** + * Return the first element with given msg_id + * + * @param msgId - Message ID + * @return - returns the first result found else null + */ + @Override + public MessageContext get(String msgId) { + Statement statement = new Statement("SELECT indexId,message FROM " + jdbcConfiguration.getTableName() + + " WHERE msg_id=?") { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + statement.addParameter(msgId); + return getResultMessageContextFromDatabase(statement); + } + + /** + *

+ * Return the messages corresponding to the provided statement. + *

+ * + * @param resultSet the result-set obtained from the statement. + * @param statement the SQL statement results are obtained for. + * @return the content of the messages. + * @throws SQLException during an error encountered when accessing the database. + */ + protected List messageContentResultSet(ResultSet resultSet, String statement) throws SQLException { + ArrayList elements = new ArrayList<>(); + while (resultSet.next()) { + try { + HashMap rowData = new HashMap<>(); + byte[] msgObj = resultSet.getBytes(MESSAGE_COLUMN_NAME); + MessageContext responseMessageContext = deserializeMessage(msgObj); + rowData.put(MESSAGE_COLUMN_NAME, responseMessageContext); + elements.add(rowData); + } catch (SQLException e) { + String message = "Error executing statement : " + statement + " against DataSource : " + + jdbcConfiguration.getDSName(); + throw new SynapseException(message, e); + } + } + return elements; + } + + /** + * Return number of messages in the store + * + * @return size - Number of messages + */ + @Override + public int size() { + Connection con = null; + ResultSet rs = null; + PreparedStatement ps = null; + int size = 0; + Statement statement = new Statement("SELECT COUNT(*) FROM " + jdbcConfiguration.getTableName()) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + try { + con = jdbcConfiguration.getConnection(); + ps = con.prepareStatement(statement.getStatement()); + con = ps.getConnection(); + rs = ps.executeQuery(); + while (rs.next()) { + try { + size = rs.getInt(1); + } catch (Exception e) { + logger.error("Error executing statement : " + statement.getStatement() + + " against DataSource : " + jdbcConfiguration.getDSName(), e); + break; + } + } + } catch (SQLException e) { + logger.error("Error executing statement : " + statement.getStatement() + + " against DataSource : " + jdbcConfiguration.getDSName(), e); + } finally { + close(con, ps, rs); + } + return size; + } + + /** + * Get the store's name + * + * @return - name of the store + */ + private String getNameString() { + return "Store [" + getName() + "]"; + } + + /** + * Close all ResultSet related things + * + * @param con - Connection to close + * @param ps - PreparedStatement to close + * @param rs - ResultSet to close + */ + private void close(Connection con, PreparedStatement ps, ResultSet rs) { + if (ps != null) { + try { + ps.close(); + } catch (SQLException e) { + logger.error("Error while closing prepared statement", e); + } + } + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + logger.error("Error while closing result set", e); + } + } + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + logger.error("Error while closing connection", e); + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStoreConstants.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStoreConstants.java new file mode 100644 index 000000000..a839df08f --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCMessageStoreConstants.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc; + +public class JDBCMessageStoreConstants { + + /** + * Connection URL to database + */ + public static final String JDBC_CONNECTION_URL = "store.jdbc.connection.url"; + + /** + * Driver to use + */ + public static final String JDBC_CONNECTION_DRIVER = "store.jdbc.driver"; + + /** + * User Name that is used to create the connection with the broker + */ + public static final String JDBC_USERNAME = "store.jdbc.username"; + + /** + * Password that is used to create the connection with the broker + */ + public static final String JDBC_PASSWORD = "store.jdbc.password"; + + /** + * DataSource name exists + */ + public static final String JDBC_DSNAME = "store.jdbc.dsName"; + + /** + * IcClass of the + */ + public static final String JDBC_ICCLASS = "store.jdbc.icClass"; + + + // Optional parameters + /** + * Name of the database table + */ + public static final String JDBC_TABLE = "store.jdbc.table"; + + /** + * Default name of the database table + */ + public static final String JDBC_DEFAULT_TABLE_NAME = "jdbc_message_store"; +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCProducer.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCProducer.java new file mode 100644 index 000000000..12d4532ae --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/JDBCProducer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.message.MessageProducer; + +/** + * JDBC Store Producer + */ +public class JDBCProducer implements MessageProducer { + + /** + * Logger for the class + */ + private static final Log logger = LogFactory.getLog(JDBCProducer.class.getName()); + + /** + * Store for the producer + */ + private JDBCMessageStore store; + + /** + * Id of the producer + */ + private String producerId; + + /** + * Initialize producer + * + * @param store - JDBC message store + */ + public JDBCProducer(JDBCMessageStore store) { + this.store = store; + } + + /** + * Add a message to the end of the table. If fetching success return true else false + * + * @param synCtx message to insert + * @return - success/failure of fetching + */ + @Override + public boolean storeMessage(MessageContext synCtx) { + boolean success = false; + try { + success = store.store(synCtx); + store.enqueued(); + } catch (SynapseException e) { + logger.error("Error while storing message : " + synCtx.getMessageID(), e); + } + return success; + } + + /** + * Cleanup the producer + * + * @return true Since no producer specific things + */ + @Override + public boolean cleanup() { + return true; + } + + /** + * Set producer id + * + * @param id ID + */ + @Override + public void setId(int id) { + producerId = "[" + store.getName() + "-P-" + id + "]"; + } + + /** + * Get producer id + * + * @return producerId - Producer identifier + */ + @Override + public String getId() { + if (producerId == null) { + return "[unknown-producer]"; + } + return producerId; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/StoreException.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/StoreException.java new file mode 100644 index 000000000..03bec6d8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/StoreException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc; + +/** + * Will be thrown when there's an error in the JDBC store + */ +public class StoreException extends Exception{ + public StoreException(){} + + public StoreException(String message){ + super(message); + } + + public StoreException(String message,Exception e){ + super(message,e); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/JDBCConfiguration.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/JDBCConfiguration.java new file mode 100644 index 000000000..8d85c9105 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/JDBCConfiguration.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.SynapseException; +import org.apache.synapse.commons.datasource.DataSourceFinder; +import org.apache.synapse.commons.datasource.DataSourceInformation; +import org.apache.synapse.commons.datasource.DataSourceRepositoryHolder; +import org.apache.synapse.commons.datasource.RepositoryBasedDataSourceFinder; +import org.apache.synapse.commons.datasource.factory.DataSourceFactory; +import org.apache.synapse.message.store.impl.jdbc.JDBCMessageStoreConstants; +import org.apache.synapse.securevault.secret.SecretInformation; +import org.apache.synapse.securevault.secret.SecretManager; + +import javax.naming.Context; +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +/** + * Class JDBCUtil provides the Utility functions to create JDBC resources + */ +public class JDBCConfiguration { + + /** + * Logger for the class + */ + private static final Log log = LogFactory.getLog(JDBCConfiguration.class); + + /** + * Information about datasource + */ + private DataSourceInformation dataSourceInformation; + + /** + * Name of the data source + */ + private String dataSourceName; + + /** + * Jndi properties + */ + private Properties jndiProperties = new Properties(); + + /** + * Data source + */ + private DataSource dataSource; + + /** + * Name of the table + */ + private String tableName; + + /** + * Creating datasource at startup using configured parameters + * + * @param parameters - parameters given in configuration + */ + public void buildDataSource(Map parameters) { + try { + // Get datasource information + if ((parameters.get(JDBCMessageStoreConstants.JDBC_DSNAME)) != null) { + readLookupConfig(parameters); + dataSource = lookupDataSource(); + } else if ((parameters.get(JDBCMessageStoreConstants.JDBC_CONNECTION_DRIVER)) != null) { + readCustomDataSourceConfig(parameters); + dataSource = createCustomDataSource(); + } else { + handleException("The DataSource connection information must be specified for " + + "using a custom DataSource connection pool or for a JNDI lookup"); + } + + // Get table information + if (parameters.get(JDBCMessageStoreConstants.JDBC_TABLE) != null) { + tableName = (String) parameters.get(JDBCMessageStoreConstants.JDBC_TABLE); + } else { + tableName = JDBCMessageStoreConstants.JDBC_DEFAULT_TABLE_NAME; + } + } catch (Exception e) { + log.error("Error looking up DataSource connection information: ", e); + } + } + + /** + * Reading lookup information for existing datasource + * + * @param parameters - parameters given in configuration + */ + private void readLookupConfig(Map parameters) { + String dataSourceName = (String) parameters.get(JDBCMessageStoreConstants.JDBC_DSNAME); + this.setDataSourceName(dataSourceName); + + if (parameters.get(JDBCMessageStoreConstants.JDBC_ICCLASS) != null) { + Properties props = new Properties(); + props.put(Context.INITIAL_CONTEXT_FACTORY, parameters.get(JDBCMessageStoreConstants.JDBC_ICCLASS)); + props.put(Context.PROVIDER_URL, parameters.get(JDBCMessageStoreConstants.JDBC_CONNECTION_URL)); + props.put(Context.SECURITY_PRINCIPAL, parameters.get(JDBCMessageStoreConstants.JDBC_USERNAME)); + props.put(Context.SECURITY_CREDENTIALS, parameters.get(JDBCMessageStoreConstants.JDBC_PASSWORD)); + + this.setJndiProperties(props); + } + } + + /** + * Configure for custom datasource + * + * @param parameters - parameters given in configuration + */ + private void readCustomDataSourceConfig(Map parameters) { + DataSourceInformation dataSourceInformation = new DataSourceInformation(); + dataSourceInformation.setDriver((String) parameters.get(JDBCMessageStoreConstants.JDBC_CONNECTION_DRIVER)); + dataSourceInformation.setUrl((String) parameters.get(JDBCMessageStoreConstants.JDBC_CONNECTION_URL)); + + SecretInformation secretInformation = new SecretInformation(); + secretInformation.setUser((String) parameters.get(JDBCMessageStoreConstants.JDBC_USERNAME)); + secretInformation.setAliasSecret((String) parameters.get(JDBCMessageStoreConstants.JDBC_PASSWORD)); + dataSourceInformation.setSecretInformation(secretInformation); + + this.setDataSourceInformation(dataSourceInformation); + } + + /** + * Lookup the DataSource on JNDI using the specified name and optional properties + * + * @return a DataSource looked up using the specified JNDI properties + */ + private DataSource lookupDataSource() { + DataSource dataSource = null; + RepositoryBasedDataSourceFinder finder = DataSourceRepositoryHolder.getInstance() + .getRepositoryBasedDataSourceFinder(); + + if (finder.isInitialized()) { + // First try a lookup based on the data source name only + dataSource = finder.find(dataSourceName); + } + if (dataSource == null) { + // Decrypt the password if needed + String password = jndiProperties.getProperty(Context.SECURITY_CREDENTIALS); + if (password != null && !"".equals(password)) { + jndiProperties.put(Context.SECURITY_CREDENTIALS, getActualPassword(password)); + } + // Lookup the data source using the specified jndi properties + dataSource = DataSourceFinder.find(dataSourceName, jndiProperties); + if (dataSource == null) { + handleException("Cannot find a DataSource " + dataSourceName + " for given JNDI" + + " properties :" + jndiProperties); + } + } + if (dataSource != null) { + log.info("Successfully looked up datasource " + dataSourceName); + } + return dataSource; + } + + /** + * Create a custom DataSource using the specified data source information. + * + * @return a DataSource created using specified properties + */ + protected DataSource createCustomDataSource() { + DataSource dataSource = DataSourceFactory.createDataSource(dataSourceInformation); + if (dataSource != null) { + log.info("Successfully created data source for " + dataSourceInformation.getUrl()); + } + return dataSource; + } + + /** + * Get the password from SecretManager . here only use SecretManager + * + * @param aliasPassword alias password + * @return if the SecretManager is initiated , then , get the corresponding secret + * , else return alias itself + */ + private String getActualPassword(String aliasPassword) { + SecretManager secretManager = SecretManager.getInstance(); + if (secretManager.isInitialized()) { + return secretManager.getSecret(aliasPassword); + } + return aliasPassword; + } + + /** + * Get a Connection from current datasource + * + * @return - Connection + * @throws java.sql.SQLException - Failure in creating datasource connection + */ + public Connection getConnection() throws SQLException { + Connection con = getDataSource().getConnection(); + if (con == null) { + String msg = "Connection from DataSource " + getDSName() + " is null."; + throw new SynapseException(msg); + } + return con; + } + + /** + * Return the name or (hopefully) unique connection URL specific to the DataSource being used + * This is used for logging purposes only + * + * @return a unique name or URL to refer to the DataSource being used + */ + public String getDSName() { + if (dataSourceName != null) { + return dataSourceName; + } else if (dataSourceInformation != null) { + String name = dataSourceInformation.getUrl(); + if (name == null) { + name = dataSourceInformation.getDatasourceName(); + } + return name; + } + return null; + } + + /** + * Set DataSourceInformation + * + * @param dataSourceInformation - information to set + */ + public void setDataSourceInformation(DataSourceInformation dataSourceInformation) { + this.dataSourceInformation = dataSourceInformation; + } + + /** + * Set JNDI Properties + * + * @param jndiProperties - properties to set + */ + public void setJndiProperties(Properties jndiProperties) { + this.jndiProperties = jndiProperties; + } + + /** + * Get datasource + * + * @return - Datasource currently in use + */ + public DataSource getDataSource() { + return dataSource; + } + + /** + * Get datasource name currently in use + * + * @param dataSourceName - Datasource name + */ + public void setDataSourceName(String dataSourceName) { + this.dataSourceName = dataSourceName; + } + + /** + * Table name in use + * + * @return - Name of the table + */ + public String getTableName() { + return tableName; + } + + /** + * Handle Exceptions during process + * + * @param o - Exception needs to handle + */ + private void handleException(Object o) { + log.error(o); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/Statement.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/Statement.java new file mode 100644 index 000000000..4db753c2c --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/jdbc/util/Statement.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.jdbc.util; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Statement class for raw SQL statement + */ +public abstract class Statement { + /** + * Defines the SQL statement which should be executed. + */ + private String statement = null; + + /** + * List of parameters which should be included in the statement. + */ + private final List parameters = new ArrayList(); + + /** + * Provides the de-serialized outcome of the query. + * + * @param resultSet the result-set obtained from the DB. + * @return the result which contain each row and the corresponding column. + */ + public abstract List getResult(ResultSet resultSet) throws SQLException; + + public Statement(String rawStatement) { + this.statement = rawStatement; + } + + public String getStatement() { + return statement; + } + + public void addParameter(Object o) { + parameters.add(o); + } + + public List getParameters() { + return parameters; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStore.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStore.java new file mode 100644 index 000000000..88a9ce89a --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStore.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.resequencer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.message.store.impl.jdbc.JDBCMessageStore; +import org.apache.synapse.message.store.impl.jdbc.StoreException; +import org.apache.synapse.message.store.impl.jdbc.util.Statement; +import org.apache.synapse.util.xpath.SynapseXPath; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +/** + *

+ * This represents the store which will allow to re-sequence messages. + *

+ * + * @see JDBCMessageStore + */ +public class ResequenceMessageStore extends JDBCMessageStore { + /** + * Logger for the class + */ + private static final Log log = LogFactory.getLog(ResequenceMessageStore.class); + + /** + * Number of milliseconds for a second + */ + private static final int MILLISECONDS = 1000; + + /** + * The value of the next sequence id which should be processed. + */ + private long nextSequenceId = 0; + + /** + * The xpath expression evaluated to identify sequence. + */ + private SynapseXPath xPath; + + /** + * Maximum number of time the sequence should wait for gap detection. + */ + private long gapTimeoutInterval; + + /** + * The number of counts elapsed as waiting for gap. + */ + private long nextElapsedTime; + + /** + *

+ * Indicates whether the processor was started. + *

+ *

+ * This will be used when clustering, if the processor has not started before, the message will be retrieved from + * last processed id table. + *

+ */ + private boolean hasStarted = false; + + /** + *

+ * There could be situations where the sequence id could duplicate, hence there will be a unique message id + * maintained. + *

+ *

+ * Will co-relate between the message-id and the sequence-id. + *

+ * key - message id value. + * value - sequence id value. + */ + private ConcurrentHashMap sequenceIdMapper = new ConcurrentHashMap<>(); + + + /** + *

+ * Returns the start id indicated in the DB. + *

+ * + * @param resultSet the result returned from the DB query. + * @return the result serialized into DB. + * @throws SQLException if an error is encountered while accessing the DB. + */ + private List startIdSelectionResult(ResultSet resultSet) throws SQLException { + ArrayList elements = new ArrayList<>(); + while (resultSet.next()) { + HashMap rowData = new HashMap<>(); + long sequenceId = resultSet.getLong(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN); + rowData.put(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN, sequenceId); + elements.add(rowData); + if (log.isDebugEnabled()) { + log.debug("DB returned " + sequenceId + " as the result"); + } + } + return elements; + } + + /** + *

+ * This method should be called at the start of the store. + *

+ *

+ * Will read from the DB and identify the id which should be processed. + *

+ * + * @return the start id of the store. + */ + private long readStartId() { + Long sequenceId = 0L; + final int minimumRowCount = 0; + String storeName = this.getName(); + final String lastProcessIdSelectStatement = "SELECT " + ResequenceMessageStoreConstants.SEQ_ID + " FROM " + + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " WHERE " + + ResequenceMessageStoreConstants.STATEMENT_COLUMN + "= ?"; + Statement statement = new Statement(lastProcessIdSelectStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return startIdSelectionResult(resultSet); + } + }; + statement.addParameter(storeName); + List processedRows = getProcessedRows(statement); + if (processedRows.size() > minimumRowCount) { + final int firstIndex = 0; + Map processedRowMap = processedRows.get(firstIndex); + sequenceId = (Long) processedRowMap.get(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN); + log.info("Starting sequence id recorded as:" + sequenceId); + } + return sequenceId; + } + + /** + * Initializes the sequence xPath value. + * + * @param parameters the list of parameters defined in the configuration. + */ + private void initResequenceParams(Map parameters) { + xPath = (SynapseXPath) parameters.get(ResequenceMessageStoreConstants.SEQUENCE_NUMBER_XPATH); + gapTimeoutInterval = Integer.parseInt((String) parameters.get(ResequenceMessageStoreConstants + .MAX_NUMBER_OF_WAITING_COUNT)); + //Convert the gap time into milliseconds + if (gapTimeoutInterval >= 0) { + gapTimeoutInterval = gapTimeoutInterval * MILLISECONDS; + nextElapsedTime = System.currentTimeMillis() + gapTimeoutInterval; + if (log.isDebugEnabled()) { + log.debug("Resequencer initialized with xpath:" + xPath.toString() + ",the waiting count configured:" + + gapTimeoutInterval); + } + } else { + nextElapsedTime = -1; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setParameters(Map parameters) { + initResequenceParams(parameters); + super.setParameters(parameters); + } + + /** + * {@inheritDoc} + */ + @Override + public void init(SynapseEnvironment synapseEnvironment) { + super.init(synapseEnvironment); + nextSequenceId = readStartId() + 1; + if (log.isDebugEnabled()) { + log.debug("Next sequence which will be processed:" + nextSequenceId); + } + } + + /** + * Extracts the sequence id from the message context. + * + * @param message the message context. + * @return sequence id of the message. + */ + private Long getMessageSequenceId(MessageContext message) throws StoreException { + String sequenceIdValue; + sequenceIdValue = xPath.stringValueOf(message); + if (log.isDebugEnabled()) { + log.debug("Sequence id extracted from the incoming message " + message.getMessageID() + " is:" + + sequenceIdValue); + } + return Long.parseLong(sequenceIdValue); + } + + /** + * Will get the current message belonging to a sequence. + * + * @return the current message in the sequence. + */ + private MessageContext getCurrentMessage() { + MessageContext msg = null; + final int firstRowIndex = 0; + try { + String tableName = getJdbcConfiguration().getTableName(); + String selectMessageStatement = "SELECT message FROM " + tableName + " WHERE " + + ResequenceMessageStoreConstants.SEQ_ID + "= ?"; + Statement statement = new Statement(selectMessageStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + statement.addParameter(nextSequenceId - 1); + List processedRows = getProcessedRows(statement); + if (!processedRows.isEmpty()) { + msg = getMessageContext(processedRows, firstRowIndex); + if (log.isTraceEnabled()) { + log.trace("Message with id " + msg.getMessageID() + " returned for sequence " + nextSequenceId); + } + } else { + if (log.isTraceEnabled()) { + log.trace("Sequences not returned from DB, next sequence will be:" + nextSequenceId); + } + } + } catch (SynapseException ex) { + throw new SynapseException("Error while peek the message", ex); + } + return msg; + } + + /** + * Will get the next message belonging to a sequence. + * + * @return the next message in the sequence. + */ + private MessageContext getNextMessage() { + MessageContext msg = null; + final int firstRowIndex = 0; + try { + String tableName = getJdbcConfiguration().getTableName(); + String selectMessageStatement = "SELECT message FROM " + tableName + " WHERE " + + ResequenceMessageStoreConstants.SEQ_ID + "= ?"; + Statement statement = new Statement(selectMessageStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return messageContentResultSet(resultSet, this.getStatement()); + } + }; + statement.addParameter(nextSequenceId); + List processedRows = getProcessedRows(statement); + if (!processedRows.isEmpty()) { + msg = getMessageContext(processedRows, firstRowIndex); + nextSequenceId++; + if (log.isTraceEnabled()) { + log.trace("Message with id " + msg.getMessageID() + " returned for sequence " + nextSequenceId); + } + } else { + if (log.isTraceEnabled()) { + log.trace("Sequences not returned from DB, next sequence will be:" + nextSequenceId); + } + } + } catch (SynapseException ex) { + throw new SynapseException("Error while peek the message", ex); + } + return msg; + } + + /** + *

+ * Remove message statement. + *

+ *

+ * When re-sequenced we need to maintain the last processed id along with the removal. So that at an event the node + * crashes we know where to start from. + *

+ *

+ *

+ * {@inheritDoc} + */ + @Override + protected List removeMessageStatement(String msgId) { + Long messageSequenceId = sequenceIdMapper.remove(msgId); + String messageStoreName = this.getName(); + if (messageSequenceId == null) { + log.error("The message with id " + msgId + " is not tracked within the memory."); + } + ArrayList statements = new ArrayList<>(); + final String deleteMessageStatement = "DELETE FROM " + getJdbcConfiguration().getTableName() + + " WHERE msg_id=?"; + final String insertLastProcessIdStatement = "INSERT INTO " + + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " SELECT ?, ?" + + " FROM " + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + " WHERE statement = ? " + + "HAVING COUNT(*) = 0"; + + final String updateLastProcessIdStatement = "UPDATE " + ResequenceMessageStoreConstants.LAST_PROCESS_ID_TABLE_NAME + + " SET statement = ? , seq_id = ? WHERE statement = ?"; + Statement sequenceIdInsertStatement = new Statement(insertLastProcessIdStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } + }; + + Statement sequenceIdUpdateStatement = new Statement(updateLastProcessIdStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } + }; + + Statement deleteMessage = new Statement(deleteMessageStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + throw new UnsupportedOperationException(); + } + }; + deleteMessage.addParameter(msgId); + sequenceIdInsertStatement.addParameter(messageStoreName); + sequenceIdInsertStatement.addParameter(messageSequenceId); + sequenceIdInsertStatement.addParameter(messageStoreName); + sequenceIdUpdateStatement.addParameter(messageStoreName); + sequenceIdUpdateStatement.addParameter(messageSequenceId); + sequenceIdUpdateStatement.addParameter(messageStoreName); + statements.add(deleteMessage); + statements.add(sequenceIdInsertStatement); + statements.add(sequenceIdUpdateStatement); + if (log.isDebugEnabled()) { + log.debug("Removing message with id:" + msgId + " and last process id:" + messageSequenceId); + } + return statements; + } + + /** + * Identify the message context from the processed rows. + * + * @param processedRows the processed row value. + * @return the message context represented in the corresponding row. + */ + private MessageContext getMessageContext(List processedRows, int rowIndex) { + Map map = processedRows.get(rowIndex); + return (MessageContext) map.get(MESSAGE_COLUMN_NAME); + } + + /** + * Get the sequence id belonging to the column. + * + * @param processedRows the list of processed rows. + * @param rowIndex the index of the row which should be retrieved. + * @return the sequence id of the message. + */ + private long getSequenceId(List processedRows, int rowIndex) { + Map map = processedRows.get(rowIndex); + return (long) map.get(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN); + } + + /** + *

+ * Gets message with minimum sequence id. + *

+ * + * @param resultSet the results returned from the query. + * @param statement statement which is executed to obtain the results. + * @return message which has the minimum sequence id. + * @throws SQLException if an error is returned from the db while obtaining the sequence id value. + */ + private List getMessageWithMinimumId(ResultSet resultSet, String statement) throws SQLException { + ArrayList elements = new ArrayList<>(); + while (resultSet.next()) { + try { + HashMap rowData = new HashMap<>(); + byte[] msgObj = resultSet.getBytes(MESSAGE_COLUMN_NAME); + MessageContext responseMessageContext = deserializeMessage(msgObj); + rowData.put(MESSAGE_COLUMN_NAME, responseMessageContext); + long sequenceId = resultSet.getLong(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN); + rowData.put(ResequenceMessageStoreConstants.SEQUENCE_ID_COLUMN, sequenceId); + elements.add(rowData); + } catch (SQLException e) { + String message = "Error executing statement : " + statement + " against " + "DataSource : " + + getJdbcConfiguration().getDSName(); + throw new SynapseException(message, e); + } + } + return elements; + } + + /** + *

+ * Retrieve the minimum sequence number of the available sequence ids in the DB + *

+ *

+ * This operation should call when there's a gap and a timeout occurs. + *

+ *

+ * Note : This operation would reset the "nextSequenceId" to the minimum sequence id generated from the + * DB. + *

+ * + * @return the message context of the next sequence + */ + private MessageContext getMessageWithMinimumSequence() { + String tableName = getJdbcConfiguration().getTableName(); + String selectMinimumSequenceIdStatement = "SELECT message,seq_id FROM " + tableName + " WHERE " + + ResequenceMessageStoreConstants.SEQ_ID + "=(SELECT min(?)" + " from " + tableName + ")"; + Statement stmt = new Statement(selectMinimumSequenceIdStatement) { + @Override + public List getResult(ResultSet resultSet) throws SQLException { + return getMessageWithMinimumId(resultSet, this.getStatement()); + } + }; + stmt.addParameter(ResequenceMessageStoreConstants.SEQ_ID); + MessageContext msg = null; + final int firstRowIndex = 0; + try { + List processedRows = getProcessedRows(stmt); + if (!processedRows.isEmpty()) { + msg = getMessageContext(processedRows, firstRowIndex); + long sequenceId = getSequenceId(processedRows, firstRowIndex); + nextSequenceId = sequenceId + 1; + + if (log.isTraceEnabled()) { + log.trace("Message with id " + msg.getMessageID() + " returned as the minimum, the minimum " + + "sequence " + "will be marked as " + nextSequenceId); + } + } + } catch (SynapseException ex) { + throw new SynapseException("Error while peek the message", ex); + } + return msg; + } + + /** + *

+ * Stores message in database by providing the correct sequence id. + *

+ *

+ * {@inheritDoc} + */ + @Override + protected Statement getStoreMessageStatement(MessageContext context, Long sequenceId) throws StoreException { + Statement storeMessageStatement; + sequenceId = getMessageSequenceId(context); + storeMessageStatement = super.getStoreMessageStatement(context, sequenceId); + return storeMessageStatement; + } + + /** + *

+ * Specifies whether the store should wait instead of processing the message. + *

+ *

+ * This is called if a gap is being detected. The condition would be if the maximum number of peeks have breached. + *

+ * + * @return true if the processor should wait. + */ + private boolean shouldWait() { + long currentTime = System.currentTimeMillis(); + return nextElapsedTime < 0 || currentTime <= nextElapsedTime; + } + + /** + * {@inheritDoc} + */ + @Override + public MessageContext poll() { + MessageContext messageContext = getCurrentMessage(); + messageContext = remove(messageContext.getMessageID()); + if (messageContext != null) { + return messageContext; + } else { + throw new NoSuchElementException("First element not found and remove failed !"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public MessageContext peek() throws SynapseException { + MessageContext msg; + if (!hasStarted) { + nextSequenceId = readStartId() + 1; + hasStarted = true; + } + msg = getNextMessage(); + if (null == msg && !shouldWait()) { + msg = getMessageWithMinimumSequence(); + } + if (null != msg) { + long currentSequenceId = nextSequenceId - 1; + String messageId = msg.getMessageID(); + sequenceIdMapper.put(messageId, currentSequenceId); + if (nextElapsedTime > 0) { + nextElapsedTime = System.currentTimeMillis() + gapTimeoutInterval; + } + if (log.isDebugEnabled()) { + log.debug("Message with sequence " + currentSequenceId + " and message id " + messageId + + " will be returned to the processor."); + log.debug("Next elapsed time would be marked as:" + nextElapsedTime); + } + } + return msg; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStoreConstants.java b/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStoreConstants.java new file mode 100644 index 000000000..a28d81e7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/message/store/impl/resequencer/ResequenceMessageStoreConstants.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.message.store.impl.resequencer; + +/** + * Holds the constants pertaining to re-sequencing + */ +class ResequenceMessageStoreConstants { + /** + * xpath expression to extract the sequence number + */ + static final String SEQUENCE_NUMBER_XPATH = "store.resequence.id.path"; + /** + * Specifies the property name of maximum number of peeks the gap would be detected for. + */ + static final String MAX_NUMBER_OF_WAITING_COUNT = "store.resequence.timeout"; + /** + * Sequence id column name. + */ + static final String SEQ_ID = "seq_id"; + /** + * Sequence id column name, which is stored in the db. + */ + static final String SEQUENCE_ID_COLUMN = "seq_id"; + /** + * Specifies the column value of the statement. + */ + static final String STATEMENT_COLUMN = "statement"; + /** + * Specifies the table name which will be used to maintain the last process id. + */ + static final String LAST_PROCESS_ID_TABLE_NAME = "tbl_lastprocessid"; + /** + * Specifies the meta info key value of the last process id. + */ + private static final String LAST_PROCESS_ID_META_INFO = "LastProcessId"; + +} diff --git a/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretInformation.java b/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretInformation.java new file mode 100644 index 000000000..afff7561b --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretInformation.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.securevault.secret; + +import org.apache.synapse.securevault.SecretResolver; + +/** + * Encapsulates the All information related to a DataSource + */ +public class SecretInformation { + + private String user; + private String aliasSecret; + private String secretPrompt; + private SecretResolver localSecretResolver; + private SecretResolver globalSecretResolver; + private String token; + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public String getAliasSecret() { + return aliasSecret; + } + + public void setAliasSecret(String aliasSecret) { + this.aliasSecret = aliasSecret; + } + + public String getSecretPrompt() { + return secretPrompt; + } + + public void setSecretPrompt(String secretPrompt) { + this.secretPrompt = secretPrompt; + } + + /** + * Get actual password based on SecretCallbackHandler and alias password + * If SecretCallbackHandler is null, then returns alias password + * + * @return Actual password + */ + public String getResolvedSecret() { + + SecretResolver secretResolver = null; + + if (localSecretResolver != null && localSecretResolver.isInitialized()) { + secretResolver = localSecretResolver; + } else if (globalSecretResolver != null && globalSecretResolver.isInitialized() + && globalSecretResolver.isTokenProtected(token)) { + secretResolver = globalSecretResolver; + } + + if (secretResolver != null) { + if (aliasSecret != null && !"".equals(aliasSecret)) { + if (secretPrompt == null) { + return secretResolver.resolve(aliasSecret); + } else { + return secretResolver.resolve(aliasSecret, secretPrompt); + } + } + } + return aliasSecret; + } + + public SecretResolver getLocalSecretResolver() { + return localSecretResolver; + } + + public void setLocalSecretResolver(SecretResolver localSecretResolver) { + this.localSecretResolver = localSecretResolver; + } + + public SecretResolver getGlobalSecretResolver() { + return globalSecretResolver; + } + + public void setGlobalSecretResolver(SecretResolver globalSecretResolver) { + this.globalSecretResolver = globalSecretResolver; + } + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretManager.java b/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretManager.java new file mode 100644 index 000000000..9e52c8216 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/securevault/secret/SecretManager.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.securevault.secret; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.securevault.SecureVaultException; +import org.apache.synapse.securevault.commons.MiscellaneousUtil; +import org.apache.synapse.securevault.definition.IdentityKeyStoreInformation; +import org.apache.synapse.securevault.definition.KeyStoreInformationFactory; +import org.apache.synapse.securevault.definition.TrustKeyStoreInformation; +import org.apache.synapse.securevault.keystore.IdentityKeyStoreWrapper; +import org.apache.synapse.securevault.keystore.TrustKeyStoreWrapper; + +import java.util.Properties; + +/** + * Entry point for manage secrets + */ +public class SecretManager { + + private static Log log = LogFactory.getLog(SecretManager.class); + + private final static SecretManager SECRET_MANAGER = new SecretManager(); + + /* Default configuration file path for secret manager*/ + private final static String PROP_DEFAULT_CONF_LOCATION = "secret-manager.properties"; + /* If the location of the secret manager configuration is provided as a property- it's name */ + private final static String PROP_SECRET_MANAGER_CONF = "secret.manager.conf"; + /* Property key for secretRepositories*/ + private final static String PROP_SECRET_REPOSITORIES = "secretRepositories"; + /* Type of the secret repository */ + private final static String PROP_PROVIDER = "provider"; + /* Dot string */ + private final static String DOT = "."; + + /*Root Secret Repository */ + private SecretRepository parentRepository; + /* True , if secret manage has been started up properly- need to have a at + least one Secret Repository*/ + private boolean initialized = false; + + public static SecretManager getInstance() { + return SECRET_MANAGER; + } + + private SecretManager() { + } + + /** + * Initializes the Secret Manager by providing configuration properties + * + * @param properties Configuration properties + */ + public void init(Properties properties) { + + if (initialized) { + if (log.isDebugEnabled()) { + log.debug("Secret Manager already has been started."); + } + return; + } + + if (properties == null) { + if (log.isDebugEnabled()) { + log.debug("KeyStore configuration properties cannot be found"); + } + return; + } + + String configurationFile = MiscellaneousUtil.getProperty( + properties, PROP_SECRET_MANAGER_CONF, PROP_DEFAULT_CONF_LOCATION); + + Properties configurationProperties = MiscellaneousUtil.loadProperties(configurationFile); + if (configurationProperties == null || configurationProperties.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Configuration properties can not be loaded form : " + + configurationFile + " Will use synapse properties"); + } + configurationProperties = properties; + + } + + String repositoriesString = MiscellaneousUtil.getProperty( + configurationProperties, PROP_SECRET_REPOSITORIES, null); + if (repositoriesString == null || "".equals(repositoriesString)) { + if (log.isDebugEnabled()) { + log.debug("No secret repositories have been configured"); + } + return; + } + + String[] repositories = repositoriesString.split(","); + if (repositories == null || repositories.length == 0) { + if (log.isDebugEnabled()) { + log.debug("No secret repositories have been configured"); + } + return; + } + + + //Create a KeyStore Information for private key entry KeyStore + IdentityKeyStoreInformation identityInformation = + KeyStoreInformationFactory.createIdentityKeyStoreInformation(properties); + + // Create a KeyStore Information for trusted certificate KeyStore + TrustKeyStoreInformation trustInformation = + KeyStoreInformationFactory.createTrustKeyStoreInformation(properties); + + String identityKeyPass = null; + String identityStorePass = null; + String trustStorePass = null; + if (identityInformation != null) { + identityKeyPass = identityInformation + .getKeyPasswordProvider().getResolvedSecret(); + identityStorePass = identityInformation + .getKeyStorePasswordProvider().getResolvedSecret(); + } + + if (trustInformation != null) { + trustStorePass = trustInformation + .getKeyStorePasswordProvider().getResolvedSecret(); + } + + + if (!validatePasswords(identityStorePass, identityKeyPass, trustStorePass)) { + if (log.isDebugEnabled()) { + log.info("Either Identity or Trust keystore password is mandatory" + + " in order to initialized secret manager."); + } + return; + } + + IdentityKeyStoreWrapper identityKeyStoreWrapper = new IdentityKeyStoreWrapper(); + identityKeyStoreWrapper.init(identityInformation, identityKeyPass); + + TrustKeyStoreWrapper trustKeyStoreWrapper = new TrustKeyStoreWrapper(); + if (trustInformation != null) { + trustKeyStoreWrapper.init(trustInformation); + } + + SecretRepository currentParent = null; + for (String secretRepo : repositories) { + + StringBuffer sb = new StringBuffer(); + sb.append(PROP_SECRET_REPOSITORIES); + sb.append(DOT); + sb.append(secretRepo); + String id = sb.toString(); + sb.append(DOT); + sb.append(PROP_PROVIDER); + + String provider = MiscellaneousUtil.getProperty( + configurationProperties, sb.toString(), null); + if (provider == null || "".equals(provider)) { + handleException("Repository provider cannot be null "); + } + + if (log.isDebugEnabled()) { + log.debug("Initiating a File Based Secret Repository"); + } + + try { + + Class aClass = getClass().getClassLoader().loadClass(provider.trim()); + Object instance = aClass.newInstance(); + + if (instance instanceof SecretRepositoryProvider) { + SecretRepository secretRepository = ((SecretRepositoryProvider) instance). + getSecretRepository(identityKeyStoreWrapper, trustKeyStoreWrapper); + secretRepository.init(configurationProperties, id); + if (parentRepository == null) { + parentRepository = secretRepository; + } + secretRepository.setParent(currentParent); + currentParent = secretRepository; + if (log.isDebugEnabled()) { + log.debug("Successfully Initiate a Secret Repository provided by : " + + provider); + } + } else { + handleException("Invalid class as SecretRepositoryProvider : Class Name : " + + provider); + } + + } catch (ClassNotFoundException e) { + handleException("A Secret Provider cannot be found for class name : " + provider); + } catch (IllegalAccessException e) { + handleException("Error creating a instance from class : " + provider); + } catch (InstantiationException e) { + handleException("Error creating a instance from class : " + provider); + } + } + + initialized = true; + } + + /** + * Returns the secret corresponding to the given alias name + * + * @param alias The logical or alias name + * @return If there is a secret , otherwise , alias itself + */ + public String getSecret(String alias) { + if (!initialized || parentRepository == null) { + if (log.isDebugEnabled()) { + log.debug("There is no secret repository. Returning alias itself"); + } + return alias; + } + return parentRepository.getSecret(alias); + } + +// /** +// * Returns the encrypted value corresponding to the given alias name +// * +// * @param alias The logical or alias name +// * @return If there is a encrypted value , otherwise , alias itself +// */ +// public String getEncryptedData(String alias) { +// if (!initialized || parentRepository == null) { +// if (log.isDebugEnabled()) { +// log.debug("There is no secret repository. Returning alias itself"); +// } +// return alias; +// } +// return parentRepository.getEncryptedData(alias); +// } + + public boolean isInitialized() { + return initialized; + } + + public void shoutDown() { + this.parentRepository = null; + this.initialized = false; + } + + private static void handleException(String msg) { + log.error(msg); + throw new SecureVaultException(msg); + } + + private boolean validatePasswords(String identityStorePass, + String identityKeyPass, String trustStorePass) { + boolean isValid = false; + if (trustStorePass != null && !"".equals(trustStorePass)) { + if (log.isDebugEnabled()) { + log.debug("Trust Store Password cannot be found."); + } + isValid = true; + } else { + if (identityStorePass != null && !"".equals(identityStorePass) && + identityKeyPass != null && !"".equals(identityKeyPass)) { + if (log.isDebugEnabled()) { + log.debug("Identity Store Password " + + "and Identity Store private key Password cannot be found."); + } + isValid = true; + } + } + return isValid; + } +} diff --git a/modules/integration/pom.xml b/modules/integration/pom.xml index cddd88268..3f783b068 100644 --- a/modules/integration/pom.xml +++ b/modules/integration/pom.xml @@ -57,9 +57,9 @@ - + @@ -81,7 +81,11 @@ - + + + + + diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/DerbyServerController.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/DerbyServerController.java index 5af733a33..0b24448f6 100644 --- a/modules/integration/src/test/java/org/apache/synapse/samples/framework/DerbyServerController.java +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/DerbyServerController.java @@ -82,7 +82,18 @@ private void initData() throws Exception { //client String dbName = "synapsedb"; - String createTableQuery = "CREATE table company(name varchar(10), id varchar(10), price double)"; + String createCompanyTableQuery = "CREATE table company(name varchar(10), id varchar(10), price double)"; + final String createJDBCMessageStoreQuery = "CREATE TABLE jdbc_message_store" + + "(indexId INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY(Start with 1, Increment by 1)," + + "msg_id VARCHAR(200) NOT NULL,message BLOB NOT NULL)"; + final String createResequenceMessageStoreQuery = "CREATE TABLE tbl_resequence" + + "(indexId INTEGER GENERATED ALWAYS AS IDENTITY(Start with 1, Increment by 1) PRIMARY KEY," + + "msg_id VARCHAR(200) NOT NULL UNIQUE," + + "seq_id INTEGER NOT NULL UNIQUE," + + "message BLOB NOT NULL)"; + final String createLastProcessedIdTblQuery = "CREATE TABLE tbl_lastprocessid" + + "(statement VARCHAR(20) PRIMARY KEY,seq_id INTEGER NOT NULL UNIQUE)"; + String connectionURL = "jdbc:derby://localhost:1527/" + dbName + ";create=true"; java.util.Properties props = new java.util.Properties(); @@ -96,7 +107,10 @@ private void initData() throws Exception { Class.forName("org.apache.derby.jdbc.ClientDriver").newInstance(); conn = DriverManager.getConnection(connectionURL, props); Statement s = conn.createStatement(); - s.execute(createTableQuery); + s.execute(createCompanyTableQuery); + s.execute(createJDBCMessageStoreQuery); + s.execute(createResequenceMessageStoreQuery); + s.execute(createLastProcessedIdTblQuery); s.execute("INSERT into company values ('IBM','c1',0.0)"); s.execute(" INSERT into company values ('SUN','c2',0.0)"); s.execute(" INSERT into company values ('MSFT','c3',0.0)"); diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/SynapseTestCase.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/SynapseTestCase.java index fbec07fa3..0da6d5317 100644 --- a/modules/integration/src/test/java/org/apache/synapse/samples/framework/SynapseTestCase.java +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/SynapseTestCase.java @@ -66,6 +66,36 @@ protected SynapseTestCase(int sampleId) { loadConfiguration(); } + /** + * Apply configuration values to message store XML. + * + * @param synapseRepoPath path to the synapse XML repository. + * @param axis2BlockingConfigPath path to axis2 blocking client. + */ + private void applyMessageStoreConfigurationToFile(OMElement synapseRepoPath, OMElement axis2BlockingConfigPath, + OMElement config) { + try { + if (null != synapseRepoPath && null != axis2BlockingConfigPath) { + String currentDir = SynapseTestUtils.getCurrentDir(); + String sampleFilePath = SynapseTestUtils.getRequiredParameter(config.getFirstChildWithName + (new QName(SampleConfigConstants.TAG_SYNAPSE_CONF)), + SampleConfigConstants.TAG_SYNAPSE_CONF_XML); + String normalizeFilePath = FilenameUtils.normalize(currentDir + sampleFilePath); + File synapseSampleFile = new File(normalizeFilePath); + String synapseXml = FileUtils.readFileToString(synapseSampleFile); + String modifiedSynapseAxis2 = SynapseTestUtils.replace(synapseXml, "${AXIS2_REPO}", + FilenameUtils.normalize(currentDir + synapseRepoPath.getText())); + String modifiedAxis2Blocking = SynapseTestUtils.replace(modifiedSynapseAxis2, "${AXIS2_CONFIG}", + FilenameUtils.normalize(currentDir + axis2BlockingConfigPath.getText())); + FileUtils.writeStringToFile(synapseSampleFile, modifiedAxis2Blocking); + } + } catch (IOException e) { + String message = "Error occurred while reading the sample configuration"; + log.error(message, e); + fail(message); + } + } + private void loadConfiguration() { // Parse the sample descriptor OMElement sampleConfig = loadDescriptorInfoFile(); @@ -91,6 +121,17 @@ private void loadConfiguration() { this.sampleName = sampleNameElt.getText(); } + // Load synapse repo name and axis2blocking client, needed for message stores + OMElement synapseConfigElement = + sampleConfig.getFirstChildWithName(new QName(SampleConfigConstants.TAG_SYNAPSE_CONF)); + OMElement synapseRepoPath = synapseConfigElement.getFirstChildWithName( + new QName(SampleConfigConstants.TAG_SYNAPSE_CONF_AXIS2_REPO)); + OMElement axis2BlockingConfigPath = synapseConfigElement.getFirstChildWithName( + new QName(SampleConfigConstants.TAG_AXIS2_BLOCKING_CONFIG)); + + //Apply the configurations to the file + applyMessageStoreConfigurationToFile(synapseRepoPath, axis2BlockingConfigPath, sampleConfig); + // Load Synapse, backend server and client configurations synapseController = initSynapseConfigInfo(sampleConfig); backendServerControllers = initBackEndServersConfigInfo(sampleConfig); diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/TestSamplesHandlerSuite.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/TestSamplesHandlerSuite.java index 18b441700..12c3a2d1a 100644 --- a/modules/integration/src/test/java/org/apache/synapse/samples/framework/TestSamplesHandlerSuite.java +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/TestSamplesHandlerSuite.java @@ -244,6 +244,8 @@ private static void populateSamplesMap() { sampleClassRepo.put("451", Sample451.class); sampleClassRepo.put("452", Sample452.class); sampleClassRepo.put("460", Sample460.class); + sampleClassRepo.put("706", Sample706.class); + sampleClassRepo.put("707", Sample707.class); sampleClassRepo.put("800", Sample800.class); sampleClassRepo.put("10001", Sample10001.class); diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/config/SampleConfigConstants.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/config/SampleConfigConstants.java index 08708f72c..6c6d610b5 100644 --- a/modules/integration/src/test/java/org/apache/synapse/samples/framework/config/SampleConfigConstants.java +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/config/SampleConfigConstants.java @@ -44,6 +44,7 @@ public class SampleConfigConstants { public static final String TAG_BE_SERVER_CONF_AXIS2_REPO = "axis2Repo"; public static final String TAG_BE_SERVER_CONF_AXIS2_XML = "axis2Xml"; + public static final String TAG_AXIS2_BLOCKING_CONFIG = "axis2BlockingXml"; public static final String TAG_BE_SERVER_CONF_AXIS2_HTTP_PORT = "httpPort"; public static final String TAG_BE_SERVER_CONF_AXIS2_HTTPS_PORT = "httpsPort"; public static final String TAG_BE_SERVER_CONF_AXIS2_COUNTER_ENABLED = "counterEnabled"; diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample706.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample706.java new file mode 100644 index 000000000..07238e053 --- /dev/null +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample706.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.samples.framework.tests.advanced; + +import org.apache.synapse.samples.framework.SampleClientResult; +import org.apache.synapse.samples.framework.SynapseTestCase; +import org.apache.synapse.samples.framework.clients.StockQuoteSampleClient; + +public class Sample706 extends SynapseTestCase { + public Sample706() { + super(706); + } + + public void testJDBCMessageStore() throws InterruptedException { + String trpUrl = "http://localhost:8280/"; + StockQuoteSampleClient client = getStockQuoteClient(); + log.info("Running test: Introduction to JDBC message store "); + SampleClientResult result = client.placeOrder(null, trpUrl, null, "WSO2-1"); + assertTrue("Client received response successfully ", result.responseReceived()); + Thread.sleep(7000); + assertEquals(1, getAxis2Server().getMessageCount("SimpleStockQuoteService", "placeOrder")); + } +} diff --git a/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample707.java b/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample707.java new file mode 100644 index 000000000..92b3a75eb --- /dev/null +++ b/modules/integration/src/test/java/org/apache/synapse/samples/framework/tests/advanced/Sample707.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.samples.framework.tests.advanced; + +import org.apache.synapse.samples.framework.SynapseTestCase; +import org.apache.synapse.samples.framework.clients.StockQuoteSampleClient; + +public class Sample707 extends SynapseTestCase { + public Sample707() { + super(707); + } + + public void testResequenceMessageStore() throws InterruptedException { + String trpUrl = "http://localhost:8280/"; + StockQuoteSampleClient client = getStockQuoteClient(); + log.info("Running test: Introduction to the script mediator using js scripts "); + client.placeOrder(null, trpUrl, null, "WSO2-4"); + client.placeOrder(null, trpUrl, null, "WSO2-2"); + client.placeOrder(null, trpUrl, null, "WSO2-3"); + client.placeOrder(null, trpUrl, null, "WSO2-1"); + Thread.sleep(10000); + assertEquals(4, getAxis2Server().getMessageCount("SimpleStockQuoteService", "placeOrder")); + + } +} diff --git a/modules/integration/src/test/resources/axis2Xml/synapse/axis2_blocking_client.xml b/modules/integration/src/test/resources/axis2Xml/synapse/axis2_blocking_client.xml new file mode 100644 index 000000000..8aa5d6508 --- /dev/null +++ b/modules/integration/src/test/resources/axis2Xml/synapse/axis2_blocking_client.xml @@ -0,0 +1,325 @@ + + + + + + + true + false + false + false + + + + + + 30000 + + + + false + + + + + + false + + admin + axis2 + + + + + + + + + + + + + + + + + + + + + + services + rest + + + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 8200 + + + + + + + + + + + + + + + + + + HTTP/1.1 + chunked + true + 200 + + + + + + HTTP/1.1 + chunked + true + 200 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/integration/src/test/resources/extras/synapse_sample_706_altered_msmp.xml b/modules/integration/src/test/resources/extras/synapse_sample_706_altered_msmp.xml new file mode 100644 index 000000000..f3b836efe --- /dev/null +++ b/modules/integration/src/test/resources/extras/synapse_sample_706_altered_msmp.xml @@ -0,0 +1,66 @@ + + + + + + + +
+ + -1 + 1.0 + +
+
+ + + + + + + + + + + + + + + + + The main sequence for the message mediation + + + org.apache.derby.jdbc.ClientDriver + false + synapse + jdbc:derby://localhost:1527/synapsedb + synapse + jdbc_message_store + + + 10000 + ${AXIS2_REPO} + ${AXIS2_CONFIG} + +
diff --git a/modules/integration/src/test/resources/extras/synapse_sample_707_altered_msmp.xml b/modules/integration/src/test/resources/extras/synapse_sample_707_altered_msmp.xml new file mode 100644 index 000000000..5da49b249 --- /dev/null +++ b/modules/integration/src/test/resources/extras/synapse_sample_707_altered_msmp.xml @@ -0,0 +1,71 @@ + + + + + + + +
+ + -1 + 1.0 + +
+
+ + + + + + + + + + + + + + + + + The main sequence for the message mediation + + + -1 + false + MyStore + + synapse + org.apache.derby.jdbc.ClientDriver + synapse + jdbc:derby://localhost:1527/synapsedb + tbl_resequence + + + 10000 + ${AXIS2_REPO} + ${AXIS2_CONFIG} + +
diff --git a/modules/integration/src/test/resources/sample706.xml b/modules/integration/src/test/resources/sample706.xml new file mode 100644 index 000000000..6dfd6f96c --- /dev/null +++ b/modules/integration/src/test/resources/sample706.xml @@ -0,0 +1,42 @@ + + + + + 706 + Introduction to JDBC Message Store + + modules/integration/target/test_repos/synapse + modules/integration/target/test_repos/synapse/conf/axis2_def.xml + modules/integration/target/test_repos/synapse/conf/axis2_blocking_client.xml + modules/integration/target/test_repos/synapse/samples/synapse_sample_706_altered_msmp.xml + + + + modules/integration/target/test_repos/axis2Server + modules/integration/target/test_repos/axis2Server/conf/axis2_def.xml + true + + + + + + modules/integration/target/test_repos/axis2Client + + diff --git a/modules/integration/src/test/resources/sample707.xml b/modules/integration/src/test/resources/sample707.xml new file mode 100644 index 000000000..83ea0d023 --- /dev/null +++ b/modules/integration/src/test/resources/sample707.xml @@ -0,0 +1,42 @@ + + + + + 707 + Introduction to Resequence Message Store + + modules/integration/target/test_repos/synapse + modules/integration/target/test_repos/synapse/conf/axis2_def.xml + modules/integration/target/test_repos/synapse/conf/axis2_blocking_client.xml + modules/integration/target/test_repos/synapse/samples/synapse_sample_707_altered_msmp.xml + + + + modules/integration/target/test_repos/axis2Server + modules/integration/target/test_repos/axis2Server/conf/axis2_def.xml + true + + + + + + modules/integration/target/test_repos/axis2Client + + diff --git a/pom.xml b/pom.xml index 641dd8061..8edf03af5 100644 --- a/pom.xml +++ b/pom.xml @@ -1193,7 +1193,7 @@ 1.2.14 - 10.4.2.0 + 10.14.2.0 3.2.3 8.9 0.9.94 diff --git a/repository/conf/sample/synapse_sample_706.xml b/repository/conf/sample/synapse_sample_706.xml new file mode 100644 index 000000000..af1c09421 --- /dev/null +++ b/repository/conf/sample/synapse_sample_706.xml @@ -0,0 +1,64 @@ + + + + + + + +
+ + -1 + 1.0 + +
+
+ + + + + + + + + + + + + + + + + The main sequence for the message mediation + + + org.apache.derby.jdbc.ClientDriver + false + synapse + jdbc:derby://localhost:1527/synapsedb + synapse + jdbc_message_store + + + 10000 + +
diff --git a/repository/conf/sample/synapse_sample_707.xml b/repository/conf/sample/synapse_sample_707.xml new file mode 100644 index 000000000..d4dde7f26 --- /dev/null +++ b/repository/conf/sample/synapse_sample_707.xml @@ -0,0 +1,69 @@ + + + + + + + +
+ + -1 + 1.0 + +
+
+ + + + + + + + + + + + + + + + + The main sequence for the message mediation + + + -1 + false + MyStore + + synapse + org.apache.derby.jdbc.ClientDriver + synapse + jdbc:derby://localhost:1527/synapsedb + tbl_resequence + + + 10000 + +