# RabbitMQ 消息队列

# 概述

  • RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。允许应用 程序之间通过在消息中发送数据进行通信。可以实现应用程序的解耦、异步通信、流量削峰等操作,并附带了一个易于使用的可视化管理工具, 帮助监控消息代理的每一个环节。

  • RabbitMQ是一个开源的,在AMQP(高级消息队列协议)基础上完整的,可复原的的企业消息系统。

  • 支持主流的操作系统,Linux、Windows、MacOX等。

  • 多种开发语言支持,Java、Python、Ruby、.Net、PHP、C/C++、node.js。

  • 官网:https://www.rabbitmq.com/ (opens new window)

rabbit-summary.png

关键字说明

关键字 说明
Server(broker) 接受客户端连接,实现AMQP消息队列和路由功能的进程,消息服务器。
Virtual Host 虚拟主机,权限控制的最小粒度,一个Virtual Host里面可以有若干个Exchange和Queue
Exchange 交换机,接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。
Message Queue 消息队列,用于存储还未被消费者消费的消息。
Message 由Header和Body组成,Header是由生产者添加的各种属性的集合。而Body是真正需要传输的APP数据。
Binding Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
Connection 连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
Channel 信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。
Command AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。

如果业务中使用到了RabbitMQ的延时队列功能,请安装rabbitmq_delayed_message插件

# 配置

# 引入依赖

<dependency>
    <groupId>com.mediway.hos</groupId>
    <artifactId>hos-framework-mq-starter</artifactId>
</dependency>

# 配置项

# yml配置文件

使用RabbitMQ需要配置消息服务器相关信息

spring:
  rabbitmq:
    # RabittMQ服务器地址、端口号、用户名、密码需要根据自己的配置自行配置
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest
    # 配置手动执行消息ack确认---消息消费方配置,生产方不需要配置
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

# 发送消息时,消息失败允许重发最大重发次数----消息生产方配置,消费方不需要配置
# 不配做时,默认是 3 
framework:
  rabbit:
    retry:
      count: 3

# 消息生产方创建表

作为消息的生产方时,需要创建消息落库表

rabbitmq-sql.png

# 消费方队列与绑定关系配置

作为消息的消费方时,需要进行消息队列、队列与交换机绑定关系的配置,实现 RoutingKeyConfigurer接口,并根据需要自行重写相应方法

public interface RoutingKeyConfigurer {

    /**
     * 延时delay消息配置
     */
    List<RoutingKeyDelayConfigure> delayRoutingKeyConfig()
    /**
     * direct消息配置
     */
    List<RoutingKeyDirectConfigure> directRoutingKeyConfig()
    /**
     * topic消息配置
     */
    List<RoutingKeyTopicConfigure> topicRoutingKeyConfig()
    /**
     * fanout消息配置
     */
    List<RoutingKeyFanoutConfigure> fanoutRoutingKeyConfig()
    /**
     * header消息配置
     */
    List<RoutingKeyHeaderConfigure> headerRoutingKeyConfig()
}

各交换机配置项关键字说明

  • routingKey 消息的路由,唯一确定
  • queueName 消息队列名称,唯一性
  • deadLetterConfigure 死信消息配置,即接收消息过程中业务处理产生非业务性意外错误时,消息将要转发的队列

queueName 死信消息队列名,唯一性

routingKey 死信消息的路由,唯一确定

# 消费方消息接收配置

作为消息的消费方时,需要进行消息接收的处理配置,实现 RabbitMessageListenerService 接口,并设置需要接收 消息的泛型,子类必须加上@ReceiveMessageListener,并设置队列名

/**
 * 统一消息接收接口
 * 子类使用 {@link @ReceiveMessageListener} 监听,并设置监听路由
 */
public interface RabbitMessageListenerService<T> {
    /**
     * 消息处理方法
     *
     * @param message    接收到的消息
     * @param messageProperties  接收到消息的Id
     * @param channel 消息路由
     */
    void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) throws Exception;
}
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ReceiveMessageListener {
    /**
     * 消息队列名
     */
    String queueName() default "";
    /**
     * 是否手动确认
     */
    boolean manualAck() default false;
}

# 使用示例

根据RabbitMQ交换机的不同,区分不同的使用情况

# 消息生产方使用

  • 引入maven依赖、引入配置项、配置消息落库表
  • 然后使用消息发送工具 MessageSendTool 执行消息发送,如:执行direct消息发送
  • 消息发送时,可以根据具体业务逻辑,使用不同的消息类型 MessageTypeEnum

# MessageSendTool

消息发送工具类,构建对应的消息发送参数,执行不同发送逻辑

    /**
     * 普通消息发送--消息只发送一次
     */
    public void sendMessage(MessageDetail messageDetail)

    /**
     * 消息发送---允许消息重发
     */
    public void sendRetryMessage(MessageDetail messageDetail)

    /**
     * 延时消息发送---非重试消息发送--消息只发送一次
     */
    public void sendDelayMessage(DelayMessageDetail messageDetail)

    /**
     * 延时消息发送---允许消息重发
     */
    public void sendDelayRetryMessage(DelayMessageDetail messageDetail)

# MessageTypeEnum

消息发送类型,允许生产方执行的消息发送类型

public enum MessageTypeEnum {
    DIRECT_MESSAGE(0, "direct消息类型"),
    TOPIC_MESSAGE(1, "topic消息类型"),
    FANOUT_MESSAGE(2, "fanout消息类型"),
    HEADER_MESSAGE(3, "header消息类型"),
    DELAY_MESSAGE(4, "delay延时消息类型")
}

# 消息生产方代码示例

@RestController
@RequestMapping("rabbitmq/test")
@Api(tags = "RabbitMQ消息发送-普通消息")
public class RabbitPublisherController {

    @Autowired
    private MessageSendTool messageSendTool;

    @ApiOperation(value = "direct消息发送")
    @OperLog(title = "RabbitMQ模块", content = "direct消息发送示例")
    @GetMapping("/direct")
    public String directMessageSend(@RequestBody Object object) {
        // 1、普通消息发送
        try {
            // 消息发送方的消息体
            MessageDetail messageDetail = new MessageDetail();
            // 使用direct消息类型:MessageTypeEnum.DIRECT_MESSAGE
            messageDetail.setRoutingKey("test.routing.key.111");
            messageDetail.setMessage(object);

            // 执行消息发送
            messageSendTool.sendMessage(messageDetail); 
            return "OK";
        } catch (Exception e) {
            e.printStackTrace();
            return "direct交换机消息发送 error:" + e.getMessage();
        }
        // 2、延时消息发送
         try {
            // 消息发送方的消息体
            DelayMessageDetail delayMessageDetail = new DelayMessageDetail();
            // 使用延时消息类型:MessageTypeEnum.DELAY_MESSAGE
            delayMessageDetail.setMessageType(MessageTypeEnum.DELAY_MESSAGE);
            delayMessageDetail.setRoutingKey("test.deleay.routing.key.111");
            delayMessageDetail.setMessage(object);
            // 延时 5s 后才允许消费方接收消息
            delayMessageDetail.setDelayedTime(5000);

            // 执行延时消息发送
            messageSendTool.sendDelayMessage(delayMessageDetail); 
            return "OK";
        } catch (Exception e) {
            e.printStackTrace();
            return "延时消息发送 error:" + e.getMessage();
        }
        // 3、允许普通消息重复发送
        try {
            // 消息发送方的消息体
            MessageDetail messageDetail = new MessageDetail();
            // 使用direct消息类型:MessageTypeEnum.DIRECT_MESSAGE
            messageDetail.setMessageType(MessageTypeEnum.DIRECT_MESSAGE);
            messageDetail.setRoutingKey("test.retry.routing.key.111");
            messageDetail.setMessage(object);

            // 允许消息重发
            messageSendTool.sendRetryMessage(messageDetail); 
            return "OK";
        } catch (Exception e) {
            e.printStackTrace();
            return "direct交换机允许消息重发 error:" + e.getMessage();
        }
        // 4、允许延时消息重复发送
        try {
            // 消息发送方的消息体
           DelayMessageDetail delayMessageDetail = new DelayMessageDetail();
           // 使用延时消息类型:MessageTypeEnum.DELAY_MESSAGE
           delayMessageDetail.setMessageType(MessageTypeEnum.DELAY_MESSAGE);
           delayMessageDetail.setRoutingKey("test.deleay.routing.key.111");
           delayMessageDetail.setMessage(object);
           // 延时 5s 后才允许消费方接收消息
           delayMessageDetail.setDelayedTime(5000);

            // 执行允许延时消息重发
            messageSendTool.sendDelayRetryMessage(delayMessageDetail); 
            return "OK";
        } catch (Exception e) {
            e.printStackTrace();
            return "延时允许消息重发 error:" + e.getMessage();
        }
    }
}

# 消息消费方使用

消息接收类型分别有 direct、topic、fanout、header、delay消息类型 五种不同的类型,下面分别介绍说明.

# Direct消息类型接收

  • direct消息接收时,必须routingKey匹配相等,才可以接收到,可自行查询详细的路由规则
# 接收消息配置

消息配置时,需要与消息生产方沟通确定routingKey;queueName用于对应routingKey消息的接收

@Service
public class ConsumerRoutingKeyConfigurer implements RoutingKeyConfigurer {

    @Override
    public List<RoutingKeyDirectConfigure> directRoutingKeyConfig() {
        // 消息配置集合
        List<RoutingKeyDirectConfigure> routingConfigureList = new ArrayList<>();

        // xiaohong队列
        RoutingKeyDirectConfigure configureXiaoHong = new RoutingKeyDirectConfigure();
        configureXiaoHong.setRoutingKey("test.user.1111");
        configureXiaoHong.setQueueName("test_direct_queue_xiaohong");

        // xiaoming队列
        RoutingKeyDirectConfigure configureXiaoMing = new RoutingKeyDirectConfigure();
        configureXiaoMing.setRoutingKey("test.user.2222");
        configureXiaoMing.setQueueName("test_direct_queue_xiaoming");

        // direct消息路由添加结果
        routingConfigureList.add(configureXiaoHong);
        routingConfigureList.add(configureXiaoMing);

        return routingConfigureList;
    }
}
# 接收消息实现
  • 实现test_direct_queue_xiaohong 队列消息接收
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_direct_queue_xiaohong")
public class XiaoHongListenerService implements RabbitMessageListenerService<MessageObject> {

    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑 key:  test.user.1111
        log.info("XiaoHongListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("XiaoHongListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("XiaoHongListenerService.接收到消息message:" + message.toString());
    }

}

ReceiveMessageListener如果设置manualAck=true,需消费者手动确认消息。

  • 实现 test_direct_queue_xiaoming 队列消息接收
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_direct_queue_xiaoming")
public class XiaoMingListenerService implements RabbitMessageListenerService<MessageObject> {

    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑 key:  test.user.2222
        log.info("XiaoMingListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("XiaoMingListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("XiaoMingListenerService.接收到消息message:" + message.toString());
    }

}

# Topic消息类型接收

  • topic消息接收时,routingKey类似模糊匹配,举例如下

通过使用“*”和“#”通配符进行处理,“.”将路由键分为了几个标识符,“*”匹配 1 个,“#”匹配一个或多个

生产方routingKey 消费方routingKey 是否可以匹配
test.topic.blog.java test.topic.blog.java
test.topic.blog.java test.topic.blog.*
test.topic.blog.java test.topic.*
test.topic.blog.java test.topic.blog.#
test.topic.blog.java test.topic.#
test.topic.blog.java test.topic.blog.jQuery

# 接收消息配置

消息配置时,需要与消息生产方沟通确定routingKey;queueName用于对应routingKey消息的接收

@Service
public class ConsumerRoutingKeyConfigurer implements RoutingKeyConfigurer {

 @Override
    public List<RoutingKeyTopicConfigure> topicRoutingKeyConfig() {
        // 消息配置集合
        List<RoutingKeyTopicConfigure> routingConfigureList = new ArrayList<>();

        RoutingKeyTopicConfigure testTopicBlogJQuery = new RoutingKeyTopicConfigure();
        testTopicBlogJQuery.setQueueName("test_queue_topic_blog_jquery");
        testTopicBlogJQuery.setRoutingKey("test.topic.blog.jQuery");
        // 设置死信队列
        // xiaozhang队列设置死信队列
        RoutingKeyDeadLetterConfigure deadLetterConfigure = new RoutingKeyDeadLetterConfigure();
        deadLetterConfigure.setQueueName("test_queue_topic_blog_jquery_sixin");
        deadLetterConfigure.setRoutingKey("test.topic.blog.jQuery.sixin");
        testTopicBlogJQuery.setDeadLetterConfigure(deadLetterConfigure);


        // Topic消息类型
        RoutingKeyTopicConfigure testTopicBlogJava = new RoutingKeyTopicConfigure();
        testTopicBlogJava.setQueueName("test_queue_topic_blog_java");
        testTopicBlogJava.setRoutingKey("test.topic.blog.java");

        RoutingKeyTopicConfigure testTopicBlog = new RoutingKeyTopicConfigure();
        testTopicBlog.setQueueName("test_queue_topic_blog");
        testTopicBlog.setRoutingKey("test.topic.blog.*");

        RoutingKeyTopicConfigure testTopic = new RoutingKeyTopicConfigure();
        testTopic.setQueueName("test_queue_topic_blog");
        testTopic.setRoutingKey("test.topic.#");

        // topic消息路由添加结果
        routingConfigureList.add(testTopicBlogJQuery);
        routingConfigureList.add(testTopicBlogJava);
        routingConfigureList.add(testTopicBlog);
        routingConfigureList.add(testTopic);

        return routingConfigureList;
    }
}

# 接收消息实现
  • 实现 test_queue_topic_blog_jquery 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_topic_blog_jquery")
public class TopicBlogJQueryListenerService implements RabbitMessageListenerService<MessageObject> {
    // 此队列的 routingKey是 “test.topic.blog.jQuery” ,
    // 只能接收生产方发送的routingKey = “test.topic.blog.jQuery” 的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) throws Exception {
        // 根据接收到的消息处理相关业务逻辑  key:test.topic.blog.jQuery
        log.info("TopicBlogJQueryListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("TopicBlogJQueryListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("TopicBlogJQueryListenerService.接收到消息message:" + message.toString());

        if (message != null) {
            throw new RuntimeException("主动抛出异常,JQuery转发到死信队列");
        }
    }
}

此队列接收消息,主动抛出异常,此消息会转发到此队列配置的 死信队列 中

  • 实现 test_queue_topic_blog_jquery_sixin 队列接收消息

此死信队列,不考虑生产方,是由 test_queue_topic_blog_jquery 队列转发而来

@Slf4j
@Service
@ReceiveMessageListener(queueName = ConsumerQueueConstants.test_topic_queue_blog_jquery_sixin)
public class TopicBlogJQuerySiXinListenerService implements RabbitMessageListenerService<MessageObject> {
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) throws Exception {
        // 根据接收到的消息处理相关业务逻辑
        log.info("TopicBlogJQuerySiXinListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("TopicBlogJQuerySiXinListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("TopicBlogJQuerySiXinListenerService.接收到消息message:" + message.toString());
    }
}
  • 实现 test_queue_topic_blog_java 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_topic_blog_java")
public class TopicBlogJavaListenerService implements RabbitMessageListenerService<MessageObject> {
    // 此队列的 routingKey是 “test.topic.blog.java” ,
    // 只能接收生产方发送的routingKey = “test.topic.blog.java” 的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) throws Exception {
        // 根据接收到的消息处理相关业务逻辑  key:test.topic.blog.java
        log.info("TopicBlogJavaListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("TopicBlogJavaListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("TopicBlogJavaListenerService.接收到消息message:" + message.toString());
    }
}
  • 实现 test_queue_topic_blog 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_topic_blog")
public class TopicBlogListenerService implements RabbitMessageListenerService<MessageObject> {
    // 此队列的 routingKey是 “test.topic.blog.*” 
    // 可以接收生产方发送的routingKey = {“test.topic.blog.java”, 
    //  “test.topic.blog.jQuery”} 的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑   key:test.topic.blog.*
        log.info("TopicBlogListenerService===接收到消息messageI:" + messageProperties.getMessageId());
        log.info("TopicBlogListenerService===接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("TopicBlogListenerService===接收到消息message:" + message.toString());
    }
}

# Fanout消息类型接收

  • fanout 消息是以广播的形式将消息分发给所有的队列,不考虑routingKey
# 接收消息配置
@Slf4j
@Service
public class ConsumerRoutingKeyConfigurer implements RoutingKeyConfigurer {

    @Override
    public List<RoutingKeyFanoutConfigure> fanoutRoutingKeyConfig() {
        List<RoutingKeyFanoutConfigure> fanoutConfigureList = new ArrayList<>();

        // fanout---北京
        RoutingKeyFanoutConfigure fanoutBeiJing = new RoutingKeyFanoutConfigure();
        fanoutBeiJing.setQueueName("test_queue_fanout_beijing");

        // fanout---深圳
        RoutingKeyFanoutConfigure fanoutShenZhen = new RoutingKeyFanoutConfigure();
        fanoutShenZhen.setQueueName("test_queue_fanout_shenzhen");

        // 集合添加结果
        fanoutConfigureList.add(fanoutBeiJing);
        fanoutConfigureList.add(fanoutShenZhen);

        return fanoutConfigureList;
    }
}
# 接收消息实现
  • 所有fanout消息队列都会接收到 fanout消息
  • 实现 test_queue_fanout_beijing 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_fanout_beijing")
public class FanoutBeiJingListenerService implements RabbitMessageListenerService<MessageObject> {

    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑
        log.info("FanoutBeiJingListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("FanoutBeiJingListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("FanoutBeiJingListenerService.接收到消息message:" + message.toString());
    }

}
  • 实现 test_queue_fanout_shenzhen 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_fanout_shenzhen")
public class FanoutShenZhenListenerService implements RabbitMessageListenerService<MessageObject> {

    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑
        log.info("FanoutShenZhenListenerService.接收到消息messageI:" + messageProperties.getMessageId());
        log.info("FanoutShenZhenListenerService.接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("FanoutShenZhenListenerService.接收到消息message:" + message.toString());
    }
}

# Header消息类型接收

  • header消息是以消息头的规则,将消息分发到匹配消息头的队列,不考虑routingKey

生产方消息头 消费方匹配消息头 是否可以匹配
name=小明,id=123 name=小明,id=123 any.match
name=小明,id=123 name=小明,id=123 all.match
name=小明,id=123 name=小明,id=456 all.match
name=小明,id=123 name=小明,id=123 any.exist
name=小明,id=123 name=小明,id=123 all.exist
name=小明,id=123 name=小红,id=123 all.exist
name=小明,id=123 name=小明,any.exist

# 接收消息配置
@Slf4j
@Service
public class ConsumerRoutingKeyConfigurer implements RoutingKeyConfigurer {
    @Override
    public List<RoutingKeyHeaderConfigure> headerRoutingKeyConfig() {
        Map<String, Object> headerMap = new HashMap<String, Object>();
        headerMap.put("name", "小明");
        headerMap.put("id", "123");

        List<RoutingKeyHeaderConfigure> headerConfigureList = new ArrayList<>();

        // header---id、name---any---match
        RoutingKeyHeaderConfigure headerAnyMatch = new RoutingKeyHeaderConfigure();
        headerAnyMatch.setQueueName("test_queue_header_name_or_id_match");
        headerAnyMatch.setWhereAny(true);
        headerAnyMatch.setMatch(true);
        headerAnyMatch.setHeaders(headerMap);

        // header---id、name---all---match
        RoutingKeyHeaderConfigure headerAllMatch = new RoutingKeyHeaderConfigure();
        headerAllMatch.setQueueName("test_queue_header_name_and_id_match");
        headerAllMatch.setWhereAll(true);
        headerAllMatch.setMatch(true);
        headerAllMatch.setHeaders(headerMap);

        // header---id、name---any---exist
        RoutingKeyHeaderConfigure headerAnyExist = new RoutingKeyHeaderConfigure();
        headerAnyExist.setQueueName("test_queue_header_name_or_id_exist");
        headerAnyExist.setWhereAny(true);
        headerAnyExist.setExist(true);
        headerAnyExist.setHeaders(headerMap);

        // 集合添加结果
        headerConfigureList.add(headerAnyMatch);
        headerConfigureList.add(headerAllMatch);
        headerConfigureList.add(headerAnyExist);

        return headerConfigureList;
    }
}

# 接收消息实现
  • 实现 test_queue_header_name_or_id_match 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_header_name_or_id_match")
public class HeaderNameOrIdMatchListenService implements RabbitMessageListenerService<MessageObject> {
    // 此队列 匹配规则是 name、id---any.match
    // 此队列 会接收到 生产方发送的 消息头是 name = 小明,id = 123的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑
        log.info("HeaderNameOrIdMatchListenService===接收到消息messageI:" + messageProperties.getMessageId());
        log.info("HeaderNameOrIdMatchListenService===接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("HeaderNameOrIdMatchListenService===接收到消息message:" + message.toString());
    }
}
  • 实现 test_queue_header_name_and_id_match 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_header_name_and_id_match")
public class HeaderNameAndIdMatchListenService implements RabbitMessageListenerService<MessageObject> {
    // 此队列 匹配规则是 name、id---all.match
    // 此队列 会接收到 生产方发送的 消息头是 name = 小明,id = 123的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑
        log.info("HeaderNameAndIdMatchListenService===接收到消息messageI:" + messageProperties.getMessageId());
        log.info("HeaderNameAndIdMatchListenService===接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("HeaderNameAndIdMatchListenService===接收到消息message:" + message.toString());
    }
}
  • 实现 test_queue_header_name_or_id_exist 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_header_name_or_id_exist")
public class HeaderNameOrIdExistListenService implements RabbitMessageListenerService<MessageObject> {
    // 此队列 匹配规则是 name、id---any.exist
    // 此队列 会接收到 生产方发送的 消息头是 name = 小明,id = 123的消息
    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑
        log.info("HeaderNameOrIdExistListenService===接收到消息messageI:" + messageProperties.getMessageId());
        log.info("HeaderNameOrIdExistListenService===接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("HeaderNameOrIdExistListenService===接收到消息message:" + message.toString());
    }
}

# Delay消息类型接收

  • Delay消息接收时,必须routingKey匹配相等,才可以接收到
# 接收消息配置
@Slf4j
@Service
public class ConsumerRoutingKeyConfigurer implements RoutingKeyConfigurer {
  @Override
    public List<RoutingKeyDelayConfigure> delayRoutingKeyConfig() {
        List<RoutingKeyDelayConfigure> delayConfigureList = new ArrayList<>();

        // 延时消息配置
        RoutingKeyDelayConfigure delayConfigure = new RoutingKeyDelayConfigure();
        delayConfigure.setQueueName("test_queue_delay");
        delayConfigure.setRoutingKey("message.delay.11111");

        // 集合添加结果
        delayConfigureList.add(delayConfigure);
        return delayConfigureList;
    }
}
# 接收消息实现
  • 实现 test_queue_delay 队列接收消息
@Slf4j
@Service
@ReceiveMessageListener(queueName = "test_queue_delay")
public class DelayListenerService implements RabbitMessageListenerService<MessageObject> {

    @Override
    public void messageHandler(MessageObject message, MessageProperties messageProperties, Channel channel) {
        // 根据接收到的消息处理相关业务逻辑  key:  message.delay.11111
        log.info("DelayListenerService===接收到消息messageI:" + messageProperties.getMessageId());
        log.info("DelayListenerService===接收到消息routingKey:" + messageProperties.getReceivedRoutingKey());
        log.info("DelayListenerService===接收到消息message:" + message.toString());
    }
}

# 控制台简述

RabbitMQ的详细安装过程,可自行查找相关资料,这里简单介绍控制台各个菜单含义。安装后的控制台界面如下:

rabbitmq-broker.png

# Overview

# Queued message

  • Ready:待消费的消息总数
  • Unacked:待应答的消息总数
  • Total:总数 Ready+Unacked
  • Publish:producter pub消息的速率。

# Message rates

  • Publisher confirm:broker确认pub消息的速率。
  • Deliver(manual ack):customer手动确认的速率。
  • Deliver( auto ack):customer自动确认的速率。
  • Consumer ack:customer正在确认的速率。
  • Redelivered:正在传递'redelivered'标志集的消息的速率。
  • Get (manual ack):响应basic.get而要求确认的消息的传输速率。
  • Get (auto ack):响应于basic.get而发送不需要确认的消息的速率。
  • Return:将basic.return发送给producter的速率。
  • Disk read:queue从磁盘读取消息的速率。
  • Disk write:queue从磁盘写入消息的速率。

# Churn statistics 流失统计

  • Connection operations:连接操作的创建和关闭速率
  • Channel operations:通道操作的创建和关闭速率
  • Queue operations:队列操作的声明、创建、关闭速率
  • Created 创建、Closed 关闭、Declared 声明

# Ports and contexts 端口和Web容器定义

用来记录开放的端口以及协议。

# Export definitions导出定义

定义由用户,虚拟主机,权限,参数,交换,队列和绑定组成。 它们不包括队列的内容或集群名称。 独占队列不会被导出。

# Import definitions导入定义

导入的定义将与当前定义合并。 如果在导入过程中发生错误,则所做的任何更改都不会回滚

# Connections

连接到RabbitMQ服务器上的相关连接属性

  • Name:名称。
  • User name:使用的用户名。
  • State:当前的状态,running:运行中;idle:空闲。
  • SSL/TLS:是否使用ssl进行连接。
  • Protocol:使用的协议。
  • Channels:创建的channel的总数。
  • From client:每秒发出的数据包。
  • to client:每秒收到的数据包。

# Channels

通道

  • channel:名称。
  • User name:使用的用户名。
  • Mode:渠道保证模式。 可以是以下之一,或者不是:C: confirm。T:transactional(事务)。
  • State :当前的状态,running:运行中;idle:空闲。
  • Unconfirmed:待confirm的消息总数。
  • Prefetch:设置的prefetch的个数。
  • Unacker:待ack的消息总数。
  • publish:producter pub消息的速率。
  • confirm:producter confirm消息的速率。
  • deliver/get:consumer 获取消息的速率。
  • ack:consumer ack消息的速率。

# Exchange

交换机

  • Name:名称。
  • Type:交换机类型
  • Features:功能。
  • Message rate in:消息进入的速率。
  • Message rate out:消息出去的速率。

# Queues

队列

  • Name:名称。

  • Type:交换机类型

  • Features:功能。

  • Ready:待消费的消息总数。

  • Unacked:待应答的消息总数。

  • Total:总数 Ready+Unacked。

  • incoming:消息进入的速率。

  • deliver/get:消息获取的速率。

  • ack:消息应答的速率。

# Admin

可以对用户、虚拟主机、策略、限制、集群进行管理

  • User:用户管理。
  • Virtual Host:虚拟主机,可以查询和新增虚拟主机。虚拟主机类似于租户。
  • Policies:策略,可以查询用户策略、添加更新一个策略等。
  • Limits:限制,可以为虚拟机增加一些限制条件,最大连接数及最大队列数,但是默认虚拟机无法添加
  • Cluster:集群,集群的节点管理