博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
初识ActiveMQ消息中间件
阅读量:6036 次
发布时间:2019-06-20

本文共 20738 字,大约阅读时间需要 69 分钟。

hot3.png

ActiveMQ官方网站:

关于ActiveMQ消息传递的方式详见:

本篇博客旨在解决的问题:

1.如何在普通Java环境中使用ActiveMQ

2.ActiveMQ如何与Spring的整合(XML配置)

3.在SpringBoot中如何使用ActiveMQ

环境:

1. windows 10 64bit

2. apache-activemq-5.14.4

3. jdk 1.8

4. maven 3.3

前置条件:

1.安装启动ActiveMQ:

在官方网站()上下载ActiveMQ

d47a9f3851879e287a8aca7b5a1d33778d8.jpg

解压后,进入到目录bin中,根据自己操作系统的位数进入到win64或者win32目录下,然后点击activemq.bat启动ActiveMQ。

启动后在浏览器输入,看到以下画面表示启动成功:

2a4d12ca340a646b7254222fdf3985a1ed3.jpg

点击“”进入到ActiveMQ的后台管理界面,若要求输入用户名密码则初始用户名密码为admin,admin,如下:

17364ff0c0f8dde0ff0421fca8077690902.jpg

2.本博客使用Maven构建项目,引入以下依赖(问题1与问题2需要引入):

org.apache.activemq
activemq-all
5.14.4

问题1-如何在普通Java环境中使用ActiveMQ:

采用PTP方式传递消息:

消息生产者:

package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;import java.util.Date;/** * PTP方式传递消息 */public class ActiveMQProducer {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createQueue("test-queue");        // 获取消息生产者        MessageProducer producer = session.createProducer(destination);        // 设置不持久化,此处学习,实际根据项目决定        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        // 构造消息        for (int i = 1; i <= 4; i++) {            Student student = new Student();            student.setId((long) i);            student.setName("学生" + i);            student.setBirthday(new Date());            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));            // 发送消息到目的地方            producer.send(message);            System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(),                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));        }        connection.close();    }}

消息消费者1:

package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * PTP方式接收消息 */public class ActiveMQConsumer1 {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createQueue("test-queue");        // 消费者,消息接收者        MessageConsumer consumer = session.createConsumer(destination);        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                try {                    if (null != message) {                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());                        System.out.println(                                String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(),                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                    }                } catch (JMSException e) {                }            }        });        System.in.read();        connection.close();    }}

消息消费者2:

package at.flying.activemq.ptp;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * PTP方式接收消息 */public class ActiveMQConsumer2 {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createQueue("test-queue");        // 消费者,消息接收者        MessageConsumer consumer = session.createConsumer(destination);        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                try {                    if (null != message) {                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());                        System.out.println(                                String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(),                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                    }                } catch (JMSException e) {                }            }        });        System.in.read();        connection.close();    }}

先启动两个消息消费者,再启动消息生产者,控制台输出信息如下:

消息生产者:

3805fc9e1c8455841c63995a21e22fdfdea.jpg

消息消费者1:

f659871274d560c2c019b2c4823355e2d79.jpg

消息消费者2:

9820b87f04d2f9b830207ee88532c127a11.jpg

这个结果使我们很容易理解PTP的消息传递方式。

采用Pub/Sub方式传递消息:

消息生产者:

package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;import java.util.Date;/** * Pub/Sub方式传递消息 */public class ActiveMQProducer {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createTopic("test-topic");        // 获取消息生产者        MessageProducer producer = session.createProducer(destination);        // 设置不持久化,此处学习,实际根据项目决定        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        // 构造消息        for (int i = 1; i <= 4; i++) {            Student student = new Student();            student.setId((long) i);            student.setName("学生" + i);            student.setBirthday(new Date());            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));            // 发送消息到目的地方            producer.send(message);            System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(),                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));        }        connection.close();    }}

消息消费者1:

package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * Pub/Sub方式接收消息 */public class ActiveMQConsumer1 {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createTopic("test-topic");        // 消费者,消息接收者        MessageConsumer consumer = session.createConsumer(destination);        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                try {                    if (null != message) {                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());                        System.out.println(                                String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(),                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                    }                } catch (JMSException e) {                }            }        });        System.in.read();        connection.close();    }}

消息消费者2:

package at.flying.activemq.pubsub;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.*;/** * Pub/Sub方式接收消息 */public class ActiveMQConsumer2 {    public static void main(String[] args) throws Exception {        // 构造ConnectionFactory实例对象        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");        // 从工厂获取连接对象        Connection connection = connectionFactory.createConnection();        // 启动        connection.start();        // 获取操作连接,一个发送或接收消息的线程        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);        // 获取消息目的地,消息发送给谁        Destination destination = session.createTopic("test-topic");        // 消费者,消息接收者        MessageConsumer consumer = session.createConsumer(destination);        consumer.setMessageListener(new MessageListener() {            @Override            public void onMessage(Message message) {                try {                    if (null != message) {                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());                        System.out.println(                                String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(),                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                    }                } catch (JMSException e) {                }            }        });        System.in.read();        connection.close();    }}

先启动两个消息消费者,再启动消息生产者,控制台输出信息如下:

消息生产者:

51fb3d3ae4301e34e50e9b23ad5b872f4f5.jpg

消息消费者1:

37f5ff1f3ec76fea58565c9904fc92c1e5c.jpg

消息消费者2:

5cef322f01a022d26d99c44ac6ddbf6387a.jpg

这个结果使我们很容易理解Pub/Sub的消息传递方式。

总结:

从以上代码可以看出PTP与Pub/Sub方式的消息传递,只是在创建消息目的地的时候不一样:

PTP方式创建的消息目的地是Queue(队列),Pub/Sub方式创建的消息目的地是Topic(主题)。

问题2-ActiveMQ如何与Spring的整合(XML配置):

ActiveMQ与Spring整合时并不需要额外依赖类似xxx-spring.jar的jar包,因为在activemq-all包中已经包含了这些依赖。

类似于其他框架诸如Quartz定时等框架与Spring整合一样,需要配置xml并在applicationContext.xml总配置文件中引入ActiveMQ的配置文件。

ActiveMQ的配置文件如下:

消息监听器定义如下:

package at.flying.activemq.listener;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.commons.lang3.time.DateFormatUtils;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * 消息监听器(消费者) */public class IMessageListener implements MessageListener {    @Override    public void onMessage(Message message) {        if (message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            try {                Student student = CommonUtils.deserialize(textMessage.getText());                System.out.println(                        String.format("%sListener-接受消息:%d-%s-%s",                                message.getJMSDestination().toString().toLowerCase().startsWith("topic") ? "Topic" :                                        "Queue",                                student.getId(), student.getName(),                                DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));            } catch (JMSException e) {                e.printStackTrace();            }        }    }}

在applicationContext.xml总配置文件中引入ActiveMQ的配置文件:

c3aeef12c77e8a34ceb126cfc1e2bf28b52.jpg

至此,配置文件配置完毕。

为测试ActiveMQ在Web应用中的使用,我们需要写一个页面与一个Controller来做测试。

准备一个JSP页面(其实随便啥页面都行):

<%@ page language = "java" import = "java.util.*" pageEncoding = "UTF-8" %><%@taglib prefix = "c" uri = "http://java.sun.com/jsp/jstl/core" %><%    String path = request.getContextPath();    String basePath =            request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/";%>        
Activemq 学习

Activemq 学习

测试PTP方式传递消息

sid:
name:

测试Pub/Sub方式传递消息

sid:
name:

准备一个接收请求的Controller:

package at.flying.web.action;import at.flying.domain.Student;import com.github.flyinghe.tools.CommonUtils;import org.apache.commons.lang3.time.DateFormatUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.servlet.ModelAndView;import javax.jms.*;import java.util.Date;@Controller@RequestMapping("activemq")public class ActivemqAction {    @Autowired    @Qualifier("jmsTemplate")    private JmsTemplate jmsTemplate;    @Autowired    @Qualifier("queueDestination")    private Destination queueDestination;    @Autowired    @Qualifier("topicDestination")    private Destination topicDestination;    @RequestMapping(value = "testQueue", method = {RequestMethod.GET, RequestMethod.POST})    public ModelAndView testQueue(            @RequestParam("sid")                    Long sid,            @RequestParam("name")                    String name) {        ModelAndView modelAndView = new ModelAndView();        this.jmsTemplate.send(this.queueDestination, new MessageCreator() {            @Override            public Message createMessage(Session session) throws JMSException {                Student student = new Student();                student.setBirthday(new Date());                student.setId(sid);                student.setName(name);                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));                // 发送消息到目的地方                System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(),                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                return message;            }        });        modelAndView.setViewName("redirect:/activemq/start.jsp");        return modelAndView;    }    @RequestMapping(value = "testTopic", method = {RequestMethod.GET, RequestMethod.POST})    public ModelAndView testTopic(            @RequestParam("sid")                    Long sid,            @RequestParam("name")                    String name) {        ModelAndView modelAndView = new ModelAndView();        this.jmsTemplate.send(this.topicDestination, new MessageCreator() {            @Override            public Message createMessage(Session session) throws JMSException {                Student student = new Student();                student.setBirthday(new Date());                student.setId(sid);                student.setName(name);                TextMessage message = session.createTextMessage(CommonUtils.serialize(student));                // 发送消息到目的地方                System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(),                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));                return message;            }        });        modelAndView.setViewName("redirect:/activemq/start.jsp");        return modelAndView;    }}

测试结果如下:

①PTP方式:

消息发送页面:

50093bc21bfa546393f51285f64ad25f56b.jpg

消息发送控制台:

7d2c4f5341baaba511cfe4680b48e63d71f.jpg

消息接收控制台:

2f0e87cf865ac7347f6960e661a0fd60b57.jpg

 

②Pub/Sub方式:

消息发送页面:

8e4c5bf4b0e85c864cc38205d05f334db75.jpg

消息发送控制台:

a589dc7d474bf3468469c1b7226ddc7b05c.jpg

消息接收控制台:

3a4f4f3b8a5a000c96ca9baa63a7fbe68c7.jpg

 

问题3-在SpringBoot中如何使用ActiveMQ:

首先在pom文件中加入如下依赖:

org.springframework.boot
spring-boot-starter-activemq

application.properties文件中加入如下配置:

#ActiveMQspring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=admin#配置消息类型,false则消息模式为PTP,true则消息模式为PUB/SUB,默认值为falsespring.jms.pub-sub-domain=false

新建一个配置类ActiveMQConfig:

package at.flying.springbootproject.config.activemq;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.jms.Destination;@Configurationpublic class ActiveMQConfig {    @Bean("test-queue")    public Destination testQueue() {        return new ActiveMQQueue("test-queue");    }    @Bean("test-topic")    public Destination testTopic() {        return new ActiveMQTopic("test-topic");    }}

配置监听器(消息消费者):

package at.flying.springbootproject.config.activemq;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class ConsumerListenters {    @JmsListener(destination = "test-queue")    public void testQueue1(String msg) {        System.out.println(String.format("test-queue1:%s", msg));    }    @JmsListener(destination = "test-queue")    public void testQueue2(String msg) {        System.out.println(String.format("test-queue2:%s", msg));    }    @JmsListener(destination = "test-topic")    public void testTopic1(String msg) {        System.out.println(String.format("test-topic1:%s", msg));    }    @JmsListener(destination = "test-topic")    public void testTopic2(String msg) {        System.out.println(String.format("test-topic2:%s", msg));    }}

配置消息生产者:

package at.flying.springbootproject.service;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service;import javax.jms.Destination;@Servicepublic class ActiveMQService {    @Autowired    private JmsMessagingTemplate jmsMessagingTemplate;    @Autowired    @Qualifier("test-queue")    private Destination testQueue;    @Autowired    @Qualifier("test-topic")    private Destination testTopic;    public void testQueue(String msg) {        if (StringUtils.isNotBlank(msg)) {            this.jmsMessagingTemplate.convertAndSend(this.testQueue, msg);        }    }    public void testTopic(String msg) {        if (StringUtils.isNotBlank(msg)) {            this.jmsMessagingTemplate.convertAndSend(this.testTopic, msg);        }    }}

到这里其实已经配置完毕,但是为了测试效果我们还需要一个Controller来接受页面请求然后触发消息的发送:

package at.flying.springbootproject.controller;import at.flying.springbootproject.service.ActiveMQService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.ResponseBody;@Controller@RequestMapping("activemq")public class TestActiveMQController {    @Autowired    private ActiveMQService activeMQService;    @RequestMapping("test-queue")    @ResponseBody    public String test1(            @RequestParam(value = "msg", required = false)                    String msg) {        this.activeMQService.testQueue(msg);        return msg;    }    @RequestMapping("test-topic")    @ResponseBody    public String test2(            @RequestParam(value = "msg", required = false)                    String msg) {        this.activeMQService.testTopic(msg);        return msg;    }}

然后我们打开浏览器请求test-queue:

7fa541bc0365b0354ec629c4b43ebfedeac.jpg

连续请求4次,控制台输出如下:

993766914aa9c53f4005c38d369a43f27d7.jpg

输出了四条消息,并且是两个消费者轮流消费。

现在我们来测试test-topic:

注意:

此时我们需要把application.properties里的spring.jms.pub-sub-domain属性改为true,因为true值才代表消息模式为PUB/SUB,若不更改不会报错,但是发送Topic消息时消息消费者不会消费该消息,也就是没有触发Topic消息监听器。

我们打开浏览器请求test-topic:

8e46561cd3c6c38edcb9d90d01e78e84ef1.jpg

连续请求2次,控制台输出如下:

bdfe3a6bc4a0d14b65608d8e12bba99957b.jpg

输出了四条消息,同一消息两个Topic消费者均消费了。

转载于:https://my.oschina.net/u/2608182/blog/3048620

你可能感兴趣的文章
VC++ 监视文件(夹)
查看>>
【转】keyCode对照表及JS监听组合按键
查看>>
[Java开发之路](14)反射机制
查看>>
mac gentoo-prefix安装git svn
查看>>
浅尝异步IO
查看>>
C - Train Problem II——(HDU 1023 Catalan 数)
查看>>
Speak loudly
查看>>
iOS-在项目中引入RSA算法
查看>>
[译] 听说你想学 React.js ?
查看>>
gulp压缩合并js与css
查看>>
块级、内联、内联块级
查看>>
Predicate
查看>>
[面试题记录01]实现一个function sum达到一下目的
查看>>
这个季节的忧伤,点到为止
查看>>
mysql通过配置文件进行优化
查看>>
省级网站群建设关注点
查看>>
工作第四天之采集资源
查看>>
我的友情链接
查看>>
H3CS-WLAN、H3CSE-Security认证考试
查看>>
5.0中redis-cli的集群管理测试
查看>>