云计算百科
云计算领域专业知识百科平台

RabbitMq C++客户端的使用

介绍

RabbitMQ 是一个开源的消息代理和队列服务器,用于在分布式系统之间传递消息。它实现了高级消息队列协议(AMQP),同时也支持其他协议如 STOMP、MQTT 等。

核心概念

  • Producer(生产者): 发送消息的应用程序

  • Consumer(消费者): 接收消息的应用程序

  • Queue(队列): 存储消息的缓冲区

  • Exchange(交换机): 接收生产者发送的消息并根据规则路由到队列

  • Binding(绑定): 连接交换机和队列的规则

  • Message(消息): 包含有效载荷(payload)和标签(label)的数据

  • 安装

    安装 RabbitMQ

    sudo apt install rabbitmq-server

    RabbitMQ
    的简单使用
    # 启动服务
    sudo systemctl start rabbitmq-server.service
    # 查看服务状态
    sudo systemctl status rabbitmq-server.service
    # 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个
    administrator 用户,才可以做为远程登录和发表订阅消息:
    #添加用户
    sudo rabbitmqctl add_user root 123456
    #设置用户 tag
    sudo rabbitmqctl set_user_tags root administrator
    #设置用户权限
    sudo rabbitmqctl set_permissions -p / root "." "." ".*"
    # RabbitMQ 自带了 web 管理界面,执行下面命令开启
    sudo rabbitmq-plugins enable rabbitmq_management

    访问
    webUI
    界面的默认端口为
    15672。

    安装 RabbitMQ C++客户端库

    • C 语言库:https://github.com/alanxz/rabbitmq-c
    • C++库:https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master

    AMQP 是一个开放标准的应用层协议,专为面向消息的中间件设计,RabbitMQ 是其最著名的实现之一。
    我们这里使用
    AMQP-CPP
    库来编写客户端程序。

     
    sudo apt install libev-dev #libev 网络库组件
    git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
    cd AMQP-CPP/
    make
    make install

    安装报错:

    /usr/include/openssl/macros.h:147:4: error: #error
    "OPENSSL_API_COMPAT expresses an impossible API compatibility level"
    147 | # error "OPENSSL_API_COMPAT expresses an impossible API
    compatibility level" | ^~~~~
    In file included from /usr/include/openssl/ssl.h:18,
    from linux_tcp/openssl.h:20,
    from linux_tcp/openssl.cpp:12:
    /usr/include/openssl/bio.h:687:1: error: expected constructor,
    destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’
    687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))

    这种错误,表示
    ssl
    版本出现问题。

    解决方案:卸载当前的
    ssl
    库,重新进行修复安装

    sudo dpkg -P –force-all libevent-openssl-2.1-7
    sudo dpkg -P –force-all openssl
    sudo dpkg -P –force-all libssl-dev
    sudo apt –fix-broken install

    修复后,重新进行
    make。

    AMQP-CPP 库的简单使用

    介绍

    • AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq 服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
    • 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libeventlibevlibuvasio 等异步通信组件,需要手动安装对应的组件。
    • AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
    • 注意:它需要 c++17 的支持。

    使用

    AMQP-CPP
    的使用有两种模式:

    • 使用默认的 TCP 模块进行网络通信
    • 使用扩展的 libeventlibevlibuvasio 异步通信组件进行通信

    TCP 模式

    该模式下需要实现一个类继承自 AMQP::TcpHandler
    类, 它负责网络层的
    TCP
    连接,重写相关函数, 其中必须重写 monitor 函数。在 monitor 函数中需要实现的是将
    fd
    放入
    eventloop(select

    epoll)
    中监控, 当
    fd 可写可读就绪之后, 调用 AMQP-CPP

    connection->process(fd, flags)
    方法

    TCP 模式使用较为麻烦,不过提供了灵活的网络层集成能力,可以根据项目需求选择合适的网络库进行集成。在实际应用中,建议结合事件循环库(如libuv、Boost.Asio等)使用以获得最佳性能。

    扩展模式


    libev
    为例, 我们不必要自己实现 monitor 函数,可以直接使用 AMQP::LibEvHandler。

    常用类与接口介绍

    Channel

    channel
    是一个虚拟连接,一个连接上可以建立多个通道。并且所有的
    RabbitMq
    指令都是通过 channel
    传输,所以连接建立后的第一步,就是建立
    channel
    。因为所有操作是异步的,所以在 channel
    上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred
    类,可以使用它安装处理函数。

    namespace AMQP
    {
    using SuccessCallback = std::function<void()>;
    using ErrorCallback = std::function<void(const char *message)>;
    using FinalizeCallback = std::function<void()>;

    using QueueCallback = std::function<void(const std::string &name,
    uint32_t messagecount, uint32_t consumercount)>;
    using DeleteCallback = std::function<void(uint32_t deletedmessages)>;

    using MessageCallback = std::function<void(const Message &message,
    uint64_t deliveryTag, bool redelivered)>;
    // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback
    using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
    // 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调
    using PublishAckCallback = std::function<void()>;
    using PublishNackCallback = std::function<void()>;
    using PublishLostCallback = std::function<void()>;
    class Channel
    {
    Channel(Connection *connection);
    bool connected();

    // 声明交换机,如果提供了一个空名称,则服务器将分配一个名称。
    // @param name 交换机的名称
    // @param type 交换类型
    // enum ExchangeType
    // {
    // fanout, 广播交换,绑定的队列都能拿到消息
    // direct, 直接交换,只将消息交给 routingkey 一致的队列
    // topic, 主题交换,将消息交给符合 bindingkey 规则的队列
    // headers,
    // consistent_hash,
    // message_deduplication
    // };
    // @param flags 交换机标志
    // 以下 flags 可用于交换机:
    // *-durable 持久化,重启后交换机依然有效
    // *-autodelete 删除所有连接的队列后,自动删除交换
    // *-passive 仅被动检查交换机是否存在
    // *-internal 创建内部交换
    // @param arguments 其他参数

    Deferred &declareExchange(const std::string_view &name,
    ExchangeType type, int flags, const Table &arguments);

    // 声明队列,如果不提供名称,服务器将分配一个名称。
    // @param name 队列的名称
    // @param flags 标志组合
    // flags 可以是以下值的组合:
    // -durable 持久队列在代理重新启动后仍然有效
    // -autodelete 当所有连接的使用者都离开时,自动删除队列
    // -passive 仅被动检查队列是否存在*-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
    // @param arguments 可选参数

    DeferredQueue &declareQueue(const std::string_view &name,
    int flags, const Table &arguments);

    // 将队列绑定到交换机
    // @param exchange 源交换机
    // @param queue 目标队列
    // @param routingkey 路由密钥
    // @param arguments 其他绑定参数

    Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue,
    const std::string_view &routingkey, const Table &arguments);

    // 将消息发布到 exchange,必须提供交换机的名称和路由密钥。然后RabbitMQ 将尝试将消息发送到一个或多个队列。
    // 使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生的情况。
    // @param exchange 要发布到的交易所
    // @param routingkey 路由密钥
    // @param envelope 要发送的完整信封
    // @param message 要发送的消息
    // @param size 消息的大小
    // @param flags 可选标志
    // 可以提供以下 flags:
    // -mandatory 如果设置,服务器将返回未发送到队列的消息
    // -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
    bool publish(const std::string_view &exchange, const std::string_view &routingKey,
    const std::string &message, int flags = 0);

    // 告诉 RabbitMQ 服务器我们已准备好使用消息-也就是订阅队列消息,调用此方法后,RabbitMQ 开始向客户端应用程序传递消息。
    // @param queue 您要使用的队列
    // @param tag 将与此消费操作关联的消费者标记
    // consumer tag 是一个字符串标识符,如果以后想通过 channel::cancel()调用停止它,可以使用它来标识使用者。
    // 如果您没有指定使用者 tag,服务器将为您分配一个。
    // @param flags 其他标记
    // @param arguments 其他参数
    // 支持以下 flags:
    // -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
    // -noack 如果设置了,则不必对已消费的消息进行确认
    // -exclusive 请求独占访问,只有此使用者可以访问队列

    DeferredConsumer &consume(const std::string_view &queue,
    const std::string_view &tag, int flags, const Table &arguments);

    // 确认接收到的消息,当在 DeferredConsumer::onReceived()方法中接收到消息时,必须确认该消息,
    // 以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。
    // @param deliveryTag 消息的唯一 delivery 标签
    // @param flags 可选标志
    bool ack(uint64_t deliveryTag, int flags = 0);
    };
    class DeferredConsumer
    {
    // 注册一个回调函数,该函数在消费者启动时被调用。
    DeferredConsumer &onSuccess(const ConsumeCallback &callback);
    // 注册回调函数,用于接收到一个完整消息的时候被调用
    void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered);
    DeferredConsumer &onReceived(const MessageCallback &callback);
    DeferredConsumer &onMessage(const MessageCallback &callback);
    };
    class Message : public Envelope
    {
    const std::string &exchange();
    const std::string &routingkey();
    };
    class Envelope : public MetaData
    {
    const char *body();
    uint64_t bodySize();
    };
    }

    ev

    typedef struct ev_async
    {
    EV_WATCHER(ev_async);
    EV_ATOMIC_T sent; /* private */
    } ev_async;
    // break type
    enum
    {
    EVBREAK_CANCEL = 0, /* undo unloop */
    EVBREAK_ONE = 1, /* unloop once */
    EVBREAK_ALL = 2 /* unloop all loops */
    };
    struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
    #define EV_DEFAULT ev_default_loop(0)
    int ev_run(struct ev_loop *loop);
    void ev_break(struct ev_loop *loop, int32_t break_type);
    void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
    void ev_async_init(ev_async *w, callback cb);
    void ev_async_start(struct ev_loop *loop, ev_async *w);
    void ev_async_send(struct ev_loop *loop, ev_async *w);

    使用案例

    二次封装思想:

    实现一台主机将消息发布给另一台主机进行处理的功能,可以对 mq 的操作进行简单的封装,使 mq 的操作更加简便,封装一个 MQClient

    • 提供声明指定交换机与队列,并进行绑定的功能
    • 提供向指定交换机发布消息的功能
    • 提供订阅指定队列消息,并设置回调函数进行消息消费处理的功能

    rabbitmq.hpp

    #pragma once
    #include <ev.h>
    #include <amqpcpp.h>
    #include <amqpcpp/libev.h>
    #include <openssl/ssl.h>
    #include <openssl/opensslv.h>
    #include <iostream>
    #include <functional>
    #include "logger.hpp"

    class MQClient
    {
    public:
    using ptr = std::shared_ptr<MQClient>;
    using MessageCallback = std::function<void(const char *, size_t)>;
    MQClient(const std::string &user, const std::string &password, const std::string &host)
    {
    // 1.实例化底层网络通信框架的IO事件监控句柄
    _loop = EV_DEFAULT;
    // 2.实例化LibEvHandler句柄,将AMQP框架与事件监控关联起来
    _handler = std::make_unique<AMQP::LibEvHandler>(_loop);
    // 3.实例化连接对象
    // amqp://root:2162627569@127.0.0.1:5672/
    std::string url = "amqp://" + user + ":" + password + "@" + host + "/";
    AMQP::Address address(url);
    _connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
    // 4.实例化信道对象
    _channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
    // 5.启动底层网络通信框架,开启IO
    _loop_thread = std::thread([this]()
    { ev_run(_loop, 0); });
    }
    ~MQClient()
    {
    ev_async_init(&_async_watcher, watcher_callback);
    ev_async_start(_loop, &_async_watcher);
    ev_async_send(_loop, &_async_watcher);
    _loop_thread.join();
    _loop = nullptr;
    }
    void declareComponents(const std::string &exchange, const std::string &queue,
    const std::string &routing_key = "routing_key", AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct)
    {
    // 声明交换机
    _channel->declareExchange(exchange, exchange_type)
    .onError([&exchange](const char *msg)
    {
    LOG_ERROR("{}交换机创建失败:{}",exchange,msg);
    exit(1); })
    .onSuccess([&exchange]()
    { LOG_INFO("{}交换机创建成功!", exchange); });
    // 声明队列
    _channel->declareQueue(queue)
    .onError([&queue](const char *msg)
    {
    LOG_ERROR("{}队列创建失败:{}",queue,msg);
    exit(1); })
    .onSuccess([&queue]()
    { LOG_INFO("{}队列创建成功!", queue); });
    // 6.绑定交换机和队列
    _channel->bindQueue(exchange, queue, routing_key)
    .onError([&exchange, &queue](const char *msg)
    {
    LOG_ERROR("{} – {}绑定失败:{}",exchange,queue,msg);
    exit(1); })
    .onSuccess([&exchange, &queue]()
    { LOG_INFO("{} – {}绑定成功!", exchange, queue); });
    }
    bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key")
    {
    LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
    bool ret = _channel->publish(exchange, routing_key, msg);
    if (ret == false)
    {
    LOG_ERROR("{} 发布消息失败:", exchange);
    return false;
    }
    return true;
    }
    void consume(const std::string &queue, const MessageCallback &cb)
    {
    LOG_DEBUG("开始订阅 {} 队列消息!", queue);
    _channel->consume(queue, "consume-tags")
    .onReceived([this, &cb](const AMQP::Message &message, uint32_t deliveryTag, bool redelivered)
    {
    cb(message.body(),message.bodySize());
    _channel->ack(deliveryTag); })
    .onError([&queue](const char *message)
    {
    LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
    exit(1); });
    }

    private:
    static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents)
    {
    ev_break(loop, EVBREAK_ALL);
    }

    private:
    struct ev_async _async_watcher;
    struct ev_loop *_loop;
    std::unique_ptr<AMQP::LibEvHandler> _handler;
    std::unique_ptr<AMQP::TcpConnection> _connection;
    std::unique_ptr<AMQP::TcpChannel> _channel;
    std::thread _loop_thread;
    };

    consume.cc

    #include "rabbitmq.hpp"
    #include "logger.hpp"
    #include <gflags/gflags.h>

    DEFINE_string(user, "root", "rabbitmq访问用户名");
    DEFINE_string(password, "2162627569", "rabbitmq访问密码");
    DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");

    DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
    DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
    DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

    void callback(const char *body, size_t sz)
    {
    std::string msg;
    msg.assign(body, sz);
    LOG_DEBUG("{}", msg);
    }
    int main(int argc, char *argv[])
    {
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    MQClient client(FLAGS_user, FLAGS_password, FLAGS_host);
    client.declareComponents("test-exchange", "test-queue", "test-queue-key");
    client.consume("test-queue", callback);
    std::this_thread::sleep_for(std::chrono::seconds(60));
    return 0;
    }

    publish.cc

    #include "rabbitmq.hpp"
    #include "logger.hpp"
    #include <gflags/gflags.h>

    DEFINE_string(user, "root", "rabbitmq访问用户名");
    DEFINE_string(password, "2162627569", "rabbitmq访问密码");
    DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服务器地址信息 host:port");

    DEFINE_bool(run_mode, false, "程序的运行模式,false-调试; true-发布;");
    DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
    DEFINE_int32(log_level, 0, "发布模式下,用于指定日志输出等级");

    void callback(const char *body, size_t sz)
    {
    std::string msg;
    msg.assign(body, sz);
    LOG_DEBUG("{}", msg);
    }
    int main(int argc, char *argv[])
    {
    google::ParseCommandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    MQClient client(FLAGS_user, FLAGS_password, FLAGS_host);
    client.declareComponents("test-exchange", "test-queue", "test-queue-key");
    for (int i = 0; i < 10; i++)
    {
    std::string msg = "hello world – " + std::to_string(i);
    bool ret = client.publish("test-exchange", msg, "test-queue-key");
    if (ret == false)
    {
    std::cout << "publish 失败!\\n";
    }
    }
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
    }

    makefile

    all:publish consume
    publish:publish.cc
    g++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags
    consume:consume.cc
    g++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags

    .PHONY:clean
    clean:
    rm -f publish consume

    赞(0)
    未经允许不得转载:网硕互联帮助中心 » RabbitMq C++客户端的使用
    分享到: 更多 (0)

    评论 抢沙发

    评论前必须登录!