当前位置:   article > 正文

仿RabbitMq实现简易消息队列基础篇(Muduo库的使用)

仿RabbitMq实现简易消息队列基础篇(Muduo库的使用)
@TOC

Muduo库简介

Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO和事件驱动的C++⾼并发TCP⽹络编程库。他是一款基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread, 所谓 one loop per thread 指的是:

  • 一个线程只能有一个事件循环(EventLoop),用于相应计时器和IO时间
  • 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须归属某个EventLoop管理

Muduo库常见接口介绍

muduo::net::TcpServer类基础介绍

  1. typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
  2. typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
  3. typedef std::function<void(const TcpConnectionPtr &,
  4. Buffer *,
  5. Timestamp)>
  6. MessageCallback;
  7. class InetAddress : public muduo::copyable
  8. {
  9. public:
  10. InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
  11. };
  12. class TcpServer : noncopyable
  13. {
  14. public:
  15. enum Option
  16. {
  17. kNoReusePort,
  18. kReusePort,
  19. };
  20. TcpServer(EventLoop *loop,
  21. const InetAddress &listenAddr,
  22. const string &nameArg,
  23. Option option = kNoReusePort);
  24. void setThreadNum(int numThreads);
  25. void start();
  26. /// 当⼀个新连接建⽴成功的时候被调⽤
  27. void setConnectionCallback(const ConnectionCallback &cb)
  28. {
  29. connectionCallback_ = cb;
  30. }
  31. /// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
  32. void setMessageCallback(const MessageCallback &cb)
  33. {
  34. messageCallback_ = cb;
  35. }
  36. };

muduo::net::EventLoop 类基础介绍

  1. class EventLoop : noncopyable
  2. {
  3. public:
  4. /// 无限循环。
  5. /// 必须在与对象创建相同的线程中调用。
  6. void loop();
  7. /// 退出循环。
  8. /// 如果通过原始指针调用,这不是 100% 线程安全,
  9. /// 最好通过 shared_ptr<EventLoop> 调用以保证 100% 安全。
  10. void quit();
  11. TimerId runAt(Timestamp time, TimerCallback cb);
  12. /// 在 @c delay 秒后运行回调。
  13. /// 线程安全,可以从其他线程调用。
  14. TimerId runAfter(double delay, TimerCallback cb);
  15. /// 每隔 @c interval 秒运行一次回调。
  16. /// 线程安全,可以从其他线程调用。
  17. TimerId runEvery(double interval, TimerCallback cb);
  18. /// 取消定时器。
  19. /// 线程安全,可以从其他线程调用。
  20. void cancel(TimerId timerId);
  21. private:
  22. std::atomic<bool> quit_;
  23. std::unique_ptr<Poller> poller_;
  24. mutable MutexLock mutex_;
  25. std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
  26. };

muduo::net::TcpConnection 类基础介绍

  1. class TcpConnection : noncopyable,
  2. public std::enable_shared_from_this<TcpConnection>
  3. {
  4. public:
  5. /// 使用已连接的 sockfd 构造 TcpConnection
  6. ///
  7. /// 用户不应创建此对象。
  8. TcpConnection(EventLoop *loop,
  9. const string &name,
  10. int sockfd,
  11. const InetAddress &localAddr,
  12. const InetAddress &peerAddr);
  13. /// 检查连接是否已建立
  14. bool connected() const { return state_ == kConnected; }
  15. /// 检查连接是否已断开
  16. bool disconnected() const { return state_ == kDisconnected; }
  17. /// 发送消息(右值引用,C++11
  18. void send(string &&message); // C++11
  19. /// 发送消息(指针和长度)
  20. void send(const void *message, int len);
  21. /// 发送消息(StringPiece)
  22. void send(const StringPiece &message);
  23. /// 发送消息(Buffer 指针,交换数据)
  24. void send(Buffer *message); // this one will swap data
  25. /// 关闭连接(非线程安全,不能同时调用)
  26. void shutdown(); // NOT thread safe, no simultaneous calling
  27. /// 设置上下文
  28. void setContext(const boost::any &context)
  29. {
  30. context_ = context;
  31. }
  32. /// 获取上下文(常量引用)
  33. const boost::any &getContext() const
  34. {
  35. return context_;
  36. }
  37. /// 获取可变的上下文指针
  38. boost::any *getMutableContext()
  39. {
  40. return &context_;
  41. }
  42. /// 设置连接回调函数
  43. void setConnectionCallback(const ConnectionCallback &cb)
  44. {
  45. connectionCallback_ = cb;
  46. }
  47. /// 设置消息回调函数
  48. void setMessageCallback(const MessageCallback &cb)
  49. {
  50. messageCallback_ = cb;
  51. }
  52. private:
  53. /// 连接状态枚举
  54. enum StateE
  55. {
  56. kDisconnected,
  57. kConnecting,
  58. kConnected,
  59. kDisconnecting
  60. };
  61. EventLoop *loop_; // 事件循环指针
  62. ConnectionCallback connectionCallback_; // 连接回调函数
  63. MessageCallback messageCallback_; // 消息回调函数
  64. WriteCompleteCallback writeCompleteCallback_; // 写完成回调函数
  65. boost::any context_; // 上下文数据
  66. };

muduo::net::TcpClient类基础介绍

  1. class TcpClient : noncopyable
  2. {
  3. public:
  4. /// 构造函数,通过服务器地址和客户端名称构造TcpClient
  5. TcpClient(EventLoop *loop,
  6. const InetAddress &serverAddr,
  7. const string &nameArg);
  8. /// 析构函数,强制外部析构,以便管理std::unique_ptr成员
  9. ~TcpClient(); // force out-line dtor, for std::unique_ptr members.
  10. /// 连接服务器
  11. void connect();
  12. /// 断开连接
  13. void disconnect();
  14. /// 停止客户端
  15. void stop();
  16. /// 获取客户端对应的通信连接Connection对象的接口
  17. /// 在调用connect后,有可能连接还没有建立成功
  18. TcpConnectionPtr connection() const
  19. {
  20. MutexLockGuard lock(mutex_);
  21. return connection_;
  22. }
  23. /// 设置连接服务器成功时的回调函数
  24. void setConnectionCallback(ConnectionCallback cb)
  25. {
  26. connectionCallback_ = std::move(cb);
  27. }
  28. /// 设置收到服务器发送的消息时的回调函数
  29. void setMessageCallback(MessageCallback cb)
  30. {
  31. messageCallback_ = std::move(cb);
  32. }
  33. private:
  34. EventLoop *loop_; // 事件循环指针
  35. ConnectionCallback connectionCallback_; // 连接回调函数
  36. MessageCallback messageCallback_; // 消息回调函数
  37. WriteCompleteCallback writeCompleteCallback_; // 写完成回调函数
  38. TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 连接对象,受mutex_保护
  39. };
  40. /*
  41. 需要注意的是,因为muduo库不管是服务器端还是客户端都是异步操作,
  42. 对于客户端来说,如果我们在连接还没有完全建立成功的时候发送数据,这是不被允许的。
  43. 因此我们可以使用内置的CountDownLatch类进行同步控制。
  44. */
  45. class CountDownLatch : noncopyable
  46. {
  47. public:
  48. /// 显式构造函数,初始化倒计时计数
  49. explicit CountDownLatch(int count);
  50. /// 等待倒计时完成
  51. void wait()
  52. {
  53. MutexLockGuard lock(mutex_);
  54. while (count_ > 0)
  55. {
  56. condition_.wait();
  57. }
  58. }
  59. /// 倒计时减一
  60. void countDown()
  61. {
  62. MutexLockGuard lock(mutex_);
  63. --count_;
  64. if (count_ == 0)
  65. {
  66. condition_.notifyAll();
  67. }
  68. }
  69. /// 获取当前倒计时计数
  70. int getCount() const;
  71. private:
  72. mutable MutexLock mutex_; // 互斥锁,受保护的成员变量
  73. Condition condition_ GUARDED_BY(mutex_); // 条件变量,受mutex_保护
  74. int count_ GUARDED_BY(mutex_); // 倒计时计数,受mutex_保护
  75. };

muduo::net::Buffer 类基础介绍

  1. class Buffer : public muduo::copyable
  2. {
  3. public:
  4. /// 常量:预留的前置空间大小
  5. static const size_t kCheapPrepend = 8;
  6. /// 常量:初始缓冲区大小
  7. static const size_t kInitialSize = 1024;
  8. /// 显式构造函数,初始化缓冲区大小
  9. explicit Buffer(size_t initialSize = kInitialSize)
  10. : buffer_(kCheapPrepend + initialSize),
  11. readerIndex_(kCheapPrepend),
  12. writerIndex_(kCheapPrepend) {}
  13. /// 交换缓冲区内容
  14. void swap(Buffer &rhs);
  15. /// 可读字节数
  16. size_t readableBytes() const;
  17. /// 可写字节数
  18. size_t writableBytes() const;
  19. /// 返回指向可读数据的指针
  20. const char *peek() const;
  21. /// 查找换行符(CRLF)的指针
  22. const char *findEOL() const;
  23. /// 从指定位置开始查找换行符(CRLF)的指针
  24. const char *findEOL(const char *start) const;
  25. /// 从缓冲区中取出指定长度的数据
  26. void retrieve(size_t len);
  27. /// 从缓冲区中取出一个64位整数
  28. void retrieveInt64();
  29. /// 从缓冲区中取出一个32位整数
  30. void retrieveInt32();
  31. /// 从缓冲区中取出一个16位整数
  32. void retrieveInt16();
  33. /// 从缓冲区中取出一个8位整数
  34. void retrieveInt8();
  35. /// 将缓冲区中的所有数据取出并转换为字符串
  36. string retrieveAllAsString();
  37. /// 将缓冲区中指定长度的数据取出并转换为字符串
  38. string retrieveAsString(size_t len);
  39. /// 向缓冲区追加字符串
  40. void append(const StringPiece &str);
  41. /// 向缓冲区追加指定数据和长度
  42. void append(const char * /*restrict*/ data, size_t len);
  43. /// 向缓冲区追加指定数据和长度
  44. void append(const void * /*restrict*/ data, size_t len);
  45. /// 返回指向可写数据的指针
  46. char *beginWrite();
  47. /// 返回指向可写数据的常量指针
  48. const char *beginWrite() const;
  49. /// 更新已写入的长度
  50. void hasWritten(size_t len);
  51. /// 向缓冲区追加一个64位整数
  52. void appendInt64(int64_t x);
  53. /// 向缓冲区追加一个32位整数
  54. void appendInt32(int32_t x);
  55. /// 向缓冲区追加一个16位整数
  56. void appendInt16(int16_t x);
  57. /// 向缓冲区追加一个8位整数
  58. void appendInt8(int8_t x);
  59. /// 从缓冲区中读取一个64位整数
  60. int64_t readInt64();
  61. /// 从缓冲区中读取一个32位整数
  62. int32_t readInt32();
  63. /// 从缓冲区中读取一个16位整数
  64. int16_t readInt16();
  65. /// 从缓冲区中读取一个8位整数
  66. int8_t readInt8();
  67. /// 查看缓冲区中的一个64位整数
  68. int64_t peekInt64() const;
  69. /// 查看缓冲区中的一个32位整数
  70. int32_t peekInt32() const;
  71. /// 查看缓冲区中的一个16位整数
  72. int16_t peekInt16() const;
  73. /// 查看缓冲区中的一个8位整数
  74. int8_t peekInt8() const;
  75. /// 在缓冲区前面添加一个64位整数
  76. void prependInt64(int64_t x);
  77. /// 在缓冲区前面添加一个32位整数
  78. void prependInt32(int32_t x);
  79. /// 在缓冲区前面添加一个16位整数
  80. void prependInt16(int16_t x);
  81. /// 在缓冲区前面添加一个8位整数
  82. void prependInt8(int8_t x);
  83. /// 在缓冲区前面添加指定数据和长度
  84. void prepend(const void * /*restrict*/ data, size_t len);
  85. private:
  86. std::vector<char> buffer_; // 缓冲区
  87. size_t readerIndex_; // 读索引
  88. size_t writerIndex_; // 写索引
  89. static const char kCRLF[]; // 换行符常量
  90. };

接下来就是通过上面的接口来使用muduo库通过protobuf实现网络通信,这里我们简单写一个计算器

客户端

  1. #include "muduo/proto/dispatcher.h"
  2. #include "muduo/proto/codec.h"
  3. #include "muduo/base/Logging.h"
  4. #include "muduo/base/Mutex.h"
  5. #include "muduo/net/EventLoop.h"
  6. #include "muduo/net/TcpClient.h"
  7. #include "muduo/net/EventLoopThread.h"
  8. #include "muduo/base/CountDownLatch.h"
  9. #include "request.pb.h"
  10. #include <memory>
  11. #include <iostream>
  12. class Client
  13. {
  14. public:
  15. typedef std::shared_ptr<sslx::AddResponse> AddResponsePtr;
  16. typedef std::shared_ptr<sslx::SubResponse> SubResponsePtr;
  17. typedef std::shared_ptr<sslx::MulResponse> MulResponsePtr;
  18. typedef std::shared_ptr<sslx::DivResponse> DivResponsePtr;
  19. typedef std::shared_ptr<sslx::ErrorResponse> ErrorResponsePtr;
  20. typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  21. Client(const std::string& sip, int port)
  22. :_latch(1),
  23. _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, port), "Client"),
  24. _dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
  25. _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  26. {
  27. // 注册业务回复处理函数
  28. _dispatcher.registerMessageCallback<sslx::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  29. _dispatcher.registerMessageCallback<sslx::SubResponse>(std::bind(&Client::onSub, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  30. _dispatcher.registerMessageCallback<sslx::MulResponse>(std::bind(&Client::onMul, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  31. _dispatcher.registerMessageCallback<sslx::DivResponse>(std::bind(&Client::onDiv, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  32. _dispatcher.registerMessageCallback<sslx::ErrorResponse>(std::bind(&Client::onError, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  33. _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  34. _client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));
  35. Connect();
  36. }
  37. void Connect()
  38. {
  39. _client.connect();
  40. _latch.wait();
  41. }
  42. void Add(int num1 , int num2)
  43. {
  44. sslx::AddRequest req;
  45. req.set_num1(num1);
  46. req.set_num2(num2);
  47. send(req);
  48. }
  49. void Sub(int num1 , int num2)
  50. {
  51. sslx::SubRequest req;
  52. req.set_num1(num1);
  53. req.set_num2(num2);
  54. send(req);
  55. }
  56. void Mul(int num1 , int num2)
  57. {
  58. sslx::MulRequest req;
  59. req.set_num1(num1);
  60. req.set_num2(num2);
  61. send(req);
  62. }
  63. void Div(int num1 , int num2)
  64. {
  65. sslx::DivRequest req;
  66. req.set_num1(num1);
  67. req.set_num2(num2);
  68. send(req);
  69. }
  70. private:
  71. bool send(const google::protobuf::Message& msg)
  72. {
  73. //连接状态正常,再发送,否则就返回false
  74. if(_conn->connected())
  75. {
  76. _codec.send(_conn, msg);
  77. return true;
  78. }
  79. return false;
  80. }
  81. void onConnection(const muduo::net::TcpConnectionPtr& conn)
  82. {
  83. if(conn->connected())
  84. {
  85. _conn = conn;
  86. _latch.countDown();
  87. }
  88. else
  89. {
  90. _conn.reset(); // 连接关闭时的操作
  91. }
  92. }
  93. void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& msg, muduo::Timestamp)
  94. {
  95. LOG_INFO << "onUnknowMessage" << msg->GetTypeName();
  96. conn->shutdown();
  97. }
  98. void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& msg, muduo::Timestamp)
  99. {
  100. std::cout << "加法结果 : " << msg->result() << std::endl;
  101. }
  102. void onSub(const muduo::net::TcpConnectionPtr& conn, const SubResponsePtr& msg, muduo::Timestamp)
  103. {
  104. std::cout << "减法结果 : " << msg->result() << std::endl;
  105. }
  106. void onMul(const muduo::net::TcpConnectionPtr& conn, const MulResponsePtr& msg, muduo::Timestamp)
  107. {
  108. std::cout << "乘法结果 : " << msg->result() << std::endl;
  109. }
  110. void onDiv(const muduo::net::TcpConnectionPtr& conn, const DivResponsePtr& msg, muduo::Timestamp)
  111. {
  112. std::cout << "除法结果 : " << msg->result() << std::endl;
  113. }
  114. void onError(const muduo::net::TcpConnectionPtr& conn, const ErrorResponsePtr& msg, muduo::Timestamp)
  115. {
  116. std::cout << "出现除零错误" << std::endl;
  117. }
  118. private:
  119. muduo::CountDownLatch _latch; //主要同步主线程和网络线程,确保在连接建立之前不会发送请求
  120. muduo::net::EventLoopThread _loopthread; // 提供一个独立的事件循环,用于处理网络事件
  121. muduo::net::TcpConnectionPtr _conn; // 保存当前的TcpConnectionPtr, 用于在连接建立之后发送消息
  122. muduo::net::TcpClient _client; // 管理客户端的连接和重连
  123. ProtobufDispatcher _dispatcher; // 用于根据消息的类型调用相应的处理函数
  124. ProtobufCodec _codec; // 负责Protobuf消息的编解码
  125. };
  126. int main()
  127. {
  128. Client client("127.0.0.1", 8085);
  129. client.Add(11, 11);
  130. client.Sub(11, 11);
  131. sleep(1);
  132. return 0;
  133. }

服务端

  1. #include <muduo/proto/codec.h>
  2. #include "muduo/proto/dispatcher.h"
  3. #include "muduo/base/Logging.h"
  4. #include "muduo/base/Mutex.h"
  5. #include "muduo/net/EventLoop.h"
  6. #include "muduo/net/TcpServer.h"
  7. #include "request.pb.h"
  8. #include <memory>
  9. #include <string>
  10. #include <iostream>
  11. #include <unordered_map>
  12. class Server
  13. {
  14. public:
  15. typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  16. typedef std::shared_ptr<sslx::AddRequest> AddRequestPtr;
  17. typedef std::shared_ptr<sslx::SubRequest> SubRequestPtr;
  18. typedef std::shared_ptr<sslx::MulRequest> MulRequestPtr;
  19. typedef std::shared_ptr<sslx::DivRequest> DivRequestPtr;
  20. Server(int port)
  21. :_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),
  22. _dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
  23. _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  24. {
  25. _dispatcher.registerMessageCallback<sslx::AddRequest>(std::bind(&Server::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  26. _dispatcher.registerMessageCallback<sslx::SubRequest>(std::bind(&Server::onSub, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  27. _dispatcher.registerMessageCallback<sslx::MulRequest>(std::bind(&Server::onMul, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  28. _dispatcher.registerMessageCallback<sslx::DivRequest>(std::bind(&Server::onDiv, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  29. _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  30. _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
  31. }
  32. void start()
  33. {
  34. _server.start();
  35. _baseLoop.loop();
  36. }
  37. private:
  38. int Add(int num1, int num2)
  39. {
  40. return num1 + num2;
  41. }
  42. int Sub(int num1, int num2)
  43. {
  44. return num1 - num2;
  45. }
  46. int Mul(int num1, int num2)
  47. {
  48. return num1 * num2;
  49. }
  50. int Div(int num1, int num2)
  51. {
  52. if(num2 == 0)
  53. {
  54. return INT_MIN;
  55. }
  56. return num1 / num2;
  57. }
  58. void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr& msg, muduo::Timestamp)
  59. {
  60. sslx::AddResponse resp;
  61. int ret = Add(msg->num1(), msg->num2());
  62. resp.set_result(ret);
  63. _codec.send(conn, resp);
  64. }
  65. void onSub(const muduo::net::TcpConnectionPtr& conn, const SubRequestPtr& msg, muduo::Timestamp)
  66. {
  67. sslx::SubResponse resp;
  68. int ret = Sub(msg->num1(), msg->num2());
  69. resp.set_result(ret);
  70. _codec.send(conn, resp);
  71. }
  72. void onMul(const muduo::net::TcpConnectionPtr& conn, const MulRequestPtr& msg, muduo::Timestamp)
  73. {
  74. sslx::MulResponse resp;
  75. int ret = Mul(msg->num1(), msg->num2());
  76. resp.set_result(ret);
  77. _codec.send(conn, resp);
  78. }
  79. void onDiv(const muduo::net::TcpConnectionPtr& conn, const DivRequestPtr& msg, muduo::Timestamp)
  80. {
  81. sslx::DivResponse resp1;
  82. sslx::ErrorResponse resp2;
  83. int ret = Div(msg->num1(), msg->num2());
  84. if(ret ==INT_MIN)
  85. {
  86. resp2.set_msg(std::to_string(ret));
  87. _codec.send(conn, resp2);
  88. }
  89. else
  90. {
  91. resp1.set_result(ret);
  92. _codec.send(conn, resp1);
  93. }
  94. }
  95. void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp)
  96. {
  97. LOG_INFO << "onUNknownMessage : " << message->GetTypeName();
  98. conn->shutdown();
  99. }
  100. void onConnection(const muduo::net::TcpConnectionPtr& conn)
  101. {
  102. if(conn->connected())
  103. {
  104. LOG_INFO << "新连接建立成功";
  105. }
  106. else
  107. {
  108. LOG_INFO << "连接已关闭";
  109. }
  110. }
  111. private:
  112. muduo::net::EventLoop _baseLoop;
  113. muduo::net::TcpServer _server; //服务器对象
  114. ProtobufDispatcher _dispatcher; // 请求分发对象 --要向其中注册请求处理函数
  115. ProtobufCodec _codec; // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
  116. };
  117. int main()
  118. {
  119. Server server(8085);
  120. server.start();
  121. return 0;
  122. }

        这里简单梳理一下网路的流程:在进行初始化的时候,客户端和服务端之间会对_codec和_dispatcher还有事件监控循环线程,在建立连接的时候,异步线程会通过setConnectionCallback的回调函数,onConnection函数中的countDown() 函数唤醒主线程开始进行业务处理,在进行业务处理的时候,会将请求通过send发到缓冲区中,响应报文回来之后会给setMessageCallback函数传递信号,然后由loop线程来接收到响应,调用相关的函数来对结果进行处理。

protobuf

  1. syntax = "proto3";
  2. package sslx;
  3. message AddRequest
  4. {
  5. int32 num1 = 1;
  6. int32 num2 = 2;
  7. }
  8. message AddResponse
  9. {
  10. int32 result = 1;
  11. }
  12. message SubRequest
  13. {
  14. int32 num1 = 1;
  15. int32 num2 = 2;
  16. }
  17. message SubResponse
  18. {
  19. int32 result = 1;
  20. }
  21. message MulRequest
  22. {
  23. int32 num1 = 1;
  24. int32 num2 = 2;
  25. }
  26. message MulResponse
  27. {
  28. int32 result = 1;
  29. }
  30. message DivRequest
  31. {
  32. int32 num1 = 1;
  33. int32 num2 = 2;
  34. }
  35. message DivResponse
  36. {
  37. int32 result = 1;
  38. }
  39. message ErrorResponse
  40. {
  41. string msg = 1;
  42. }

makefile

  1. all: client server
  2. client: client.cc request.pb.cc mqthird/include/muduo/proto/codec.cc
  3. g++ -g -o $@ $^ -std=c++11 -I./mqthird/include -L./mqthird/lib -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz
  4. server: server.cc request.pb.cc mqthird/include/muduo/proto/codec.cc
  5. g++ -g -o $@ $^ -std=c++11 -I./mqthird/include -L./mqthird/lib -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz
  6. .PHONY:clean
  7. clean:
  8. rm -rf server client

gitee

https://gitee.com/pu-mingbo/master.git

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/998676
推荐阅读
相关标签
  

闽ICP备14008679号