最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

RabbitMQ 实现消息队列延迟

来源:博客园


(资料图片仅供参考)

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。
rabbitmq-plugins list

检查RabbitMQ插件安装情况

  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html
  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件
4.0.0com.oliverabbitmq-spring-demo0.0.1-SNAPSHOTorg.springframework.bootspring-boot-starter-parent2.7.7org.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-testtest    org.eclipse.paho    org.eclipse.paho.client.mqttv3    1.2.5org.apache.maven.pluginsmaven-compiler-plugin1.81.8
  • application.yml配置文件中配置RabbitMQ信息
server:  port: 8080spring:  #给项目来个名字  application:    name: rabbitmq-spring-demo  #配置rabbitMq 服务器  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    password: admin123    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的    virtual-host: /
  • RabbitMQ配置类
package com.olive.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * RabbitMQ配置类 **/@Configurationpublic class RabbitMqConfig {public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";public static final String DELAY_QUEUE_NAME = "delay_queue_name";public static final String DELAY_ROUTING_KEY = "delay_routing_key";@Beanpublic CustomExchange delayExchange() {Map args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Queue queue() {Queue queue = new Queue(DELAY_QUEUE_NAME, true);return queue;}@Beanpublic Binding binding(Queue queue, CustomExchange delayExchange) {return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}
  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.olive.config.RabbitMqConfig;/** * 消息发送者 **/@Servicepublic class CustomMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:" + sdf.format(new Date()));rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_ROUTING_KEY, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息延迟5秒message.getMessageProperties().setHeader("x-delay", 5000);return message;}});}}
  • 接收消息
package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.olive.config.RabbitMqConfig;/** * 消息接收者 **/@Componentpublic class CustomMessageReceiver {@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(sdf.format(new Date()) + msg);System.out.println("Receiver:执行取消订单");}}
  • 测试验证
package com.olive.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.olive.service.CustomMessageSender;@RestControllerpublic class DelayMessageController {@Autowiredprivate CustomMessageSender customMessageSender;@GetMapping("/sendMessage")public String sendMessage() {// 发送消息customMessageSender.sendMsg("你已经支付超时,取消订单通知!");return "success";}}

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

关键词: