欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > 在nodejs中使用RabbitMQ(一)安装,使用

在nodejs中使用RabbitMQ(一)安装,使用

2025/9/5 15:28:45 来源:https://blog.csdn.net/qq_35496811/article/details/145570093  浏览:    关键词:在nodejs中使用RabbitMQ(一)安装,使用

安装 

1、安装RabbitMQ,推荐直接使用docker安装。

docker container run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ./data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin1234 rabbitmq:4.0-management

5672端口,rabbitmq服务

15672端口,rabbitmq可视化管理界面 

2、windows,linux不用容器,可根据官网教程按步骤进行安装。Installing on RPM-based Linux | RabbitMQ

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理软件(也称为消息队列),它实现了高级消息队列协议(AMQP)。RabbitMQ 提供了可靠的消息传递机制,适用于构建分布式系统、微服务架构以及需要解耦组件的应用程序。它支持多种消息传递模式,并且具有高度的可扩展性和灵活性。官方还提供了多语言支持:Python,Java,Ruby,PHP,C#,JavaScript,Go。

RabbitMQ 的基本概念

1.生产者(Producer):左侧绿色方框代表消息的生产者,生产者将消息发送到 RabbitMQ 服务器。

2.连接(Connection)和通道(Channel):消息首先通过一个连接进入 RabbitMQ,连接内部包含多个通道。每个通道是一个轻量级的连接,用于减少开销并进行通信。

3.RabbitMQ 服务(RabbitMQ Server)和虚拟主机(Virtual Host):中央部分展示了 RabbitMQ 服务器及其内部结构。RabbitMQ 服务器中,可以创建多个虚拟主机(Virtual Host),每个虚拟主机是一个独立的消息命名空间。

4.交换器(Exchange):在每个虚拟主机内,有多个交换器。交换器负责接收来自生产者的消息,并根据预定的路由规则将消息分发到不同的队列。常见的交换机类型包括:

Direct:基于精确匹配的路由键进行消息路由。
Fanout:广播消息到所有绑定的队列。
Topic:基于通配符匹配的路由键进行消息路由。
Headers:基于消息头中的键值对进行消息路由。

5.队列(Queue):消息根据路由规则被分发到对应的队列中。队列用于存储和管理消息,等待消费者来获取消息。

6.消费者(Consumer):右侧黄色方框代表消费者。通过连接和通道,消费者从 RabbitMQ 服务器的队列中获取和处理消息。

 示例

一个生产者对应一个消费者。

 一个生产者对应多个消费者。

如何防止数据丢失

1、优先处理每一步错误,如,队列创建,exchange路由创建,消息是否发送成功。

2、持久化队列数据 durable: true,持久化队列中消息 persistent: true。

3、手动确认消息是否接收,在数据处理完后确认,channel.ack(msg)。 (注:要防止数据重复处理)
 

producer.ts 在代码中没有创建exchange,会使用默认exchange。

import RabbitMQ from 'amqplib/callback_api';function start() {RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {if (err0) {console.error("[AMQP]", err0.message);return setTimeout(start, 1000);}conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");conn.createChannel(async (err2, channel) => {if (err2) {console.error("[AMQP]", err2.message);return setTimeout(start, 1000);}const queueName = 'queue1';// 创建一个队列channel.assertQueue(queueName, {durable: true, //队列持久化}, (err, ok) => {if (err) {console.log('队列创建失败!');}console.log(err, ok);});for (let i = 0; i < 30; ++i) {console.log('message send!', channel.sendToQueue(queueName,Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),{ persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在// (err: any, ok: Replies.Empty)=>{}));}});setTimeout(() => {conn.close();process.exit(0);}, 1000);});
}start();

consumer.ts 消费者可以启动零到多个,因为设置了持久化如果没有消费者,数据会保存等待消费。

import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {const queueName = 'queue1';channel.assertQueue(queueName, { durable: true });console.log('[*] waiting...');// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);channel.consume(queueName,function (msg) {console.log('接收到的消息', msg, msg?.content.toString());/*  // 手动确认取消channel.ack(msg); noAck:false,// 自动确认消息// if (msg) {//   channel.ack(msg);// } */},{noAck: true, // 是否自动确认消息// noAck: false},(err: any, ok: Replies.Empty) => {console.log(err, ok);},);});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
});

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词