-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit ff989ac
Showing
2,963 changed files
with
237,725 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/private/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
Connection | ||
* JMS定义的接口,代表跟MQ服务器的一个连接,通常一个服务器只用定义一个接口 | ||
* 当一个 Connection 定义的时候,默认是关闭状态.需要手动开启 | ||
* 一个 Connection 可以创建N多个 Session | ||
* 当程序执行完毕,应该关闭 Connection,它会自动的去关闭 Session,MessageProducer 和 MessageConsumer | ||
|
||
|
||
Session | ||
* JMS定义的接口,由 Connection 生成,一个或者多个. Session 是一个收发消息的线程 | ||
* 可以使用 Session 创建 MessageProducer,MessageConsumer和Message | ||
* Session 可以被事务化,或者非事务化,通过向创建 Session 的方法,传递 boolean 参数控制 | ||
Session session = connection.createSession(boolean transacted, int acknowledgeMode); | ||
transacted:是否创建事务session | ||
'本地事务' | ||
结束事务有两种方法,提交或者回滚 | ||
当一个事务提交,消息被处理,如果事务中有一个步骤失败,事务就回滚.这个事务中已执行的动作将会被撤销. | ||
在发送消息最后也必须要用 session.commit();方法表示提交事务 | ||
'所谓的事务回滚,其实就是---不发送消息到message,在回滚/提交之前.你可以进行大量的消息写入队列或者其他操作,提交就去了服务器,回滚就啥也没了' | ||
|
||
acknowledgeMode:消息签收模式 | ||
Session.AUTO_ACKNOWLEDGE | ||
当消费者从receive或者onMessage成功返回时,Session自动签收消费者这条消息的收条 | ||
'自动的签收消息' | ||
|
||
Session.CLIENT_ACKNOWLEDGE | ||
消费者通过调用消息(Message)的acknowledge方法签收消息 | ||
这种情况下,签收发生在 Session 层面 | ||
签收一个已经消费的消息,会自动的签收这个 Session 所有已经消息的收条 | ||
'手动的签收消息' | ||
|
||
Session.DUPS_OK_ACKNOWLEDGE | ||
这参数,Session不会确保对传送消息的签收,它可能引起消息的重复,但是降低了Session的开销 | ||
只有消费者允许重复的消息,才使用 | ||
'不执行签收,N个消费者可能会消费同一条消息,或者一个消费者重复消费同一条消息' | ||
|
||
|
||
MessageProducer | ||
* 是一个由 Session 创建的对象,用来向 Destination 发送消息 | ||
* session创建 | ||
MessageProducer createProducer(Destination destination) | ||
* 发送消息 | ||
void send(Message message) | ||
void send(Message message, int deliveryMode, int priority, long timeToLive) | ||
void send(Destination destination,Message message) | ||
void send(Destination destination,Message message,int deliveryMode,int priority,long timeToLive) | ||
* 参数介绍 | ||
deliveryMode | ||
* 消息传送模式(是否持久化) | ||
DeliveryMode.NON_PERSISTENT //不持久化,性能好.数据易丢失 | ||
DeliveryMode.PERSISTENT(默认) //持久化,性能差.数据不易丢失 | ||
|
||
priority | ||
* 消息优先级 | ||
优先级从 0-9 一共十个级别(默认值=4),JMS不要求严格按照10个优先级发送消息 | ||
0-4 普通消息 // | ||
5-9 加急消息 //该消息,理论会比普通消息先到达.注意哟,是理论 | ||
|
||
timeToLive | ||
* 过期时间 | ||
默认清空下,消息永远不会过期,如果需要消息在一定时间内失去意义,那么可以设置该选项 | ||
单位是毫秒 | ||
|
||
|
||
MessageConsumer | ||
* 是一个由 Session 创建的对象,用来从 Destination 接收消息 | ||
* session创建 | ||
MessageConsumer createConsumer(Destination destination); | ||
MessageConsumer createConsumer(Destination destination,String messageSelector); | ||
MessageConsumer createConsumer(Destination destination,String messageSelector,boolean NoLocal); | ||
* 参数介绍 | ||
messageSelector | ||
* 消息选择器(过滤器) | ||
* 是基于 SQL92的语法,其实就是某种规则的字符串(例如:正则) | ||
|
||
NoLocal | ||
* 默认为 false | ||
* 为 true 的话,只能接收和自己相同连接(Connection)所发布的消息,此标识仅仅'适用于主题,不适用于队列' | ||
name | ||
* 明确,订阅主题的时候,声明的订阅名称.持久订阅时需要此参数 | ||
|
||
|
||
TopicSubscriber createDurableSubscriber(Topic topic,String name); | ||
TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
------------------------------------- | ||
ActiveMQ-Destination高级特性 | | ||
------------------------------------- | ||
|
||
|
||
------------------------------------- | ||
ActiveMQ-Wildcards | | ||
------------------------------------- | ||
# Wildcards 通配符 | ||
# 这东西并不是JMS的规范,而是ActiveMQ的扩展 | ||
# ActiveMQ支持一下三种:wildcards | ||
①:".",用于作为路径上名字间的分隔符 | ||
②:"*",用于匹配任何名字 | ||
③:">",用于递归匹配任何以这个名字开始的 Destination | ||
------------------------------------- | ||
ActiveMQ-Composite Destinations | | ||
------------------------------------- | ||
# 组合队列 | ||
# 组合队列,允许用一个虚拟的 Destination 代表多个 Destination | ||
# 说白了就是,同时往多个队列,或者 topic 推送消息 | ||
# 代码 | ||
/** | ||
此处就是创建了带有三个队列的一个 Queue | ||
多个Queue用逗号隔开 | ||
*/ | ||
Queue queue = new ActiveMQQueue("queue#1,queue#2,queue#3"); | ||
/** | ||
通过这个,创建消息生产者 | ||
*/ | ||
MessageProducer producer = session.createProducer(queue); | ||
# 也可以混合发送,就是一个 Queue 里面不进有队列,还有 Topic | ||
# 代码 | ||
Queue queue = new ActiveMQQueue("queue#1,topic://topic#1"); | ||
MessageProducer producer = session.createProducer(queue); | ||
* 跟上面其实一个德行,无非就是.话题的格式是用: topic:// | ||
|
||
# 也可以通过配置来完成...不过很傻逼,不学 | ||
|
||
|
||
------------------------------------- | ||
ActiveMQ-Delete Inactive Destination | | ||
------------------------------------- | ||
# 自动删除队列 | ||
# 一般清空下,ActiveMQ,的 Queue,在不使用之后,可以通过WEB控制,或者JMX的方式来进行删除 | ||
# 也可以通过配置,让MQ自动探测到无用的队列(一定时间内为空的队列),进行删除.释放资源 | ||
|
||
<broker schedulePeriodForDestinationPurge="10000"> | ||
<destinationPolicy> | ||
<policyMap> | ||
<policyEntries> | ||
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> | ||
</policyEntries> | ||
</policyMap> | ||
</destinationPolicy> | ||
</broker> | ||
|
||
* schedulePeriodForDestinationPurge :多长时间检查一次,这里设置为10S,默认为0 | ||
* inactiveTimeoutBeforGC :当 Destination 为空后,多长时间被删除,这里是30s,默认为60 | ||
gcInactiveDestinations :设置删除掉不活动的队列,默认为 false | ||
|
||
------------------------------------- | ||
ActiveMQ- Destination Options | | ||
------------------------------------- | ||
# 队列选项,这东西也并不是JMS规范中的东西 | ||
# 其实就是说可以在队列名称后面向HTTP的URL传参一样传递一些参数 | ||
# 参数有 | ||
consumer.prefetchSize | ||
* consumer持久的未ack的最大消息数量,默认值为 variable | ||
consumer.maximumPendingMessageLimit | ||
* 用来控制非持久化的topic在存在慢消费者的情况下丢弃的数量 | ||
consumer.noLocal | ||
* 默认为 false | ||
consumer.dispatchAsync | ||
* 是否异步分发,默认为 true | ||
consumer.retroactive | ||
* 是否回溯消费者,默认为 false | ||
consumer.selector | ||
* JMS的selector,默认为 null | ||
consumer.exclusive | ||
* 是否为独占消费者,默认为 false | ||
consumer.priority | ||
* 设置消费者的优先级别,默认 0 | ||
|
||
# Demo | ||
Queue queue = new ActiveMQQueue("queue#1?consumer.exclusive=false&consumer.priority=4"); | ||
MessageConsumer messageConsumer = session.createConsumer(queue); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package com.kevin.demo.activemq.helloworld; | ||
import javax.jms.Connection; | ||
import javax.jms.ConnectionFactory; | ||
import javax.jms.DeliveryMode; | ||
import javax.jms.Destination; | ||
import javax.jms.JMSException; | ||
import javax.jms.MessageProducer; | ||
import javax.jms.Session; | ||
import javax.jms.TextMessage; | ||
import org.apache.activemq.ActiveMQConnectionFactory; | ||
/** | ||
* 消息生产者 | ||
* */ | ||
public class Provider { | ||
public static void main(String[] args) throws JMSException { | ||
/** | ||
* 1,创建连接工场.使用JMS规范 | ||
* 用户名和密码使用默认,连接方式为TCP | ||
* */ | ||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( | ||
ActiveMQConnectionFactory.DEFAULT_USER, | ||
ActiveMQConnectionFactory.DEFAULT_PASSWORD, | ||
"tcp://123.207.122.145:61616"); | ||
/*** | ||
* 2,通过工厂获取一个连接,并且打开它(在创建连接的时候,也是可以设置用户名和密码) | ||
* 默认是关闭状态 | ||
* */ | ||
Connection connection = connectionFactory.createConnection(); | ||
connection.start(); | ||
/** | ||
* 3,通过连接,获取会话 | ||
* 参数1,是否启用事务(不启用) | ||
* 参数2,消息签收模式(消费者手动签收) | ||
* */ | ||
Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE); | ||
/** | ||
* 4,通过会话创建 Destination 对象 | ||
* Destination ,用来描述 消费者 和 消息来源的对象.其实是一个高层的抽象 | ||
* 在PTP模式中,Destination 被称为:Queue,也就是队列 | ||
* 在Pub/Sub模式中,Destination 被称为:Topic,也就是主题 | ||
* */ | ||
Destination destination = session.createQueue("queue#1"); //PTP模式,创建的是队列,并且指定队列名称 | ||
/** | ||
* 5,通过会话创建 消息生产者 MessageProducer | ||
* 构造传入 Destination 描述对象 | ||
* 这里的构造,也可以指定为 null,在 messageProducer 执行 send 发送消息的时候,才指定 Destination | ||
* */ | ||
MessageProducer messageProducer = session.createProducer(destination); | ||
/** | ||
* 6,设置 消息生产者的一些属性 | ||
* 也可以在执行 send 的时候,去指定属性 | ||
* */ | ||
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置为非持久化 | ||
messageProducer.setPriority(9); //设置消息优先级别 | ||
/** | ||
* 7,最后,通过会话,使用JMS规范中的 TextMessage 形式创建数据,并使用 消息生产者MessageProducer 来发送消息 | ||
* */ | ||
for(int x = 0;x < 5;x++){ | ||
TextMessage message = session.createTextMessage("消息签收模式-客户端签收消息#" + x); //直接指定消息内容 | ||
//message.setText("hello"); 也可以创建后添加消息 | ||
messageProducer.send(message); | ||
//N多构造,可以在发送的时候,配置一些属性 | ||
//messageProducer.send(destination, message, DeliveryMode.NON_PERSISTENT, Priority.DEBUG_INT, 9999999); | ||
} | ||
session.commit(); | ||
/** | ||
* 8,关闭资源 | ||
* */ | ||
if(connection != null){ | ||
/** | ||
* 仅需要关闭此连接,它会自己去关闭其他的有关连接 | ||
* */ | ||
connection.close(); | ||
} | ||
} | ||
} | ||
|
||
|
||
|
||
----------------------------------------------------------------------------------------------- | ||
package com.kevin.demo.activemq.helloworld; | ||
import javax.jms.Connection; | ||
import javax.jms.ConnectionFactory; | ||
import javax.jms.Destination; | ||
import javax.jms.JMSException; | ||
import javax.jms.MessageConsumer; | ||
import javax.jms.Session; | ||
import javax.jms.TextMessage; | ||
import org.apache.activemq.ActiveMQConnectionFactory; | ||
/** | ||
* 消息消费者 | ||
* */ | ||
public class Consumer { | ||
public static void main(String[] args) throws JMSException { | ||
/** | ||
* 1,建立ConnectionFactory工厂 | ||
* 使用默认的用户名和密码,连接方式为TCP | ||
* */ | ||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( | ||
ActiveMQConnectionFactory.DEFAULT_USER, | ||
ActiveMQConnectionFactory.DEFAULT_PASSWORD, | ||
"tcp://123.207.122.145:61616"); | ||
/** | ||
* 2,通过ConnectionFactory工厂,获取到连接 Connection,并且打开连接(默认为关闭状态) | ||
* */ | ||
Connection connection = connectionFactory.createConnection(); | ||
connection.start(); | ||
/** | ||
* 3,通过连接,创建会话,Session | ||
* 不启用事务 | ||
* 消息签收模式为 消费者手动签收 | ||
* */ | ||
Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE); | ||
/** | ||
* 4,通过 会话 创建 Destination 对象 | ||
* Destination ,用来描述 消费者 和 消息来源的对象.其实是一个高层的抽象 | ||
* 在PTP模式中,Destination 被称为:Queue,也就是队列 | ||
* 在Pub/Sub模式中,Destination 被称为:Topic,也就是主题 | ||
* */ | ||
Destination destination = session.createQueue("queue#1"); //指定队列的名称 | ||
/** | ||
* 5,通过Session创建消息接收对象 | ||
* 构造传入 Destination 描述对象 | ||
* */ | ||
MessageConsumer messageConsumer = session.createConsumer(destination); | ||
/** | ||
* 6,使用接收对象(MessageConsumer),的 receive 来创建 ,JMS规范的 TextMessage 对象, | ||
* */ | ||
while(true){ | ||
/** | ||
* receive(),这个方法有很多重载形式 | ||
* 1,空参,阻塞,不多解释.BIO那种节奏 | ||
* 2,Long,阻塞时间,如果超时.就会往下执行 | ||
* 3,NoWait(),程序执行到这里检查是否有数据,有数据就消费.不然直接往下执行.不会等待,更不会阻塞 | ||
* */ | ||
TextMessage message = (TextMessage) messageConsumer.receive(); | ||
if(message == null){ | ||
break; | ||
} | ||
message.acknowledge(); //手动签收消息,其实就是另起线程.TCP通知MQ,这个消息我成功消费了 | ||
System.out.println(message.toString()); | ||
System.err.println("收到消息:" + message.getText()); | ||
} | ||
/** | ||
*7, 关闭资源 | ||
* */ | ||
if(connection != null){ | ||
connection.close(); | ||
} | ||
} | ||
} |
Oops, something went wrong.