springboot整合activeMQ

  消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。消息形式支持点对点和订阅-发布。

ActiveMQ是什么
    1、ActiveMQ是消息队列技术,为解决高并发问题而生
    2、ActiveMQ生产者消费者模型(生产者和消费者可以跨平台、跨系统)
    3、ActiveMQ支持如下两种消息传输方式

    • 点对点模式,生产者生产了一个消息,只能由一个消费者进行消费

    • 发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费

1、安装activeMQ

  • 解压
  • 进入解压后的目录运行 ./bin/activemq start
  • 启动后activemq会启动两个端口:
    • 8161是activemq的管理页面,默认的账号密码都是admin
    • 61616是程序连接activemq的通讯地址

2、项目结构

《springboot整合activeMQ》

3、引入依赖

       <!-- activemq依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
        <!--消息队列连接池-->
        <!-- 使用springboot2.0+及以下版本时候 -->
        <!-- <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency> -->
        <!-- 使用springboot2.1+时候 -->
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
        </dependency>

3、修改application.properties

#服务端口号
server.port=8080
server.servlet.context-path=/activemqsb

#activemq配置
#ActiveMQ通讯地址
spring.activemq.broker-url=tcp://localhost:61616
#用户名
spring.activemq.user=admin
#密码
spring.activemq.password=admin
#是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
spring.activemq.in-memory=false
#信任所有的包
spring.activemq.packages.trust-all=true
#是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
spring.activemq.pool.enabled=false

4、配置activeMQ

@Configuration
@EnableJms
public class ActiveMQConfig {
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue") ;
    }

    //springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
    @Bean
    public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //这里必须设置为true,false则表示是queue类型
        factory.setPubSubDomain(true);
        return factory;
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic") ;
    }
}

5、创建消费者

@Service
//消费者
public class Consumer {
    //接收queue类型消息
    //destination对应配置类中ActiveMQQueue("springboot.queue")设置的名字
    @JmsListener(destination="springboot.queue")
    public void ListenQueue(String msg){
        System.out.println("接收到queue消息:" + msg);
    }

    //接收topic类型消息
    //destination对应配置类中ActiveMQTopic("springboot.topic")设置的名字
    //containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
    @JmsListener(destination="springboot.topic", containerFactory = "jmsTopicListenerContainerFactory")
    public void ListenTopic(String msg){
        System.out.println("接收到topic消息:" + msg);
    }
}

6、创建生产者

@RestController
//生产者
public class Producer {
    
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    //发送queue类型消息
    @GetMapping("/queue")
    public void sendQueueMsg(String msg){
        jmsTemplate.convertAndSend(queue, msg);
    }

    //发送topic类型消息
    @GetMapping("/topic")
    public void sendTopicMsg(String msg){
        jmsTemplate.convertAndSend(topic, msg);
    }
}

7、启动类

@SpringBootApplication
@EnableJms //启动消息队列
public class ActivemqsbApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqsbApplication.class, args);
    }

}

8、启动程序测试

  • 浏览器中输入http://localhost:8080/activemqsb/queue?msg=hello

    控制台:接收到queue消息:hello

  • 浏览器中输入http://localhost:8080/activemqsb/topic?msg=hello
    控制台:接收到topic消息:hello

参考链接:

    原文作者:QianTLL
    原文地址:
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。