赞
踩
c++连接RabbitMq的库目前不多,很多朋友直接使用Rabbitmq-c封闭了类,供c++使用,也是一种方法。
经过选型和使用,我在项目中使用了AMQP-CPP,本文主要介绍AMQP-CPP库的使用。
AMQP-CPP是用于与RabbitMq消息中间件通信的c++库。它能解析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包。
该库使用分层架构,使得用户可以按需实现网络层。这里,AMQP-CPP库就不会向RabbitMq建立网络连接,所有的网络io由用户完成。
当然,AMQP-CPP提供了可选的网络层接口,它预定义了TCP和TLS模块,用户就不用自己实现网络io。
AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。注意:它需要c++11的支持。
AMQP-CPP有一个可选的仅支持Linux的TCP模块,它的作用是处理网络层io操作,如果需要这个模块,链接时需要加上 -lpthread -ldl
。
支持两种编译方式:make, cmake。
目前Windows上不支持共享库。Linux支持静态库和共享库。在Windows上编译时需要定义 NOMINMAX
宏。
有两个头文件需要使用:
记住:TCP模块仅支持Linux系统!
安装完成后,编译应用程序时使用-lamqpcpp
,对于使用TCP模块的应用,还需要 -lpthread -ldl
。
如前所述,AMQP-CPP有两种模式,本文主要介绍Linux下带有TCP模块的使用。关于Windows下的使用,请参考 AMQP-CPP在Windows下的使用及网络层Boost Asio实现 。
AMQP::TcpHandler
的类。它负责网络层的TCP连接。需要重写相关的函数,其中必须重写monitor()
函数,如下:#include <amqpcpp.h> #include <amqpcpp/linux_tcp.h> class MyTcpHandler : public AMQP::TcpHandler { /** * Method that is called by the AMQP library when a new connection * is associated with the handler. This is the first call to your handler * @param connection The connection that is attached to the handler */ virtual void onAttached(AMQP::TcpConnection *connection) override { // @todo // add your own implementation, for example initialize things // to handle the connection. } /** * Method that is called by the AMQP library when the TCP connection * has been established. After this method has been called, the library * still has take care of setting up the optional TLS layer and of * setting up the AMQP connection on top of the TCP layer., This method * is always paired with a later call to onLost(). * @param connection The connection that can now be used */ virtual void onConnected(AMQP::TcpConnection *connection) override { // @todo // add your own implementation (probably not needed) } /** * Method that is called when the secure TLS connection has been established. * This is only called for amqps:// connections. It allows you to inspect * whether the connection is secure enough for your liking (you can * for example check the server certicate). The AMQP protocol still has * to be started. * @param connection The connection that has been secured * @param ssl SSL structure from openssl library * @return bool True if connection can be used */ virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override { // @todo // add your own implementation, for example by reading out the // certificate and check if it is indeed yours return true; } /** * Method that is called by the AMQP library when the login attempt * succeeded. After this the connection is ready to use. * @param connection The connection that can now be used */ virtual void onReady(AMQP::TcpConnection *connection) override { // @todo // add your own implementation, for example by creating a channel // instance, and start publishing or consuming } /** * Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized, or the underlying connection is lost. This * call is normally followed by a call to onLost() (if the error occured * after the TCP connection was established) and onDetached(). * @param connection The connection on which the error occured * @param message A human readable error message */ virtual void onError(AMQP::TcpConnection *connection, const char *message) override { // @todo // add your own implementation, for example by reporting the error // to the user of your program and logging the error } /** * Method that is called when the AMQP protocol is ended. This is the * counter-part of a call to connection.close() to graceful shutdown * the connection. Note that the TCP connection is at this time still * active, and you will also receive calls to onLost() and onDetached() * @param connection The connection over which the AMQP protocol ended */ virtual void onClosed(AMQP::TcpConnection *connection) override { // @todo // add your own implementation (probably not necessary, but it could // be useful if you want to do some something immediately after the // amqp connection is over, but do not want to wait for the tcp // connection to shut down } /** * Method that is called when the TCP connection was closed or lost. * This method is always called if there was also a call to onConnected() * @param connection The connection that was closed and that is now unusable */ virtual void onLost(AMQP::TcpConnection *connection) override { // @todo // add your own implementation (probably not necessary) } /** * Final method that is called. This signals that no further calls to your * handler will be made about the connection. * @param connection The connection that can be destructed */ virtual void onDetached(AMQP::TcpConnection *connection) override { // @todo // add your own implementation, like cleanup resources or exit the application } /** * Method that is called by the AMQP-CPP library when it wants to interact * with the main event loop. The AMQP-CPP library is completely non-blocking, * and only make "write()" or "read()" system calls when it knows in advance * that these calls will not block. To register a filedescriptor in the * event loop, it calls this "monitor()" method with a filedescriptor and * flags telling whether the filedescriptor should be checked for readability * or writability. * * @param connection The connection that wants to interact with the event loop * @param fd The filedescriptor that should be checked * @param flags Bitwise or of AMQP::readable and/or AMQP::writable */ virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override { // @todo // add your own implementation, for example by adding the file // descriptor to the main application event loop (like the select() or // poll() loop). When the event loop reports that the descriptor becomes // readable and/or writable, it is up to you to inform the AMQP-CPP // library that the filedescriptor is active by calling the // connection->process(fd, flags) method. } };
monitor()
函数是AMQP描述符和应用程序进程交互的桥梁。对于libev, libevent, libuv等常用的事件循环库,都有例程可参考。
其它函数都有默认实现,但仍建议自行实现onError(), onDetached()
等记录日志和清理资源。
// create an instance of your own tcp handler MyTcpHandler myHandler; // address of the server AMQP::Address address("amqp://guest:guest@localhost/vhost"); // create a AMQP connection object AMQP::TcpConnection connection(&myHandler, address); // and create a channel AMQP::TcpChannel channel(&connection); // use the channel object to call the AMQP method you like channel.declareExchange("my-exchange", AMQP::fanout); channel.declareQueue("my-queue"); channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
以libev为例,不必自行实现monitor()函数,可以直接使用AMQP::LibEvHandler,示例代码如下:
#include <ev.h> #include <amqpcpp.h> #include <amqpcpp/libev.h> int main() { // access to the event loop auto *loop = EV_DEFAULT; // handler for libev (so we don't have to implement AMQP::TcpHandler!) AMQP::LibEvHandler handler(loop); // make a connection AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/")); // we need a channel too AMQP::TcpChannel channel(&connection); // create a temporary queue channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { // report the name of the temporary queue std::cout << "declared queue " << name << std::endl; // now we can close the connection connection.close(); }); // run the loop ev_run(loop, 0); // done return 0; }
AMQP::LibEvHandler派生自AMQP::TcpHandler,已经实现了monitor()方法,该方法把fd添加到事件循环。建议不要直接使用它,而是自行创建一个继承自它的类,并实现那些虚方法。
AMQP 支持心跳,如果使能了心跳,客户端和服务端会协商心跳间隔。正常的通信数据通常满足保持连接存活的需求,但协调时间到后,若仍然没有数据,就会发送心跳数据包。
AMQP-CPP 库默认关闭了心跳(服务器通常建议间隔为60s发送心跳),所以在使用中,可以安全地将连接保持在长空闲状态,不必担心连接会因为空闲时间太长而中断。
也可以通过在继承自TcpHandler 的类中实现 onNegotiate() 方法开启心跳,在该函数中返回心跳的间隔。
一旦开启,就得自行调用 connection->heartbeat() 向RabbitMq发送心跳数据,并且在持续无心跳到来时关闭连接。
如果使用AMQP::LibEvHandler的默认实现,心跳已经开启,所有的检查会自动进行。
通道是虚拟连接,一个连接上可以建立多个通道,这个前面的文章已经说过,不再重复。
所有的RabbitMq指令都是通过通道传输,所以连接建立后的第一步,就是建立通道。
因为所有操作是异步的,所以在通道上执行指令的返回值并不能作为操作执行结果的依据,实际上它返回的是 Deferred 类,可以使用它“安装”处理函数。如:
// create a channel (or use TcpChannel if you're using the Tcp module)
Channel myChannel(&connection);
// declare an exchange, and install callbacks for success and failure
myChannel.declareExchange("my-exchange")
.onSuccess([]() {
// by now the exchange is created
})
.onError([](const char *message) {
// something went wrong creating the exchange
});
如果对Lambda还不熟悉,建议阅读 c++中的Lambda表达式
也可以为channel安装处理函数,如:
// create a channel (or use TcpChannel if you use the Tcp module) Channel myChannel(&connection); // install a generic channel-error handler that will be called for every // error that occurs on the channel myChannel.onError([](const char *message) { // report error std::cout << "channel error: " << message << std::endl; }); // install a generic callback that will be called when the channel is ready // for sending the first instruction myChannel.onReady([]() { // send the first instructions (like publishing messages) });
注意,通道上发生的任何错误会使整个通道失效,包括已经发送但尚未执行的指令。这意味着,如果连续发送了多个指令,而第一个指令失败的话,后续指令也不会执行。
通常建议对不同的消息类型,分别建立通道收发消息,这个一个消息的失败不会影响到其他通道的消息。
在声明queue时,常常有特性需要添加,以 Channel::declareQueue() 方法为例:
/** * Declare a queue * * If you do not supply a name, a name will be assigned by the server. * * The flags can be a combination of the following values: * * - durable queue survives a broker restart * - autodelete queue is automatically removed when all connected consumers are gone * - passive only check if the queue exist * - exclusive the queue only exists for this connection, and is automatically removed when connection is gone * * @param name name of the queue * @param flags combination of flags * @param arguments optional arguments */ DeferredQueue &declareQueue(const std::string &name, int flags, const Table &arguments); DeferredQueue &declareQueue(const std::string &name, const Table &arguments); DeferredQueue &declareQueue(const std::string &name, int flags = 0); DeferredQueue &declareQueue(int flags, const Table &arguments); DeferredQueue &declareQueue(const Table &arguments); DeferredQueue &declareQueue(int flags = 0);
同样,可以为它的返回值安装回调函数,如:
// create a custom callback
auto callback = [](const std::string &name, int msgcount, int consumercount) {
// @todo add your own implementation
};
// declare the queue, and install the callback that is called on success
channel.declareQueue("myQueue").onSuccess(callback);
可以在声明queue时指定它的属性,如持久性、自动删除等,只需要把flags位按文档改写即可:AMQP::durable + AMQP::autodelete
。
最简单的发送消息只需要提供三个参数:exchange, routing-key, msg。
还有多种形式可选:
/** * Publish a message to an exchange * * You have to supply the name of an exchange and a routing key. RabbitMQ will * then try to send the message to one or more queues. With the optional flags * parameter you can specify what should happen if the message could not be routed * to a queue. By default, unroutable message are silently discarded. * * This method returns a reference to a DeferredPublisher object. You can use * this returned object to install callbacks that are called when an undeliverable * message is returned, or to set the callback that is called when the server * confirms that the message was received. * * To enable handling returned messages, or to enable publisher-confirms, you must * not only set the callback, but also pass in appropriate flags to enable this * feature. If you do not pass in these flags, your callbacks will not be called. * If you are not at all interested in returned messages or publish-confirms, you * can ignore the flag and the returned object. * * Watch out: the channel returns _the same_ DeferredPublisher object for all * calls to the publish() method. This means that the callbacks that you install * for the first published message are also used for subsequent messages _and_ * it means that if you install a different callback for a later publish * operation, it overwrites your earlier callbacks * * The following flags can be supplied: * * - mandatory If set, server returns messages that are not sent to a queue * - immediate If set, server returns messages that can not immediately be forwarded to a consumer. * * @param exchange the exchange to publish to * @param routingkey the routing key * @param envelope the full envelope to send * @param message the message to send * @param size size of the message * @param flags optional flags */ DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const Envelope &envelope, int flags = 0) { return _implementation->publish(exchange, routingKey, envelope, flags); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const std::string &message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message.data(), message.size()), flags); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, size_t size, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, size), flags); } DeferredPublisher &publish(const std::string &exchange, const std::string &routingKey, const char *message, int flags = 0) { return _implementation->publish(exchange, routingKey, Envelope(message, strlen(message)), flags); }
也可以使用事务特性,保证所有的指令要么全部执行成功,要么都不执行;
// start a transaction channel.startTransaction(); // publish a number of messages channel.publish("my-exchange", "my-key", "my first message"); channel.publish("my-exchange", "my-key", "another message"); // commit the transactions, and set up callbacks that are called when // the transaction was successful or not channel.commitTransaction() .onSuccess([]() { // all messages were successfully published }) .onError([](const char *message) { // none of the messages were published // now we have to do it all over again });
注意,这里的事务没有数据库中的事务功能那么强大,它不能包含任意的指令,而只在发送和接收消息时才有意义。
从RabbitMq接收消息称为消费。调用 Channel::consume()方法后,RabbitMq就会把消息推送过来。
详细声明如下:
/** * Tell the RabbitMQ server that we're ready to consume messages * * After this method is called, RabbitMQ starts delivering messages to the client * application. The consume tag is a string identifier that will be passed to * each received message, so that you can associate incoming messages with a * consumer. If you do not specify a consumer tag, the server will assign one * for you. * * The following flags are supported: * * - nolocal if set, messages published on this channel are * not also consumed * * - noack if set, consumed messages do not have to be acked, * this happens automatically * * - exclusive request exclusive access, only this consumer can * access the queue * * The callback registered with DeferredConsumer::onSuccess() will be called when the * consumer has started. * * @param queue the queue from which you want to consume * @param tag a consumer tag that will be associated with this consume operation * @param flags additional flags * @param arguments additional arguments * @return bool */ DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags, const AMQP::Table &arguments); DeferredConsumer &consume(const std::string &queue, const std::string &tag, int flags = 0); DeferredConsumer &consume(const std::string &queue, const std::string &tag, const AMQP::Table &arguments); DeferredConsumer &consume(const std::string &queue, int flags, const AMQP::Table &arguments); DeferredConsumer &consume(const std::string &queue, int flags = 0); DeferredConsumer &consume(const std::string &queue, const AMQP::Table &arguments);
它的回调函数示例如下:
// callback function that is called when the consume operation starts auto startCb = [](const std::string &consumertag) { std::cout << "consume operation started" << std::endl; }; // callback function that is called when the consume operation failed auto errorCb = [](const char *message) { std::cout << "consume operation failed" << std::endl; }; // callback operation when a message was received auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout << "message received" << std::endl; // acknowledge the message channel.ack(deliveryTag); }; // start consuming from the queue, and install the callbacks channel.consume("my-queue") .onReceived(messageCb) .onSuccess(startCb) .onError(errorCb);
注意,onSuccess()仅表明消费消息的操作开始,而onReceived才代表收到了数据。
AMQP::Message类包含了消息的全部内容,包括真实数据,消息头,甚至exchange和routingkey。
deliveryTag是收到消息后的特定标识符,使用它返回响应。RabbitMq会在接收到响应后删除消息,如果忘记执行 Channel:ack() 方法,RabbitMq会因为内存用尽而崩溃。
消息的消费是持续进行的,直到使用 Channel::cancel() 停止消费。如果关闭通道或者TCP连接,消费也会停止。
可以通过QOS防止消费者被大量消息搞崩溃。QOS决定了消费者可以持有的未发送ack的数据量,RabbitMq会暂存后续的消息。该属性通过 Channel::setQos() 设置。
AMQP-CPP 异步、灵活、高效的特性使得应用者可以按需使用。
它的使用流程也清晰明了,容易上手。
缺点是需要手工做一些事情,如派生类并重写一些函数,如果使用windows则需要自行实现网络层IO操作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。