加群联系作者vx:xiaoda0423
仓库地址:https://webvueblog.github.io/JavaPlusDoc/
https://1024bat.cn/
public void handleAccessEvent(ConsumerRecord<?, ?> recordMsg, Acknowledgment ack) {long start = System.currentTimeMillis();List<ConsumerRecord<?, ?>> records = Collections.singletonList(recordMsg);String timeStr = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());handleBiz(records, start, timeStr);ack.acknowledge(); // 手动提交偏移量,确保消息处理完毕log.info("==>设备数据-柜子事件处理耗时: {}ms", System.currentTimeMillis() - start);}public void handleBiz(List<ConsumerRecord<?, ?>> records, long countTime, String timeStr) {for (ConsumerRecord<?, ?> record : records) {try {String value = record.value().toString();
优化点 | 说明 |
---|---|
幂等性处理 | 若多次消费同一订单,确保不会重复写入(订单状态比对) |
异步执行更新 | bizOrderService.setRenewEndOrder() 可使用线程池异步提高性能 |
延迟队列支持 | 若逾期处理需延迟精确执行,可用 Kafka 延迟消息或 Redis 延时任务 |
Prometheus监控指标 | 上报处理成功数、失败数、处理耗时等,用于高可用监控 |
消费线程隔离 | 可将不同事件类型拆分不同消费者容器,提升隔离性与扩展性 |
/*** Kafka 消费监听 - 处理订单逾期事件*/
@KafkaListener(topics = KafkaConstant.ORDER_OVERDUE_TOPIC,containerFactory = "bizBRConsumerFactory",properties = {"max.poll.records = 100"} // 每次最多拉取 100 条,提升吞吐量
)
public void handle(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {if (CollectionUtils.isEmpty(records)) {return;}String dateTime = LocalDateTime.now().format(inputFormatter); // 当前时间格式化List<String> failedOrders = new ArrayList<>(); // 记录失败订单 ID 以便后续排查for (ConsumerRecord<String, String> record : records) {// 空数据直接跳过if (StringUtils.isBlank(record.value())) {continue;}try {log.info("[Kafka-订单逾期] 接收到原始消息:{}", record.value());// 反序列化消息为事件对象OrderOverdueEvent overdueEvent = GsonUtils.getObjectFromJson(record.value(), OrderOverdueEvent.class);if (overdueEvent == null || StringUtils.isBlank(overdueEvent.getOrderId())) {log.warn("[Kafka-订单逾期] 消息格式异常:{}", record.value());continue;}BExchSvcOrder order = bExchSvcOrderService.selectById(overdueEvent.getOrderId());if (!updatePreCheck(order)) {continue;}// 标记逾期延迟(内部逻辑需要)order.setOverdueDelay(true);// 首先尝试通过续费接口完结BExchSvcOrder result = bizOrderService.setRenewEndOrder(order, dateTime);if (result == null) {// 若续费接口返回为空,说明未续费成功,则直接设置为逾期状态log.info("[Kafka-订单逾期] 订单逾期,未续费完结,准备状态更新,订单:{}", JacksonUtils.toJson(order));BExchSvcOrderBO updateBO = BExchSvcOrderBO.builder().orderId(order.getOrderId()).oStatus(OrderConstant.OVERDUE) // 设置订单状态为 OVERDUE.overdueDelay(true).updType(OrderConstant.CHANGEBAT).build();// 更新数据库状态bExchSvcOrderService.update(updateBO);}} catch (Exception ex) {// 单条处理失败,继续处理下一条,不影响整体消费String failedId = record.value();log.error("[Kafka-订单逾期] 处理失败,订单原始内容:{}", failedId, ex);failedOrders.add(failedId);}}// 所有消息处理完成后统一提交 offsetack.acknowledge();if (!failedOrders.isEmpty()) {log.warn("[Kafka-订单逾期] 以下订单处理失败需排查:{}", failedOrders);}
}
配置优化:
properties
# 消费者配置 spring.kafka.consumer.max-poll-records=500 spring.kafka.listener.concurrency=3 spring.kafka.listener.poll-timeout=5000# 线程池配置 io.executor.corePoolSize=10 io.executor.maxPoolSize=20 io.executor.queueCapacity=1000
监控增强:
添加Metrics指标(处理速率、耗时分布等)
实现Circuit Breaker模式(如Resilience4j)
关键操作添加TraceID实现全链路追踪
数据一致性:
实现幂等处理(通过eventId去重)
使用事务消息保证最终一致性
添加死信队列处理无法消费的消息
压力测试:
使用JMeter进行不同批量大小的性能测试
调整线程池参数找到最优配置
测试故障恢复能力(如ES宕机时的重试机制)
✅ 使用线程池替代单线程消费,提高并发能力;
✅ 增强可配置性和灵活性(比如线程数量、阻塞时间);
✅ 异常处理更加细粒度;
✅ 避免重复初始化;
✅ 增强日志和可追踪性;
/*** 抽象延迟消息处理器,支持 Redisson 延迟队列消费* 适用于高并发、可扩展的异步延迟消息处理场景*/ public abstract class xxx<T> implements xxx<T> {private static final Logger log = LoggerFactory.getLogger(AbstractDelayMessageHandler.class);private static final int DEFAULT_POLL_SLEEP_TIME = 5000; // 默认阻塞队列轮询等待时间(毫秒)@Autowiredprivate RedissonClient redissonClient; // 注入 Redisson 客户端,用于操作延迟队列private RBlockingDeque<T> blockingDeque; // Redisson 阻塞队列private RDelayedQueue<T> delayedQueue; // Redisson 延迟队列// 消费标志位,控制线程是否运行// 是否启用消费者,配置优先级高@Value("${redisson.delay.consumer.enable:false}")protected boolean enable; // 是否启用消费者,配置优先级高@Value("${redisson.delay.consumer.threads:2}")private int consumerThreads; // 消费者线程数,默认2个,可通过配置覆盖@Value("${redisson.delay.consumer.pollTimeout:5000}")private long pollTimeoutMs; // 轮询延迟消息的超时时间,默认5000msprivate ExecutorService consumerExecutor; // 消费线程池// 阻塞拉取延迟消息(支持设置超时时间)message = blockingDeque.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);if (Objects.isNull(message)) {continue;}// 过滤空消息if ((message instanceof String) && StringUtils.isBlank((String) message)) {continue;}log.info("消费延迟队列:{},接收消息:{}", queueName(), message);// 处理延迟消息(交由子类实现)this.handle(message);
✅ 延迟场景 1:优惠券即将过期提醒(提前1小时提醒)
实体类
CouponExpireReminderMessage.java
@Data @NoArgsConstructor @AllArgsConstructor public class CouponExpireReminderMessage implements Serializable {private static final long serialVersionUID = 1L;private String userId;private String couponId;private String couponName;private String expireTime; // 格式:yyyy-MM-dd HH:mm:ss }
延迟队列处理器
CouponExpireReminderHandler.java
@Component public class CouponExpireReminderHandler extends AbstractDelayMessageHandler<CouponExpireReminderMessage> {private static final Logger log = LoggerFactory.getLogger(CouponExpireReminderHandler.class);@Overrideprotected String queueName() {return"coupon:expire:remind:delay:queue";}@Overrideprotected void handle(CouponExpireReminderMessage message) {try {log.info("【优惠券过期提醒】用户:{},券:{}({}),过期时间:{}",message.getUserId(), message.getCouponId(), message.getCouponName(), message.getExpireTime());// 示例:通过站内信或推送系统通知用户// notificationService.sendExpireReminder(message.getUserId(), message.getCouponName());} catch (Exception e) {log.error("处理优惠券过期提醒异常:{}", message, e);}} }
加入延迟队列
CouponExpireReminderMessage msg = new CouponExpireReminderMessage("user123", "coupon456", "满50减20", "2025-04-10 23:59:59" ); redissonClient.getDelayedQueue(redissonClient.getBlockingDeque("coupon:expire:remind:delay:queue", new JsonJacksonCodec())).offer(msg, 1, TimeUnit.HOURS); // 提前一小时提醒
✅ 延迟场景 2:用户未读通知提醒(24小时后仍未读则再次提醒)
实体类
UnreadNotificationReminderMessage.java
@Data @NoArgsConstructor @AllArgsConstructor public class UnreadNotificationReminderMessage implements Serializable {private static final long serialVersionUID = 1L;private String userId;private String noticeId;private String title; }
处理器类
UnreadNotificationReminderHandler.java
@Component public class UnreadNotificationReminderHandler extends AbstractDelayMessageHandler<UnreadNotificationReminderMessage> {private static final Logger log = LoggerFactory.getLogger(UnreadNotificationReminderHandler.class);@Overrideprotected String queueName() {return"notice:unread:remind:delay:queue";}@Overrideprotected void handle(UnreadNotificationReminderMessage message) {try {// 模拟判断是否未读(例如通过DB检查状态)// boolean unread = noticeService.isUnread(message.getUserId(), message.getNoticeId());boolean unread = true; // 模拟if (unread) {log.info("【未读通知提醒】用户:{},通知标题:{}", message.getUserId(), message.getTitle());// 推送提醒// pushService.sendUnreadNotification(message.getUserId(), message.getTitle());}} catch (Exception e) {log.error("未读通知提醒处理失败:{}", message, e);}} }
高可用 Redisson 自带重连机制;建议集群部署支持备份
Redisson / RabbitMQ / Kafka 的延迟队列功能
无需关心底层使用哪种消息系统
RabbitMQ 延迟队列实现
RabbitMQ 本身不支持真正的延迟队列,但可使用插件或
TTL + DLX(死信交换机)
实现(使用延迟插件)
@Bean public CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args); }
实现类
多队列统一管理调度
支持多个不同延迟任务(如:订单取消、支付过期、短信发送等)统一注册和调度,提高可扩展性。
结构如下
com.xxx.delay ├── AbstractDelayMessageHandler<T> // 抽象类:每种业务继承它 ├── DelayQueueManager // 统一注册&调度 ├── ThreadPoolExecutorFactory // 统一线程池管理 ├── impl │ └── OrderCancelDelayHandler // 实现类:订单取消 ├── model │ └── OrderCancelMessage // 消息体
功能清单
功能
说明
🔁 失败重试机制
支持自定义最大重试次数和退避重试
⏱ 消费超时检测
如果处理时间超出阈值,记录慢消费日志
📊 Prometheus监控支持
可用于 Grafana 展示消费速率/失败率/重试次数等
失败重试机制(自动重新入队)
🎯 实现点
每条消息增加
retryCount
字段重试时延迟投递到 Redisson 延迟队列
超过最大次数后记录错误日志,不再消费