欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > Python RabbitMQ 消息队列监听

Python RabbitMQ 消息队列监听

2025/5/2 6:14:25 来源:https://blog.csdn.net/qq_34814092/article/details/143216134  浏览:    关键词:Python RabbitMQ 消息队列监听

Python RabbitMQ 消息队列监听

# coding: utf-8
# 测试消息消费import datetime
import logging as log
import os
from pathlib import Path
from typing import Listimport pika# 设置日志格式
Path("./logs").mkdir(parents=True, exist_ok=True)
os.chdir("./logs/")
log_file_name = datetime.date.today().strftime("%Y-%m-%d")
log_format = ("%(asctime)s.%(msecs)03d [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s"
)
log.basicConfig(level=log.INFO,filename="python-check-" + log_file_name + ".log",datefmt="%Y-%m-%d %H:%M:%S",format=log_format,encoding="utf-8",
)class RabbitMQConsumer:def __init__(self, host="localhost", queue_name="test-queue", batch_size=5):self.host = hostself.queue_name = queue_nameself.batch_size = batch_sizeself.connection = Noneself.channel = Noneself.message_count = 0self.messages: List[pika.spec.Basic.Deliver] = []self.delivery_tags: List[int] = []def conn(self):log.info("测试消费者 连接RabbitMQ开始!")self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host))self.channel = self.connection.channel()# 声明队列self.channel.queue_declare(queue=self.queue_name, durable=True)# 设置QoS,限制未确认的消息数量self.channel.basic_qos(prefetch_count=self.batch_size)log.info("测试消费者 连接RabbitMQ成功!")def close(self):log.info("测试消费者 关闭RabbitMQ连接!")if self.connection and not self.connection.is_closed:self.connection.close()def start_consuming(self):log.info("测试消费者 监听开始!")try:# 确保已连接if not self.connection or self.connection.is_closed:self.conn()log.info(f"测试消费者 监听队列:{self.queue_name}")# 设置消费回调self.channel.basic_consume(queue=self.queue_name,on_message_callback=self.customer,)# 开始消费self.channel.start_consuming()except KeyboardInterrupt:log.error("测试消费者 停止消费!")self.close()except Exception as e:log.error(f"测试消费者 发生错误 停止消费:{str(e)}")self.close()def customer(self, ch, method, properties, body):log.error(f"测试消费者 接受到消息:{body.decode()}")try:# 打印消息内容print(f"测试消费者 接受到消息:{body.decode()}")# 设置计数器和列表 消息达到batch_size才消费self.messages.append(body)self.delivery_tags.append(method.delivery_tag)self.message_count += 1# 当达到批处理大小时,进行批量确认if self.message_count >= self.batch_size:print(f"\n批量处理完成 {self.batch_size} 条消息")print("消息内容:", [msg.decode() for msg in self.messages])# 确认所有消息ch.basic_ack(delivery_tag=self.delivery_tags[-1], multiple=True)# 重置计数器和列表self.message_count = 0self.messages = []self.delivery_tags = []except Exception as e:log.error(f"测试消费者 处理消息异常:{str(e)}")# 发生错误时,拒绝消息并重新入队ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)if __name__ == "__main__":# 创建消费者实例并开始消费consumer = RabbitMQConsumer(host="localhost", queue_name="test-queue", batch_size=5)consumer.start_consuming()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词