前言
本节主要记录消息队列(RabbitMQ)的安装、配置以及C++版的生产者消费者用法。
RabbitMQ 安装
RabbitMQ 是由 Erlang
语言开发,基于 AMQP
(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。 AMQP
协议格式如下:
amqp://<username>:<password>@<host>:<port>/<virtual>
username: 用户名
password: 登录密码
host: 服务所在主机地址
port: 服务端口号
virtual: 虚拟路径
软件下载
下载时注意版本对应关系。
配置环境变量
- Erlang
- 在系统变量中新增Erlang的安装路径作为ERLANG_HOME: path/erlang(安装目录)
- 在系统变量中Path变量里新增 %ERLANG_HOME%\bin
- RabbitMQ
- 在系统中新增RabbitMQ的安装路径作为RABBITMQ_SERVER: path/rabbitmq(安装目录)
- 在系统变量中Path变量里新增 %RABBITMQ_SERVER%\sbin
RabbitMQ 启动
检查 rabbitmq_management
是否安装
rabbitmq-plugins list # 查看RabbitMQ中所有插件
启用 rabbitmq_management
插件
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable xxx # 启用 xxx 插件
rabbitmq-plugins disable xxx # 禁用 xxx 插件
启动RabbitMQ
rabbitmq-service.bat start # 启动命令
# 启动后访问 http://localhost:15672,使用默认账号 guest/guest 登录。
rabbitmq-service.bat stop # 停止命令
rabbitmq-service.bat restart # 重启命令
常见命令
rabbitmqctl list_users # 查看rabbitmq_management中的注册用户
rabbitmqctl add_user [username] [password] # 在rabbitmq_management中创建用户
# 给rabbitmq_management中用户设置 vhost(虚拟主机) 级别的权限
rabbitmqctl set_permissions -p <vhost> <user> "<conf>" "<write>" "<read>"
rabbitmqctl set_user_tag [tag] [...] ... # 给rabbitmq_management中用户设置标签
rabbitmq-server -restart # 重启RabbitMQ服务
rabbitmq-server stop # 关闭RabbitMQ服务
C++库编译
c++ 连接 RabbitMQ 的库目前不多, 主要有三个常用选项:rabbitmq-c
、SimpleAmqpClient
和 AMQP-CPP
。
rabbitmq-c
rabbitmq-c
是 RabbitMQ 官方提供的 C 语言 AMQP 客户端库,底层实现了 AMQP 0.9.1 协议。它的特点包括:- 轻量级:无额外依赖,非常适合嵌入式和性能敏感的应用场景。
- 提供基础的 AMQP 操作,例如连接管理、通道管理、队列声明、消息发布/消费等。
- 适用于 C 和 C++ 项目,但由于它是用 C 语言开发的,因此其 API 设计较为底层,缺乏一些现代化的封装。
适合轻量级、嵌入式、对性能要求较高的 C 语言项目。
SimpleAmqpClient
SimpleAmqpClient
是基于rabbitmq-c
的 C++ 封装库,旨在为 C++ 开发者提供更现代化、更易用的 API。其特点包括:- 基于
rabbitmq-c
,提供了一个更现代化的 C++ API,使得与底层 AMQP 协议的交互更加简便。 - 采用 RAII(资源管理即初始化)方式,自动管理连接和资源,减少手动管理连接的复杂性。
- 适用于 C++ 开发者,尤其是那些不想直接与
rabbitmq-c
的底层 API 交互的开发者。
适合现代化 C++ 项目,提供了对
rabbitmq-c
的便捷封装。- 基于
AMQP-CPP
AMQP-CPP
是一个高性能、独立的 C++ 客户端库,其特点包括:- 完全独立实现了 AMQP 0.9.1 协议,**不依赖
rabbitmq-c
**,且完全使用 C++ 开发。 - 采用异步设计,基于事件驱动(event-driven)模型,非常适合高并发、高吞吐量的应用。
- 提供对多线程、多连接的支持,适用于需要高并发和低延迟的场景。
- 支持基于 Boost.Asio 或 libev/libuv 的事件循环。
适用于高并发、多线程和高吞吐量的 C++ 应用。
- 完全独立实现了 AMQP 0.9.1 协议,**不依赖
本节主要使用 rabbitmq-c
库进行测试。
Windows 编译
编译 rabbitmq-c
,先下载 git clone https://github.com/alanxz/rabbitmq-c.git
,然后通过Cmake进行编译。编译过程:


生成完成后在 build 文件夹直接双击 .sln
文件用VS打开工程源码。本例 build 的是 Release,64 位程序,选中根项目右键生成即可。
Linux 编译
Linux 上编译相对要简单许多,直接参考下面的命令即可。
# 1.下载 rabbitmq-c 源码
git clone --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
mkdir build && cd build
# -DBUILD_SHARED_LIBS=ON 使 rabbitmq-c 作为共享库(动态库)。
# -DBUILD_STATIC_LIBS=OFF 禁用静态库(SimpleAmqpClient 默认使用动态库)。
# -DCMAKE_INSTALL_PREFIX=/usr/local 指定安装到 /usr/local 目录。
cmake .. -DBUILD_SHARED_LIBS=ON -DBUILD_STATIC_LIBS=OFF -DCMAKE_INSTALL_PREFIX=/usr/local
# 编译 & 安装
make -j$(nproc)
sudo make install
# 2.安装 boost
sudo apt install -y libboost-all-dev
# 3.安装 SimpleAmqpClient
git clone https://github.com/alanxz/SimpleAmqpClient.git
cd SimpleAmqpClient
mkdir build && cd build
# -DCMAKE_INSTALL_PREFIX=/usr/local 指定安装路径。
# -DCMAKE_CXX_STANDARD=14 使用 C++14(部分 Jetson 设备默认 C++11,可能导致编译错误)。
# -DBOOST_ROOT=/usr/local 指定 Boost 路径(如果 Boost 安装在其他位置,请修改)。
cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local -DCMAKE_CXX_STANDARD=14 -DBOOST_ROOT=/usr/local
# 编译 & 安装
make -j$(nproc)
sudo make install
安装结束后会生成 /usr/local/include/rabbitmq-c
、/usr/local/include/SimpleAmqpClient
并且在 /usr/local/include
目录下有4个 amqp 开头的 .h
文件,这四个文件已经被舍弃,在程序中引用时直接引用rabbitmq-c
下的头文件,如下例:
/*
amqp.h is deprecated, use rabbitmq-c/amqp.h instead.
amqp_tcp_socket.h is deprecated, use rabbitmq-c/tcp_socket.h instead.
#include <amqp.h>
#include <amqp_tcp_socket.h>
*/
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
代码演示
生产者,Linux 平台作为生产者,将数据发送到 RabbitMQ 服务,在 Windows 平台进行消费。
生产者示例:
#include <iostream>
#include <string>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
// 用于处理 AMQP 错误并输出错误信息
void message_error(amqp_rpc_reply_t x, const char* context) {
if (x.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error in " << context << ": " << amqp_error_string2(x.library_error) << std::endl;
return;
}
}
int main(){
const std::string hostname = "xxx.xxx.xxx.xxx"; // RabbitMQ address
const int port = 5672;
const std::string exchange = "NXTest";
const std::string routing_key = "";
// 连接初始化
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
if(!socket){
std::cout << "Create TCP socket failed." <<std::endl;
return -1;
}
// 建立与 RabbitMQ 服务器的 TCP 连接。如果 RabbitMQ 服务器启用了 SSL,则不能使用 amqp_socket_open(),需要使用 amqp_ssl_socket_new()。
int status = amqp_socket_open(socket, hostname.c_str(), port);
if(status){
std::cout << "Open TCP socket failed." <<std::endl;
return -1;
}
/* 登录 RabbitMQ
"/": 虚拟主机的路径。RabbitMQ 默认的虚拟主机是 "/"
0: 这里是 channel_max 参数,表示允许在此连接中打开的最大通道数。0 表示没有限制。
131072: frame_max,表示可以接受的最大帧大小,单位是字节。默认值为 131072 字节(128 KB),通常不需要更改,除非你需要传输非常大的消息。
0: heartbeat,表示心跳间隔,单位是秒。0 表示禁用心跳机制。如果你不需要心跳,可以设置为 0,否则可以设置为其他值(例如,10 表示每 10 秒发送一次心跳)。
AMQP_SASL_METHOD_PLAIN: 认证方式,这里使用的是 AMQP_SASL_METHOD_PLAIN,表示简单的用户名和密码认证方式。RabbitMQ 默认使用 PLAIN 方法。
*/
message_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "admin", "admin"), "Logging in");
amqp_channel_open(conn, 1); // 打开通道
/* 同步 RPC 响应检查函数,用于获取上一个 AMQP 命令的执行结果,并判断是否成功执行。本例就是判断 amqp_channel_open 是否执行成功。 */
message_error(amqp_get_rpc_reply(conn), "Opening channel");
/* 声明(创建)一个 AMQP 交换机
conn AMQP 连接对象
1 交换机所在的通道(通常 1)
amqp_cstring_bytes(exchange.c_str()) 交换机名称
amqp_cstring_bytes("direct") 交换机类型(如 "direct", "fanout", "topic", "headers")
0 amqp_boolean_t 是否为被动声明(0=否,1=是)
0 amqp_boolean_t 是否持久化(0=否,1=是)
0 amqp_boolean_t 是否自动删除(0=否,1=是)
0 amqp_boolean_t 是否内部交换机(0=否,1=是)
amqp_empty_table 交换机的额外参数,通常填 amqp_empty_table
*/
amqp_exchange_declare(conn,1,amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"),0,0,0,0, amqp_empty_table);
message_error(amqp_get_rpc_reply(conn), "Declaring exchange");
while(1){
std::cout << "please input a line message, click enter end: " << std::endl;;
std::string input;
std::getline(std::cin, input); // 读取整行输入
amqp_bytes_t message_bytes = amqp_cstring_bytes(input.c_str());
/* 向 RabbitMQ 发送(发布)消息,生产者通过它将消息发送到交换机,然后 RabbitMQ 依据路由规则(Routing Key)将消息分发到队列。
conn RabbitMQ 连接
1 通道 ID(通常 1)
amqp_cstring_bytes(exchange.c_str()) 交换机名称
amqp_cstring_bytes(routing_key.c_str()) 路由键(Routing Key)
0 amqp_boolean_t 消息强制路由标志(0=可丢弃,1=必须投递)
0 amqp_boolean_t 消息立即投递标志(0=正常,1=仅队列有消费者时才发送)
NULL amqp_basic_properties_t * 消息属性(NULL 表示无特殊属性)
message_bytes 消息体(要发送的数据)
*/
int res = amqp_basic_publish(conn,1,amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()),0,0,NULL, message_bytes);
if (res < 0) {
std::cerr << "Error publishing message " << input << std::endl;
} else {
std::cout << "Message " << input << " published: " << res << std::endl;
}
}
std::cout << "hello" <<std::endl;
// 清理连接
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
注意:上例在使用 guest
用户连接时,会报异常,因为 RabbitMQ
默认对 guest
用户做了访问限制,导致 guest
账号无法从远程机器访问 RabbitMQ
。guest
用户只能从 localhost
登录。
消费者示例:
#include <iostream>
#include <string>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
// 用于处理 AMQP 错误并输出错误信息
void message_error(amqp_rpc_reply_t x, const char *context) {
if (x.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Error in " << context << ": " << amqp_error_string2(x.library_error) << std::endl;
return;
}
}
int main(int argc, char *argv[]) {
const std::string hostname = "localhost";
const int port = 5672;
const std::string queue = "NXTestQueues";
const std::string exchange = "NXTest";
const std::string routing_key = "";
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
if (!socket) {
}
int status = amqp_socket_open(socket, hostname.c_str(), port);
if (status) {
}
// RabbitMQ 登录
message_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
// 打开通道
amqp_channel_open(conn, 1);
message_error(amqp_get_rpc_reply(conn), "Opening channel");
// 声明交换机
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"), 0, 0, 0, 0, amqp_empty_table);
message_error(amqp_get_rpc_reply(conn), "Declaring exchange");
/* 声明队列
conn RabbitMQ 连接
1 通道 ID(通常 1)
amqp_cstring_bytes(exchange.c_str()) 队列名称
0 amqp_boolean_t 是否被动声明(0=如果不存在则创建,1=只检查是否存在)
0 amqp_boolean_t 是否持久化(0=否,1=是)
0 amqp_boolean_t 是否为排他队列(0=否,1=是,仅当前连接可用,断开即删除)
1 amqp_boolean_t 是否自动删除(0=否,1=是,无消费者时自动删除)
arguments amqp_table_t 额外参数,通常填 amqp_empty_table
*/
amqp_queue_declare_ok_t *q = amqp_queue_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), 0, 0, 0, 1, amqp_empty_table);
message_error(amqp_get_rpc_reply(conn), "Declaring queue");
// 将队列绑定到交换机,并指定路由键。
amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()), amqp_empty_table);
message_error(amqp_get_rpc_reply(conn), "Binding queue");
/* 注册消费者,在 RabbitMQ 中订阅(消费)队列中的消息,
它告诉 RabbitMQ:从指定队列消费消息、分配一个消费者标签(consumer tag)、等待服务器发送消息
消费者一旦注册成功,RabbitMQ 会主动推送消息 给消费者,而不是让消费者手动拉取。
conn RabbitMQ 连接
1 amqp_channel_t 通道 ID(通常 1)
amqp_cstring_bytes(queue.c_str()) 队列名称
amqp_empty_bytes 消费者标签(通常填 amqp_empty_bytes 让服务器自动分配)
0 amqp_boolean_t 是否本地消息不会消费(通常填 0)
1 amqp_boolean_t 是否自动应答(1=自动确认,0=手动确认)
0 amqp_boolean_t 是否独占队列(1=只有本消费者能消费,0=共享)
arguments amqp_table_t 额外参数(通常填 amqp_empty_table)
*/
amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
message_error(amqp_get_rpc_reply(conn), "Consuming");
while (true) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
// amqp_maybe_release_buffers 用于释放 AMQP 连接中的缓冲区内存,避免内存占用过大。
amqp_maybe_release_buffers(conn);
/* 从 RabbitMQ 服务器接收(消费),amqp_consume_message 是客户端库中消息最常用的同步消息消费函数。
conn amqp_connection_state_t RabbitMQ 连接
envelope amqp_envelope_t * 存储接收到的消息
nullptr struct timeval * 超时时间(nullptr = 阻塞模式,直到收到消息)
0 amqp_boolean_t 标志位,通常填 0
*/
res = amqp_consume_message(conn, &envelope, nullptr, 0);
if (res.reply_type == AMQP_RESPONSE_NORMAL) {
std::cout << "Received: " << std::string((char *)envelope.message.body.bytes, envelope.message.body.len) << std::endl;
// 释放 amqp_consume_message() 获取的消息资源,防止内存泄漏。
amqp_destroy_envelope(&envelope);
} else {
std::cerr << "Error consuming message" << std::endl;
break;
}
}
std::cout << "RabbitMQ Test!" << std::endl;
return 0;
}

工作模式
- 简单模式
- 工作队列模式
- 发布订阅模式
- 路由模式
- 通配符模式
- 远程调用模式
TODO
数据传输可靠性分析
消费者扩展:
- 消费者消息确认:当消费者收到消息后有两种确认方式,自动确认与手动确认。
- 自动确认:指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。
- 手动签收:即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
TODO
其他属性
- 消费限流:当队列中积压大量数据时,若此时开启一个消费线程,大量数据同时推送过来时,消费者若处理不过来,可能会造成线程崩溃。为了避免这一现象,RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能,即在非自动确认消息的前提下,如果一定数目的消息(预取值)未被确认之前,不再进行消费新的消息。
- 不公平分发
- 优先级队列
- 消息存活时间
- 死信队列
- 延迟队列
TODO
参考链接:https://blog.csdn.net/weixin_45606067/article/details/134734794
集群搭建
TODO