spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
上述的配置文件,启动rabbitmq中消息自动确认模式,并且配置消息失败重试处理。
失败重试处理:
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
配置错误交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
配置死信交换机
package com.heima.wemedia.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {@Beanpublic Queue delayQueue() {return QueueBuilder.durable("delay_queue").withArgument("x-dead-letter-exchange", "dlx_exchange") // 指定死信交换机.withArgument("x-dead-letter-routing-key", "dlx_key") // 指定死信路由键.build();}@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal_exchange");}@Beanpublic Binding bindDelayQueue() {return BindingBuilder.bind(delayQueue()).to(normalExchange()).with("normal_key");}@Beanpublic Queue dlxQueue() {return QueueBuilder.durable("dlx_queue").build();}@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx_exchange");}@Beanpublic Binding bindDlxQueue() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_key");}}
死信交换机与错误交换机的异同
死信交换机(Dead Letter Exchange, DLX) 确实可以覆盖 错误交换机 的功能,同时还能处理以下情况的消息:
消费失败的消息:
如果消息在消费端处理失败,并且消息的重试次数耗尽,那么消息可以路由到死信队列。
如果消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
过期的消息:
如果队列或消息本身设置了 TTL(Time-To-Live),消息过期后会自动被投递到绑定的死信交换机。
队列溢出的消息:
如果队列达到最大长度限制,超出的消息会被路由到死信队列。
死信交换机与错误交换机的关系
错误交换机是死信交换机的一个特殊应用:
错误交换机专注于消费失败的消息,通常结合 Spring 的重试机制或业务逻辑。
而死信交换机能处理更多种类的消息异常,比如过期或溢出。
功能重叠但适用场景不同:
如何选择
如果只需要处理重试失败的消息,直接使用 RepublishMessageRecoverer 和错误队列即可。
如果希望统一处理多种异常(过期、溢出、消费失败),可以依赖死信交换机,并在死信队列中再区分消息类型。
