RabbitMQ基础用法


前言

本节主要记录消息队列(RabbitMQ)的安装、配置以及C++版的生产者消费者用法。

RabbitMQ 安装

RabbitMQ 是由 Erlang 语言开发,基于 AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。 AMQP 协议格式如下:

amqp://<username>:<password>@<host>:<port>/<virtual>
username: 用户名
password: 登录密码
host: 服务所在主机地址
port: 服务端口号
virtual: 虚拟路径

软件下载

下载时注意版本对应关系。

配置环境变量

  • Erlang
    • 在系统变量中新增Erlang的安装路径作为ERLANG_HOME: path/erlang(安装目录)
    • 在系统变量中Path变量里新增 %ERLANG_HOME%\bin
  • RabbitMQ
    • 在系统中新增RabbitMQ的安装路径作为RABBITMQ_SERVER: path/rabbitmq(安装目录)
    • 在系统变量中Path变量里新增 %RABBITMQ_SERVER%\sbin

RabbitMQ 启动

检查 rabbitmq_management 是否安装

rabbitmq-plugins list # 查看RabbitMQ中所有插件

启用 rabbitmq_management 插件

rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable xxx		# 启用 xxx 插件
rabbitmq-plugins disable xxx	# 禁用 xxx 插件

启动RabbitMQ

rabbitmq-service.bat start		# 启动命令
# 启动后访问 http://localhost:15672,使用默认账号 guest/guest 登录。
rabbitmq-service.bat stop		# 停止命令
rabbitmq-service.bat restart	# 重启命令

常见命令

rabbitmqctl list_users							# 查看rabbitmq_management中的注册用户
rabbitmqctl add_user [username] [password]		# 在rabbitmq_management中创建用户
# 给rabbitmq_management中用户设置 vhost(虚拟主机) 级别的权限
rabbitmqctl set_permissions -p <vhost> <user> "<conf>" "<write>" "<read>"
rabbitmqctl set_user_tag [tag] [...] ...		# 给rabbitmq_management中用户设置标签
rabbitmq-server -restart						# 重启RabbitMQ服务
rabbitmq-server stop							# 关闭RabbitMQ服务

C++库编译

c++ 连接 RabbitMQ 的库目前不多, 主要有三个常用选项:rabbitmq-cSimpleAmqpClientAMQP-CPP

  1. rabbitmq-c

    rabbitmq-c 是 RabbitMQ 官方提供的 C 语言 AMQP 客户端库,底层实现了 AMQP 0.9.1 协议。它的特点包括:

    • 轻量级:无额外依赖,非常适合嵌入式和性能敏感的应用场景。
    • 提供基础的 AMQP 操作,例如连接管理、通道管理、队列声明、消息发布/消费等。
    • 适用于 C 和 C++ 项目,但由于它是用 C 语言开发的,因此其 API 设计较为底层,缺乏一些现代化的封装。

    适合轻量级、嵌入式、对性能要求较高的 C 语言项目。

  2. SimpleAmqpClient

    SimpleAmqpClient 是基于 rabbitmq-c 的 C++ 封装库,旨在为 C++ 开发者提供更现代化、更易用的 API。其特点包括:

    • 基于 rabbitmq-c,提供了一个更现代化的 C++ API,使得与底层 AMQP 协议的交互更加简便。
    • 采用 RAII(资源管理即初始化)方式,自动管理连接和资源,减少手动管理连接的复杂性。
    • 适用于 C++ 开发者,尤其是那些不想直接与 rabbitmq-c 的底层 API 交互的开发者。

    适合现代化 C++ 项目,提供了对 rabbitmq-c 的便捷封装。

  3. AMQP-CPP

    AMQP-CPP 是一个高性能、独立的 C++ 客户端库,其特点包括:

    • 完全独立实现了 AMQP 0.9.1 协议,**不依赖 rabbitmq-c**,且完全使用 C++ 开发。
    • 采用异步设计,基于事件驱动(event-driven)模型,非常适合高并发、高吞吐量的应用。
    • 提供对多线程、多连接的支持,适用于需要高并发和低延迟的场景。
    • 支持基于 Boost.Asiolibev/libuv 的事件循环。

    适用于高并发、多线程和高吞吐量的 C++ 应用。

本节主要使用 rabbitmq-c 库进行测试。

Windows 编译

编译 rabbitmq-c,先下载 git clone https://github.com/alanxz/rabbitmq-c.git,然后通过Cmake进行编译。编译过程:

生成完成后在 build 文件夹直接双击 .sln 文件用VS打开工程源码。本例 build 的是 Release,64 位程序,选中根项目右键生成即可。

Linux 编译

Linux 上编译相对要简单许多,直接参考下面的命令即可。

# 1.下载 rabbitmq-c 源码
git clone --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
mkdir build && cd build
# -DBUILD_SHARED_LIBS=ON 使 rabbitmq-c 作为共享库(动态库)。
# -DBUILD_STATIC_LIBS=OFF 禁用静态库(SimpleAmqpClient 默认使用动态库)。
# -DCMAKE_INSTALL_PREFIX=/usr/local 指定安装到 /usr/local 目录。
cmake .. -DBUILD_SHARED_LIBS=ON -DBUILD_STATIC_LIBS=OFF -DCMAKE_INSTALL_PREFIX=/usr/local
# 编译 & 安装
make -j$(nproc)
sudo make install

# 2.安装 boost
sudo apt install -y libboost-all-dev

# 3.安装 SimpleAmqpClient 
git clone https://github.com/alanxz/SimpleAmqpClient.git
cd SimpleAmqpClient
mkdir build && cd build
# -DCMAKE_INSTALL_PREFIX=/usr/local 指定安装路径。
# -DCMAKE_CXX_STANDARD=14 使用 C++14(部分 Jetson 设备默认 C++11,可能导致编译错误)。
# -DBOOST_ROOT=/usr/local 指定 Boost 路径(如果 Boost 安装在其他位置,请修改)。
cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local -DCMAKE_CXX_STANDARD=14 -DBOOST_ROOT=/usr/local
# 编译 & 安装
make -j$(nproc)
sudo make install

安装结束后会生成 /usr/local/include/rabbitmq-c/usr/local/include/SimpleAmqpClient 并且在 /usr/local/include 目录下有4个 amqp 开头的 .h 文件,这四个文件已经被舍弃,在程序中引用时直接引用rabbitmq-c 下的头文件,如下例:

/* 
amqp.h is deprecated, use rabbitmq-c/amqp.h instead.
amqp_tcp_socket.h is deprecated, use rabbitmq-c/tcp_socket.h instead.
#include <amqp.h>
#include <amqp_tcp_socket.h>
*/ 

#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>

代码演示

生产者,Linux 平台作为生产者,将数据发送到 RabbitMQ 服务,在 Windows 平台进行消费。

生产者示例:

#include <iostream>
#include <string>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>


// 用于处理 AMQP 错误并输出错误信息
void message_error(amqp_rpc_reply_t x, const char* context) {
    if (x.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Error in " << context << ": " << amqp_error_string2(x.library_error) << std::endl;
        return;
    }
}

int main(){
    const std::string hostname = "xxx.xxx.xxx.xxx";    // RabbitMQ address
    const int port = 5672;
    const std::string exchange = "NXTest";
    const std::string routing_key = "";
    
    // 连接初始化
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);

    if(!socket){
        std::cout << "Create TCP socket failed." <<std::endl;
        return -1;
    }

    // 建立与 RabbitMQ 服务器的 TCP 连接。如果 RabbitMQ 服务器启用了 SSL,则不能使用 amqp_socket_open(),需要使用 amqp_ssl_socket_new()。
    int status = amqp_socket_open(socket, hostname.c_str(), port);
    if(status){
        std::cout << "Open TCP socket failed." <<std::endl;
        return -1;
    }

    /* 登录 RabbitMQ 
        "/": 虚拟主机的路径。RabbitMQ 默认的虚拟主机是 "/" 
        0: 这里是 channel_max 参数,表示允许在此连接中打开的最大通道数。0 表示没有限制。
        131072: frame_max,表示可以接受的最大帧大小,单位是字节。默认值为 131072 字节(128 KB),通常不需要更改,除非你需要传输非常大的消息。
        0: heartbeat,表示心跳间隔,单位是秒。0 表示禁用心跳机制。如果你不需要心跳,可以设置为 0,否则可以设置为其他值(例如,10 表示每 10 秒发送一次心跳)。
        AMQP_SASL_METHOD_PLAIN: 认证方式,这里使用的是 AMQP_SASL_METHOD_PLAIN,表示简单的用户名和密码认证方式。RabbitMQ 默认使用 PLAIN 方法。
    */
    message_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "admin", "admin"), "Logging in");
    amqp_channel_open(conn, 1); // 打开通道
    /* 同步 RPC 响应检查函数,用于获取上一个 AMQP 命令的执行结果,并判断是否成功执行。本例就是判断 amqp_channel_open 是否执行成功。 */
    message_error(amqp_get_rpc_reply(conn), "Opening channel");

    /* 声明(创建)一个 AMQP 交换机
        conn    AMQP 连接对象
        1       交换机所在的通道(通常 1)
        amqp_cstring_bytes(exchange.c_str())    交换机名称
        amqp_cstring_bytes("direct")            交换机类型(如 "direct", "fanout", "topic", "headers")
        0	amqp_boolean_t	是否为被动声明(0=否,1=是)
        0	amqp_boolean_t	是否持久化(0=否,1=是)
        0	amqp_boolean_t	是否自动删除(0=否,1=是)
        0	amqp_boolean_t	是否内部交换机(0=否,1=是)
        amqp_empty_table    交换机的额外参数,通常填 amqp_empty_table
    */
    amqp_exchange_declare(conn,1,amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"),0,0,0,0, amqp_empty_table);
    message_error(amqp_get_rpc_reply(conn), "Declaring exchange");

    while(1){
        std::cout << "please input a line message, click enter end: " << std::endl;;
        std::string input;
        std::getline(std::cin, input);  // 读取整行输入

        amqp_bytes_t message_bytes = amqp_cstring_bytes(input.c_str());

        /* 向 RabbitMQ 发送(发布)消息,生产者通过它将消息发送到交换机,然后 RabbitMQ 依据路由规则(Routing Key)将消息分发到队列。
            conn    RabbitMQ 连接
            1       通道 ID(通常 1)
            amqp_cstring_bytes(exchange.c_str())        交换机名称
            amqp_cstring_bytes(routing_key.c_str())     路由键(Routing Key)
            0	amqp_boolean_t	消息强制路由标志(0=可丢弃,1=必须投递)
            0	amqp_boolean_t	消息立即投递标志(0=正常,1=仅队列有消费者时才发送)
            NULL    amqp_basic_properties_t *	消息属性(NULL 表示无特殊属性)
            message_bytes	        消息体(要发送的数据)
        */
        int res = amqp_basic_publish(conn,1,amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()),0,0,NULL, message_bytes);
        if (res < 0) {
            std::cerr << "Error publishing message " << input << std::endl;
        } else {
            std::cout << "Message " << input << " published: " << res << std::endl;
        }
    }
    std::cout << "hello" <<std::endl;
    // 清理连接
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

注意:上例在使用 guest 用户连接时,会报异常,因为 RabbitMQ 默认对 guest 用户做了访问限制,导致 guest 账号无法从远程机器访问 RabbitMQguest 用户只能从 localhost 登录。

消费者示例:

#include <iostream>
#include <string>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>

// 用于处理 AMQP 错误并输出错误信息
void message_error(amqp_rpc_reply_t x, const char *context) {
    if (x.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Error in " << context << ": " << amqp_error_string2(x.library_error) << std::endl;
        return;
    }
}

int main(int argc, char *argv[]) {
    const std::string hostname = "localhost";
    const int port = 5672;
    const std::string queue = "NXTestQueues";
    const std::string exchange = "NXTest";
    const std::string routing_key = "";

    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);

    if (!socket) {
    }

    int status = amqp_socket_open(socket, hostname.c_str(), port);
    if (status) {
    }

    // RabbitMQ 登录
    message_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
    // 打开通道
    amqp_channel_open(conn, 1);
    message_error(amqp_get_rpc_reply(conn), "Opening channel");

    // 声明交换机
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"), 0, 0, 0, 0, amqp_empty_table);
    message_error(amqp_get_rpc_reply(conn), "Declaring exchange");

    /* 声明队列
        conn	RabbitMQ 连接
        1		通道 ID(通常 1)
        amqp_cstring_bytes(exchange.c_str())	    队列名称
        0	amqp_boolean_t	是否被动声明(0=如果不存在则创建,1=只检查是否存在)
        0	amqp_boolean_t	是否持久化(0=否,1=是)
        0	amqp_boolean_t	是否为排他队列(0=否,1=是,仅当前连接可用,断开即删除)
        1	amqp_boolean_t	是否自动删除(0=否,1=是,无消费者时自动删除)
        arguments	amqp_table_t	额外参数,通常填 amqp_empty_table
    */
    amqp_queue_declare_ok_t *q = amqp_queue_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), 0, 0, 0, 1, amqp_empty_table);
    message_error(amqp_get_rpc_reply(conn), "Declaring queue");

    // 将队列绑定到交换机,并指定路由键。
    amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()), amqp_empty_table);
    message_error(amqp_get_rpc_reply(conn), "Binding queue");

    /* 注册消费者,在 RabbitMQ 中订阅(消费)队列中的消息,
       它告诉 RabbitMQ:从指定队列消费消息、分配一个消费者标签(consumer tag)、等待服务器发送消息
       消费者一旦注册成功,RabbitMQ 会主动推送消息 给消费者,而不是让消费者手动拉取。
        conn	RabbitMQ 连接
        1   	amqp_channel_t	通道 ID(通常 1)
        amqp_cstring_bytes(queue.c_str())	    队列名称
        amqp_empty_bytes        消费者标签(通常填 amqp_empty_bytes 让服务器自动分配)
        0	amqp_boolean_t	是否本地消息不会消费(通常填 0)
        1	amqp_boolean_t	是否自动应答(1=自动确认,0=手动确认)
        0	amqp_boolean_t	是否独占队列(1=只有本消费者能消费,0=共享)
        arguments	amqp_table_t	额外参数(通常填 amqp_empty_table)
     */
    amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    message_error(amqp_get_rpc_reply(conn), "Consuming");
    while (true) {
        amqp_rpc_reply_t res;
        amqp_envelope_t envelope;

        // amqp_maybe_release_buffers 用于释放 AMQP 连接中的缓冲区内存,避免内存占用过大。
        amqp_maybe_release_buffers(conn);
        /* 从 RabbitMQ 服务器接收(消费),amqp_consume_message 是客户端库中消息最常用的同步消息消费函数。
            conn	amqp_connection_state_t	RabbitMQ 连接
            envelope	amqp_envelope_t *	存储接收到的消息
            nullptr	struct timeval *	超时时间(nullptr = 阻塞模式,直到收到消息)
            0	amqp_boolean_t	        标志位,通常填 0
        */
        res = amqp_consume_message(conn, &envelope, nullptr, 0);

        if (res.reply_type == AMQP_RESPONSE_NORMAL) {
            std::cout << "Received: " << std::string((char *)envelope.message.body.bytes, envelope.message.body.len) << std::endl;
            // 释放 amqp_consume_message() 获取的消息资源,防止内存泄漏。
            amqp_destroy_envelope(&envelope);
        } else {
            std::cerr << "Error consuming message" << std::endl;
            break;
        }
    }
    std::cout << "RabbitMQ Test!" << std::endl;
    return 0;
}

工作模式

  • 简单模式
  • 工作队列模式
  • 发布订阅模式
  • 路由模式
  • 通配符模式
  • 远程调用模式

TODO

数据传输可靠性分析

消费者扩展:

  • 消费者消息确认:当消费者收到消息后有两种确认方式,自动确认手动确认
    • 自动确认:指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。
    • 手动签收:即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

TODO

其他属性

  • 消费限流:当队列中积压大量数据时,若此时开启一个消费线程,大量数据同时推送过来时,消费者若处理不过来,可能会造成线程崩溃。为了避免这一现象,RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能,即在非自动确认消息的前提下,如果一定数目的消息(预取值)未被确认之前,不再进行消费新的消息。
  • 不公平分发
  • 优先级队列
  • 消息存活时间
  • 死信队列
  • 延迟队列

TODO

参考链接:https://blog.csdn.net/weixin_45606067/article/details/134734794

集群搭建

TODO


文章作者: LSJune
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LSJune !
评论
  目录