百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

RabbitMQ 延迟消息实战(rabbitmq如何保证消息不被重复消费)

liebian365 2025-03-30 18:15 24 浏览 0 评论

现实生活中有一些场景需要延迟或在特定时间发送消息,例如智能热水器需要 30 分钟后打开,未支付的订单或发送短信、电子邮件和推送通知下午 2:00 开始的促销活动。

RabbitMQ 本身没有直接支持延迟队列的功能,如果您搜索“如何在 RabbitMQ 中使用延迟消息”,您很可能会遇到两种可能的解决方案。第一种解决方案是使用消息 TTL 功能和死信功能的组合。第二种选择是使用官方的 RabbitMQ 延迟消息插件。

什么是 RabbitMQ?

RabbitMQ 是一个开源消息代理(也称为面向消息的中间件),创建它是为了支持高级消息队列协议 (Advanced Message Queuing Protocol, AMQP)。此后,它通过插件架构进行了扩展,以支持简单(或流式)面向文本的消息协议 (Text Oriented Message Protocol, STOMP)、消息查询遥测传输 (Message Query Telemetry Transport, MQTT) 等协议。

对于集群和故障转移,RabbitMQ 服务器是用 Erlang 编写的,并采用了开放电信平台框架。用于与代理交互的客户端库可用于所有主要编程语言,源代码可在 Mozilla 公共许可证下获得。

简单来说,RabbitMQ是一个消息传递系统,可以在本地或云端使用。并且支持多种消息传递协议。

RabbitMQ 的主要特性

以下是 RabbitMQ 的一些特性:

  • 集群:RabbitMQ 中的集群在设计时考虑了两个目标。如果一个节点发生故障,事件的消费者和生产者可以继续运行,同时添加其他节点以横向扩展消息传递吞吐量。
  • 轻松路由:消息通过交换器然后到达队列,这提供了灵活的路由方式。 对于更复杂的路由,用户可以将交换器连接在一起或将他们的交换器类型开发为插件。
  • 可靠性:持久性、交付反馈、发布确认和高可用性是 RabbitMQ 对性能有直接影响的关键特性。
  • 安全性:客户端证书检查和仅 SSL 通信可以帮助保护客户端连接。虚拟主机可以调节用户访问,确保高级消息隔离。

在 RabbitMQ 中启用延迟消息

很长一段时间以来,人们一直在寻找使用 RabbitMQ 实现延迟消息传递的方法。 迄今为止,公认的解决方案是使用消息的组合——TTL 和死信交换器。

RabbitMQ 延迟消息插件向 RabbitMQ 添加了一种新的交换类型,如果用户愿意,允许延迟通过该交换路由的消息。 让我们看看如何使用这两种方法。

  • 使用 TTL 和 DLX 延迟消息传递
  • RabbitMQ 延迟消息插件

使用 TTL 和 DLX 延迟消息传递

通过组合这些功能,我们可以将消息发布到队列,该消息将在 TTL 后过期,然后它被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为 死信队列。

下面创建一个队列,为其设置 TTL 和 DLX 等:

// 创建两个交换器,一个为正常的交换器exchange.normal,另一个为死信交换器exchange.dlx
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);


//创建一个队列queue.normal,并绑定到exchange.normal
Map args = new HashMap <>();
//设置队列中消息的过期时间
args.put("x-message-ttl", 10000);
//当queue.normal中的消息过期时,将发送到exchange.dlx
args.put("x-dead-letter-exchange", "exchange.dlx");
//也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键
args.put("x-dead-letter-routing-key", "routingkey");

channel.queueDeclare("queue.normal", true, false, false, args);
channel.queueBind("queue.normal", "exchange.normal", "");


//创建死信队列queue.dlx,并当到死信交换器exchange.dlx
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");

//向exchange.normal发布一条消息
channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN , "dlx".getBytes());

参考下图,生产者首先发送一条携带路由键为“rk”的消息,然后经过交换器 exchange.normal 顺利地存储到队列queue.normal 中。由于队列 queue.normal 设置了过期时间为 10s,在这 10s 内没有消费者消费这条消息,那么判定这条消息为过期。

由于设置了 DLX,过期之时,消息被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx,最后消息被存储在 queue.dlx 这个死信队列中。对于 RabbitMQ 来说,DLX 是一个非常有用的特性。

它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了 Basic.Nack 或者 Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

在上图中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于 queue.dlx 这个死信队列来说,同样可以看作延迟队列。

假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列。当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息。

在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为 5 秒、10 秒、30 秒、1 分钟、5 分 钟、10 分钟、30 分钟、1 小时这几个维度,当然也可以再细化一下。

参考下图,为了简化说明,这里只设置了 5 秒、10 秒、30 秒、1 分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。

这里队列分别设置了过期时间为 5 秒、10 秒、30 秒、1 分钟,同时也分别配置了 DLX 和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列(即延迟队列)中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费。

RabbitMQ 延迟消息插件

从安装插件开始,但首先,让我们看一下以下先决条件:

  • RabbitMQ 版本 3.5.8 及更高版本。
  • Erlang/OTP 18.0 及更高版本

插件安装

在Github下载插件。将插件复制到 RabbitMQ 的插件文件夹,然后运行以下命令启用它:

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
# 将插件移动到plugins目录下
mv rabbitmq_delayed_message_exchange-3.11.1.ez ./rabbitmq_server-3.11.1/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

延迟消息交换器

要使用延迟消息交换器,只需声明一个类型为 x-delayed-message 的交换器,如下所示:

// ... elided code ...
Map args = new HashMap();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

稍后,我们将解释交换声明中的特殊参数 x-delayed-type 的含义。

延迟消息

要延迟消息,用户必须使用 x-delay 标头发布它,该标头接受一个整数,表示消息应由 RabbitMQ 延迟的毫秒数。

值得注意的是,在此上下文中的延迟表示着消息路由到队列或其他交换器的延迟。交换器没有消费者的概念。 因此,一旦延迟过去,插件将尝试将消息路由到与交换器的路由规则匹配的队列。如果消息无法路由到任何队列,它将被丢弃。

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

在上面的示例中,消息在被插件路由之前将延迟五秒钟。

路由灵活性

当我们在上面声明交换时,我们使用了一个设置为 direct 的 x-delayed-type 参数。这告诉交换器我们希望它在路由消息、创建绑定等时具有什么样的行为。

检查延迟消息

一旦我们在消费者端收到消息,我们如何判断消息是否被延迟? x-delay 消息头由插件保留。如果您以 5000 毫秒的延迟发送消息,消费者会发现 x-delay 标头设置为 5000。

作者:三天e九顿
链接:
https://juejin.cn/post/7202996449776353340

来源:稀土掘金

相关推荐

精品博文嵌入式6410中蓝牙的使用

BluetoothUSB适配器拥有一个BluetoothCSR芯片组,并使用USB传输器来传输HCI数据分组。因此,LinuxUSB层、BlueZUSB传输器驱动程序以及B...

win10跟这台计算机连接的前一个usb设备工作不正常怎么办?

前几天小编闲来无事就跑到网站底下查看粉丝朋友给小编我留言询问的问题,还真的就给小编看到一个问题,那就是win10跟这台计算机连接的一个usb设备运行不正常怎么办,其实这个问题的解决方法时十分简单的,接...

制作成本上千元的键盘,厉害在哪?

这是稚晖君亲自写的开源资料!下方超长超详细教程预警!!全文导航:项目简介、项目原理说明、硬件说明、软件说明项目简介瀚文智能键盘是一把我为自己设计的——多功能、模块化机械键盘。键盘使用模块化设计。左侧的...

E-Marker芯片,USB数据线的“性能中枢”?

根据线缆行业的研究数据,在2019年搭载Type-C接口的设备出货量已达到20亿台,其中80%的笔记本电脑和台式电脑采用Type-C接口,50%的智能手机和平板电脑也使用Type-C接口。我们都知道,...

ZQWL-USBCANFD二次开发通讯协议V1.04

修订历史:1.功能介绍1.1型号说明本文档适用以下型号:  ZQWL-CAN(FD)系列产品,USB通讯采用CDC类实现,可以在PC机上虚拟出一个串口,串口参数N,8,1格式,波特率可以根据需要设置(...

win10系统无法识别usb设备怎么办(win10不能识别usb)

从驱动入手,那么win10系统无法识别usb设备怎么办呢?今天就为大家分享win10系统无法识别usb设备的解决方法。1、右键选择设备管理器,如图:  2、点击更新驱动程序,如图:  3、选择浏览...

微软七月Win8.1可选补丁有内涵,含大量修复

IT之家(www.ithome.com):微软七月Win8.1可选补丁有内涵,含大量修复昨日,微软如期为Win7、Win8.1发布7月份安全更新,累计为6枚安全补丁,分别修复总计29枚安全漏洞,其中2...

如何从零开始做一个 USB 键盘?(怎么制作usb)

分两种情况:1、做一个真正的USB键盘,这种设计基本上不涉及大量的软件编码。2、做一个模拟的USB键盘,实际上可以没有按键功能,这种的需要考虑大量的软件编码,实际上是一个单片机。第一种设计:买现成的U...

电脑识别U盘失败?5个实用小技巧,让你轻松搞定USB识别难题

电脑识别U盘失败?5个实用小技巧,让你轻松搞定USB识别难题注意:有些方法会清除USB设备里的数据,请谨慎操作,如果不想丢失数据,可以先连接到其他电脑,看能否将数据复制出来,或者用一些数据恢复软件去扫...

未知usb设备设备描述符请求失败怎么解决

出现未知daousb设备设备描述符请求失du败解决办zhi法如下:1、按下Windows+R打开【运行】;2、在版本运行的权限输入框中输入:services.msc按下回车键打开【服务】;2、在服务...

读《飘》47章20(飘每章概括)

AndAhwouldn'tleaveMissEllen'sgrandchildrenfornotrashystep-patobringup,never.Here,Ah...

英翻中 消失的过去 37(消失的英文怎么说?)

翻译(三十七):消失的过去/茱迪o皮考特VanishingActs/JodiPicoult”我能做什么?“直到听到了狄利亚轻柔的声音,我才意识到她已经在厨房里站了好一会儿了。当她说话的时候,...

RabbitMQ 延迟消息实战(rabbitmq如何保证消息不被重复消费)

现实生活中有一些场景需要延迟或在特定时间发送消息,例如智能热水器需要30分钟后打开,未支付的订单或发送短信、电子邮件和推送通知下午2:00开始的促销活动。RabbitMQ本身没有直接支持延迟...

Java对象拷贝原理剖析及最佳实践(java对象拷贝方法)

作者:宁海翔1前言对象拷贝,是我们在开发过程中,绕不开的过程,既存在于Po、Dto、Do、Vo各个表现层数据的转换,也存在于系统交互如序列化、反序列化。Java对象拷贝分为深拷贝和浅拷贝,目前常用的...

如何将 Qt 3D 渲染与 Qt Quick 2D 元素结合创建太阳系行星元素?

Qt组件推荐:QtitanRibbon:遵循MicrosoftRibbonUIParadigmforQt技术的RibbonUI组件,致力于为Windows、Linux和MacOSX提...

取消回复欢迎 发表评论: