ActiveMQ官方网站:
关于ActiveMQ消息传递的方式详见:
本篇博客旨在解决的问题:
1.如何在普通Java环境中使用ActiveMQ
2.ActiveMQ如何与Spring的整合(XML配置)
3.在SpringBoot中如何使用ActiveMQ
环境:
1. windows 10 64bit
2. apache-activemq-5.14.4
3. jdk 1.8
4. maven 3.3
前置条件:
1.安装启动ActiveMQ:
在官方网站()上下载ActiveMQ
解压后,进入到目录bin中,根据自己操作系统的位数进入到win64或者win32目录下,然后点击activemq.bat启动ActiveMQ。
启动后在浏览器输入,看到以下画面表示启动成功:
点击“”进入到ActiveMQ的后台管理界面,若要求输入用户名密码则初始用户名密码为admin,admin,如下:
2.本博客使用Maven构建项目,引入以下依赖(问题1与问题2需要引入):
org.apache.activemq activemq-all 5.14.4
问题1-如何在普通Java环境中使用ActiveMQ:
采用PTP方式传递消息:
消息生产者:
package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;import java.util.Date;/** * PTP方式传递消息 */public class ActiveMQProducer { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 获取消息生产者 MessageProducer producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息 for (int i = 1; i <= 4; i++) { Student student = new Student(); student.setId((long) i); student.setName("学生" + i); student.setBirthday(new Date()); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 producer.send(message); System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } connection.close(); }}
消息消费者1:
package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * PTP方式接收消息 */public class ActiveMQConsumer1 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); }}
消息消费者2:
package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * PTP方式接收消息 */public class ActiveMQConsumer2 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); }}
先启动两个消息消费者,再启动消息生产者,控制台输出信息如下:
消息生产者:
消息消费者1:
消息消费者2:
这个结果使我们很容易理解PTP的消息传递方式。
采用Pub/Sub方式传递消息:
消息生产者:
package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;import java.util.Date;/** * Pub/Sub方式传递消息 */public class ActiveMQProducer { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 获取消息生产者 MessageProducer producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息 for (int i = 1; i <= 4; i++) { Student student = new Student(); student.setId((long) i); student.setName("学生" + i); student.setBirthday(new Date()); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 producer.send(message); System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } connection.close(); }}
消息消费者1:
package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * Pub/Sub方式接收消息 */public class ActiveMQConsumer1 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); }}
消息消费者2:
package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * Pub/Sub方式接收消息 */public class ActiveMQConsumer2 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取连接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); }}
先启动两个消息消费者,再启动消息生产者,控制台输出信息如下:
消息生产者:
消息消费者1:
消息消费者2:
这个结果使我们很容易理解Pub/Sub的消息传递方式。
总结:
从以上代码可以看出PTP与Pub/Sub方式的消息传递,只是在创建消息目的地的时候不一样:
PTP方式创建的消息目的地是Queue(队列),Pub/Sub方式创建的消息目的地是Topic(主题)。
问题2-ActiveMQ如何与Spring的整合(XML配置):
ActiveMQ与Spring整合时并不需要额外依赖类似xxx-spring.jar的jar包,因为在activemq-all包中已经包含了这些依赖。
类似于其他框架诸如Quartz定时等框架与Spring整合一样,需要配置xml并在applicationContext.xml总配置文件中引入ActiveMQ的配置文件。
ActiveMQ的配置文件如下:
消息监听器定义如下:
package at.flying.activemq.listener;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 消息监听器(消费者) */public class IMessageListener implements MessageListener { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { Student student = CommonUtils.deserialize(textMessage.getText()); System.out.println( String.format("%sListener-接受消息:%d-%s-%s", message.getJMSDestination().toString().toLowerCase().startsWith("topic") ? "Topic" : "Queue", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } catch (JMSException e) { e.printStackTrace(); } } }}
在applicationContext.xml总配置文件中引入ActiveMQ的配置文件:
至此,配置文件配置完毕。
为测试ActiveMQ在Web应用中的使用,我们需要写一个页面与一个Controller来做测试。
准备一个JSP页面(其实随便啥页面都行):
<%@ page language = "java" import = "java.util.*" pageEncoding = "UTF-8" %><%@taglib prefix = "c" uri = "http://java.sun.com/jsp/jstl/core" %><% String path = request.getContextPath(); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/";%>Activemq 学习 Activemq 学习
测试PTP方式传递消息
测试Pub/Sub方式传递消息
准备一个接收请求的Controller:
package at.flying.web.action;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.commons.lang3.time.DateFormatUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.servlet.ModelAndView;import javax.jms.*;import java.util.Date;@Controller@RequestMapping("activemq")public class ActivemqAction { @Autowired @Qualifier("jmsTemplate") private JmsTemplate jmsTemplate; @Autowired @Qualifier("queueDestination") private Destination queueDestination; @Autowired @Qualifier("topicDestination") private Destination topicDestination; @RequestMapping(value = "testQueue", method = {RequestMethod.GET, RequestMethod.POST}) public ModelAndView testQueue( @RequestParam("sid") Long sid, @RequestParam("name") String name) { ModelAndView modelAndView = new ModelAndView(); this.jmsTemplate.send(this.queueDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Student student = new Student(); student.setBirthday(new Date()); student.setId(sid); student.setName(name); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); return message; } }); modelAndView.setViewName("redirect:/activemq/start.jsp"); return modelAndView; } @RequestMapping(value = "testTopic", method = {RequestMethod.GET, RequestMethod.POST}) public ModelAndView testTopic( @RequestParam("sid") Long sid, @RequestParam("name") String name) { ModelAndView modelAndView = new ModelAndView(); this.jmsTemplate.send(this.topicDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Student student = new Student(); student.setBirthday(new Date()); student.setId(sid); student.setName(name); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); return message; } }); modelAndView.setViewName("redirect:/activemq/start.jsp"); return modelAndView; }}
测试结果如下:
①PTP方式:
消息发送页面:
消息发送控制台:
消息接收控制台:
②Pub/Sub方式:
消息发送页面:
消息发送控制台:
消息接收控制台:
问题3-在SpringBoot中如何使用ActiveMQ:
首先在pom文件中加入如下依赖:
org.springframework.boot spring-boot-starter-activemq
application.properties文件中加入如下配置:
#ActiveMQspring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=admin#配置消息类型,false则消息模式为PTP,true则消息模式为PUB/SUB,默认值为falsespring.jms.pub-sub-domain=false
新建一个配置类ActiveMQConfig:
package at.flying.springbootproject.config.activemq;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.jms.Destination;@Configurationpublic class ActiveMQConfig { @Bean("test-queue") public Destination testQueue() { return new ActiveMQQueue("test-queue"); } @Bean("test-topic") public Destination testTopic() { return new ActiveMQTopic("test-topic"); }}
配置监听器(消息消费者):
package at.flying.springbootproject.config.activemq;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class ConsumerListenters { @JmsListener(destination = "test-queue") public void testQueue1(String msg) { System.out.println(String.format("test-queue1:%s", msg)); } @JmsListener(destination = "test-queue") public void testQueue2(String msg) { System.out.println(String.format("test-queue2:%s", msg)); } @JmsListener(destination = "test-topic") public void testTopic1(String msg) { System.out.println(String.format("test-topic1:%s", msg)); } @JmsListener(destination = "test-topic") public void testTopic2(String msg) { System.out.println(String.format("test-topic2:%s", msg)); }}
配置消息生产者:
package at.flying.springbootproject.service;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;import javax.jms.Destination;@Servicepublic class ActiveMQService { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired @Qualifier("test-queue") private Destination testQueue; @Autowired @Qualifier("test-topic") private Destination testTopic; public void testQueue(String msg) { if (StringUtils.isNotBlank(msg)) { this.jmsMessagingTemplate.convertAndSend(this.testQueue, msg); } } public void testTopic(String msg) { if (StringUtils.isNotBlank(msg)) { this.jmsMessagingTemplate.convertAndSend(this.testTopic, msg); } }}
到这里其实已经配置完毕,但是为了测试效果我们还需要一个Controller来接受页面请求然后触发消息的发送:
package at.flying.springbootproject.controller;import at.flying.springbootproject.service.ActiveMQService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseBody;@Controller@RequestMapping("activemq")public class TestActiveMQController { @Autowired private ActiveMQService activeMQService; @RequestMapping("test-queue") @ResponseBody public String test1( @RequestParam(value = "msg", required = false) String msg) { this.activeMQService.testQueue(msg); return msg; } @RequestMapping("test-topic") @ResponseBody public String test2( @RequestParam(value = "msg", required = false) String msg) { this.activeMQService.testTopic(msg); return msg; }}
然后我们打开浏览器请求test-queue:
连续请求4次,控制台输出如下:
输出了四条消息,并且是两个消费者轮流消费。
现在我们来测试test-topic:
注意:
此时我们需要把application.properties里的spring.jms.pub-sub-domain属性改为true,因为true值才代表消息模式为PUB/SUB,若不更改不会报错,但是发送Topic消息时消息消费者不会消费该消息,也就是没有触发Topic消息监听器。
我们打开浏览器请求test-topic:
连续请求2次,控制台输出如下:
输出了四条消息,同一消息两个Topic消费者均消费了。