1.    Composite Destinations  组合目的地

组合队列Composite Destinations : 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发送消息。  有两种实现方式:    第一种:在客户端编码实现    第二种:在activemq.xml配置文件中实现

第一种:在客户端编码实现

    在composite destinations中,多个destination之间采用","分隔。如下:这里有2个destination  "my-queue3"和"topic://topic-1",这个代表主题模式的topic-1

private static final String queueName = "my-queue3,topic://topic-1";

 

  默认是queue模式。

 

package cn.qlq.activemq;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TemporaryQueue;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 生产消息

*/

public class MsgProducer {

// 默认端口61616

private static final String url = "tcp://localhost:61616/";

private static final String queueName = "my-queue3,topic://topic-1";

private static Session session = null;

public static void main(String[] args) throws JMSException {

// 1创建ConnectionFactory

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

// 2.由connectionFactory创建connection

Connection connection = connectionFactory.createConnection();

// 3.启动connection

connection.start();

// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

// 5.创建Destination(Queue继承Queue)

Queue destination = session.createQueue(queueName);

TemporaryQueue temporaryQueue = session.createTemporaryQueue();

// 6.创建生产者producer

MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 5; i++) {

// 7.创建Message,有好多类型,这里用最简单的TextMessage

TextMessage tms = session.createTextMessage("textMessage:" + i);

// 设置附加属性

tms.setStringProperty("str", "stringProperties" + i);

tms.setJMSPriority(6);

tms.setJMSReplyTo(temporaryQueue);

// 8.生产者发送消息

producer.send(tms);

}

// 9.提交事务

session.commit();

// 10.关闭connection

session.close();

connection.close();

}

}

 

  结果会创建5条  my-queue3  队列消息 与 5条 主题模式   topic-1   消息。

  消费者正常消费即可,与队列模型的消息和主题模式的消息消费一样。

第二种:在activemq.xml配置文件中实现

。。。

 

程序中向组合队列发送消息:

package cn.qlq.activemq;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TemporaryQueue;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 生产消息

*/

public class MsgProducer {

// 默认端口61616

private static final String url = "tcp://localhost:61616/";

private static final String queueName = "comQueue";

private static Session session = null;

public static void main(String[] args) throws JMSException {

// 1创建ConnectionFactory

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

// 2.由connectionFactory创建connection

Connection connection = connectionFactory.createConnection();

// 3.启动connection

connection.start();

// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

// 5.创建Destination(Queue继承Queue)

Queue destination = session.createQueue(queueName);

TemporaryQueue temporaryQueue = session.createTemporaryQueue();

// 6.创建生产者producer

MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 5; i++) {

// 7.创建Message,有好多类型,这里用最简单的TextMessage

TextMessage tms = session.createTextMessage("textMessage:" + i);

// 设置附加属性

tms.setStringProperty("str", "stringProperties" + i);

tms.setJMSPriority(6);

tms.setJMSReplyTo(temporaryQueue);

// 8.生产者发送消息

producer.send(tms);

}

// 9.提交事务

session.commit();

// 10.关闭connection

session.close();

connection.close();

}

}

结果:

  queue88产生五条消息:

 

   topic88生产五条消息:

 

 

 2 .Configure Startup Destinations--启动创建队列和主题,只是没有消息

  在启动ActiveMQ的时候如果需要创建Destination的话,可以在activemq.xml中配置:

...

 

 3.Delete Inactive Destinations---删除没有消息的队列或主题

  在ActiveMQ的queue在不使用之后,可以通过web控制台或者JMX方式来删除掉,当然,也可以通过配置,使得broker可以自动探测到无用的队列并删除掉,回收响应资源。

。。。

 说明:  schedulePeriodForDestinationPurge: 设置多长时间检查一次,这里是1秒。  inactiveTimoutBeforeGC: 设置当Destination为空后,多长时间被删除,这里是30秒。  gcInactiveDestinations:设置删除掉不活动的队列,默认为false

 4.wildcars(通配符)

  Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。

  ActiveMQ支持以下三种wildcars:    .     用于作为路径上名字间的分隔符    *    用于匹配路径上的任何名字    >   用于递归地匹配任何以这个名字开始的destination

 

5.  Destination 选项

这个是给消费者在JMS规范之外添加的功能特性,通过在队列名称后面使用类似url的语法添加多个选项。包括:1 consumer.perfetchSize,消费者持有的未确认的最大消费数量2 consumer.maximumPendingMessageLimit: 用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认为03 consumer.noLocal: 默认false4 consumer.dispatchAsync: 是否异步分发,默认true5 consumer.retroactive: 是否为回溯消费者,默认false6 consumer.selector: JMS的selector,默认null7 consumer.exclusive: 是否为独占消费者,默认false8 consumer.priority:设置消费者的优先级,默认0

 使用示例:

Queue queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=

false&consumer.perfetchSize=10");

Consumer consumer = session.createConsumer(queue);

 

6.    虚拟destination用来创建逻辑destination,客户端可以通过它来生产和消费消息,它会把消息映射到物理destination.

ActiveMQ支持2种方式:  1:虚拟主题(Virtual Topics)  2:组合Destinations(Composite Destinations)

为什么使用虚拟主题?  ActiveMQ只有在持久订阅才是持久化的。持久订阅时,每一个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:第一:同一应用内消费者端护在均衡的问题。也就是说一个应用程序内的持久化消息,不能使用对个消费者共同承担消息处理能力。因为每个消费者都会获取所有消息。因为每一个消费者都会获取所有信息。Queue到时可以解决这个问题,但broker端又不能将消息发送到多个应用端,所以纪要发布订阅,又要让消费者分组,这个功能JMS本身是没有的第二:同一应用内消费者端failover问题,由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理,系统的健壮性不高。

如何使用虚拟topic?第一:对于消息发布者来说,就是一个正常的topic,名称以VirtualTopic.开始,比如VirtualTopic.Orders,代码示例如下:

Topicdestination = session.createTopic("VirtualTopic.Orders");

第二:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.Orders说明它是名称为A的消费端,同理Consumer.B VirtualTopic.Orders说明是一名称为B的消费端。可以在同一个应用中使用多个消费者消费这个队列又因为不同应用使用的topic名称不一样,前缀不同,所以不同应用中都可以接受到全部消息。每一个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。

代码示例:

Destination dest = session.createQueue("Consumer.A.VirtualTopic.Orders");

 

生产者代码:

package cn.qlq.activemq.topic;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgProducer {

private static final String url = "tcp://127.0.0.1:61616";

private static final String topicName = "VirtualTopic.Orders";

public static void main(String[] args) throws JMSException {

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

Connection connection = connectionFactory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createTopic(topicName);

MessageProducer producer = session.createProducer(destination);

for (int i = 0; i < 10; i++) {

TextMessage tms = session.createTextMessage("textMessage:" + i);

producer.send(tms);

System.out.println("send:" + tms.getText());

}

connection.close();

}

}

 

消费者代码:

package cn.qlq.activemq;

import java.util.Enumeration;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

* 消费消息

*

* @author QiaoLiQiang

* @time 2018年9月18日下午11:26:41

*/

public class MsgConsumer {

// 默认端口61616

private static final String url = "tcp://localhost:61616/";

private static final String queueName = "Consumer.A.VirtualTopic.Orders";

public static void main(String[] args) throws JMSException {

// 1创建ConnectionFactory

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

// 2.由connectionFactory创建connection

Connection connection = connectionFactory.createConnection();

Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();

while (jmsxPropertyNames.hasMoreElements()) {

String nextElement = (String) jmsxPropertyNames.nextElement();

System.out.println("JMSX name ===" + nextElement);

}

// 3.启动connection

connection.start();

// 4.创建Session===第一个参数是是否事务管理,第二个参数是应答模式

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

// 5.创建Destination(Queue继承Queue)

Queue destination = session.createQueue(queueName);

// 6.创建消费者consumer

MessageConsumer consumer = session.createConsumer(destination);

int i = 0;

while (i < 5) {

TextMessage textMessage = (TextMessage) consumer.receive();

System.out.println("接收消息:" + textMessage.getText() + ";属性" + textMessage.getStringProperty("str"));

i++;

if (i == 5) {// 确保消费完所有的消息再进行确认

textMessage.acknowledge();

}

}

// 提交事务,进行确认收到消息

session.commit();

session.close();

connection.close();

}

}

 

其实把消费者队列化了。

修改虚拟主题的前缀:

默认前缀是VirtualTopic.>

自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>

修改配置:

 

查看原文