赞
踩
@TOC
Muduo由陈硕⼤佬开发,是⼀个基于⾮阻塞IO和事件驱动的C++⾼并发TCP⽹络编程库。他是一款基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread, 所谓 one loop per thread 指的是:
- typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
- typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
- typedef std::function<void(const TcpConnectionPtr &,
- Buffer *,
- Timestamp)>
- MessageCallback;
- class InetAddress : public muduo::copyable
- {
- public:
- InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
- };
- class TcpServer : noncopyable
- {
- public:
- enum Option
- {
- kNoReusePort,
- kReusePort,
- };
- TcpServer(EventLoop *loop,
- const InetAddress &listenAddr,
- const string &nameArg,
- Option option = kNoReusePort);
- void setThreadNum(int numThreads);
- void start();
- /// 当⼀个新连接建⽴成功的时候被调⽤
- void setConnectionCallback(const ConnectionCallback &cb)
- {
- connectionCallback_ = cb;
- }
- /// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
- void setMessageCallback(const MessageCallback &cb)
- {
- messageCallback_ = cb;
- }
- };

- class EventLoop : noncopyable
- {
- public:
- /// 无限循环。
- /// 必须在与对象创建相同的线程中调用。
- void loop();
- /// 退出循环。
- /// 如果通过原始指针调用,这不是 100% 线程安全,
- /// 最好通过 shared_ptr<EventLoop> 调用以保证 100% 安全。
- void quit();
- TimerId runAt(Timestamp time, TimerCallback cb);
- /// 在 @c delay 秒后运行回调。
- /// 线程安全,可以从其他线程调用。
- TimerId runAfter(double delay, TimerCallback cb);
- /// 每隔 @c interval 秒运行一次回调。
- /// 线程安全,可以从其他线程调用。
- TimerId runEvery(double interval, TimerCallback cb);
- /// 取消定时器。
- /// 线程安全,可以从其他线程调用。
- void cancel(TimerId timerId);
-
- private:
- std::atomic<bool> quit_;
- std::unique_ptr<Poller> poller_;
- mutable MutexLock mutex_;
- std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
- };

- class TcpConnection : noncopyable,
- public std::enable_shared_from_this<TcpConnection>
- {
- public:
- /// 使用已连接的 sockfd 构造 TcpConnection
- ///
- /// 用户不应创建此对象。
- TcpConnection(EventLoop *loop,
- const string &name,
- int sockfd,
- const InetAddress &localAddr,
- const InetAddress &peerAddr);
-
- /// 检查连接是否已建立
- bool connected() const { return state_ == kConnected; }
-
- /// 检查连接是否已断开
- bool disconnected() const { return state_ == kDisconnected; }
-
- /// 发送消息(右值引用,C++11)
- void send(string &&message); // C++11
-
- /// 发送消息(指针和长度)
- void send(const void *message, int len);
-
- /// 发送消息(StringPiece)
- void send(const StringPiece &message);
-
- /// 发送消息(Buffer 指针,交换数据)
- void send(Buffer *message); // this one will swap data
-
- /// 关闭连接(非线程安全,不能同时调用)
- void shutdown(); // NOT thread safe, no simultaneous calling
-
- /// 设置上下文
- void setContext(const boost::any &context)
- {
- context_ = context;
- }
-
- /// 获取上下文(常量引用)
- const boost::any &getContext() const
- {
- return context_;
- }
-
- /// 获取可变的上下文指针
- boost::any *getMutableContext()
- {
- return &context_;
- }
-
- /// 设置连接回调函数
- void setConnectionCallback(const ConnectionCallback &cb)
- {
- connectionCallback_ = cb;
- }
-
- /// 设置消息回调函数
- void setMessageCallback(const MessageCallback &cb)
- {
- messageCallback_ = cb;
- }
-
- private:
- /// 连接状态枚举
- enum StateE
- {
- kDisconnected,
- kConnecting,
- kConnected,
- kDisconnecting
- };
-
- EventLoop *loop_; // 事件循环指针
- ConnectionCallback connectionCallback_; // 连接回调函数
- MessageCallback messageCallback_; // 消息回调函数
- WriteCompleteCallback writeCompleteCallback_; // 写完成回调函数
- boost::any context_; // 上下文数据
- };

- class TcpClient : noncopyable
- {
- public:
- /// 构造函数,通过服务器地址和客户端名称构造TcpClient
- TcpClient(EventLoop *loop,
- const InetAddress &serverAddr,
- const string &nameArg);
-
- /// 析构函数,强制外部析构,以便管理std::unique_ptr成员
- ~TcpClient(); // force out-line dtor, for std::unique_ptr members.
-
- /// 连接服务器
- void connect();
-
- /// 断开连接
- void disconnect();
-
- /// 停止客户端
- void stop();
-
- /// 获取客户端对应的通信连接Connection对象的接口
- /// 在调用connect后,有可能连接还没有建立成功
- TcpConnectionPtr connection() const
- {
- MutexLockGuard lock(mutex_);
- return connection_;
- }
-
- /// 设置连接服务器成功时的回调函数
- void setConnectionCallback(ConnectionCallback cb)
- {
- connectionCallback_ = std::move(cb);
- }
-
- /// 设置收到服务器发送的消息时的回调函数
- void setMessageCallback(MessageCallback cb)
- {
- messageCallback_ = std::move(cb);
- }
-
- private:
- EventLoop *loop_; // 事件循环指针
- ConnectionCallback connectionCallback_; // 连接回调函数
- MessageCallback messageCallback_; // 消息回调函数
- WriteCompleteCallback writeCompleteCallback_; // 写完成回调函数
- TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 连接对象,受mutex_保护
- };
-
- /*
- 需要注意的是,因为muduo库不管是服务器端还是客户端都是异步操作,
- 对于客户端来说,如果我们在连接还没有完全建立成功的时候发送数据,这是不被允许的。
- 因此我们可以使用内置的CountDownLatch类进行同步控制。
- */
- class CountDownLatch : noncopyable
- {
- public:
- /// 显式构造函数,初始化倒计时计数
- explicit CountDownLatch(int count);
-
- /// 等待倒计时完成
- void wait()
- {
- MutexLockGuard lock(mutex_);
- while (count_ > 0)
- {
- condition_.wait();
- }
- }
-
- /// 倒计时减一
- void countDown()
- {
- MutexLockGuard lock(mutex_);
- --count_;
- if (count_ == 0)
- {
- condition_.notifyAll();
- }
- }
-
- /// 获取当前倒计时计数
- int getCount() const;
-
- private:
- mutable MutexLock mutex_; // 互斥锁,受保护的成员变量
- Condition condition_ GUARDED_BY(mutex_); // 条件变量,受mutex_保护
- int count_ GUARDED_BY(mutex_); // 倒计时计数,受mutex_保护
- };

- class Buffer : public muduo::copyable
- {
- public:
- /// 常量:预留的前置空间大小
- static const size_t kCheapPrepend = 8;
-
- /// 常量:初始缓冲区大小
- static const size_t kInitialSize = 1024;
-
- /// 显式构造函数,初始化缓冲区大小
- explicit Buffer(size_t initialSize = kInitialSize)
- : buffer_(kCheapPrepend + initialSize),
- readerIndex_(kCheapPrepend),
- writerIndex_(kCheapPrepend) {}
-
- /// 交换缓冲区内容
- void swap(Buffer &rhs);
-
- /// 可读字节数
- size_t readableBytes() const;
-
- /// 可写字节数
- size_t writableBytes() const;
-
- /// 返回指向可读数据的指针
- const char *peek() const;
-
- /// 查找换行符(CRLF)的指针
- const char *findEOL() const;
-
- /// 从指定位置开始查找换行符(CRLF)的指针
- const char *findEOL(const char *start) const;
-
- /// 从缓冲区中取出指定长度的数据
- void retrieve(size_t len);
-
- /// 从缓冲区中取出一个64位整数
- void retrieveInt64();
-
- /// 从缓冲区中取出一个32位整数
- void retrieveInt32();
-
- /// 从缓冲区中取出一个16位整数
- void retrieveInt16();
-
- /// 从缓冲区中取出一个8位整数
- void retrieveInt8();
-
- /// 将缓冲区中的所有数据取出并转换为字符串
- string retrieveAllAsString();
-
- /// 将缓冲区中指定长度的数据取出并转换为字符串
- string retrieveAsString(size_t len);
-
- /// 向缓冲区追加字符串
- void append(const StringPiece &str);
-
- /// 向缓冲区追加指定数据和长度
- void append(const char * /*restrict*/ data, size_t len);
-
- /// 向缓冲区追加指定数据和长度
- void append(const void * /*restrict*/ data, size_t len);
-
- /// 返回指向可写数据的指针
- char *beginWrite();
-
- /// 返回指向可写数据的常量指针
- const char *beginWrite() const;
-
- /// 更新已写入的长度
- void hasWritten(size_t len);
-
- /// 向缓冲区追加一个64位整数
- void appendInt64(int64_t x);
-
- /// 向缓冲区追加一个32位整数
- void appendInt32(int32_t x);
-
- /// 向缓冲区追加一个16位整数
- void appendInt16(int16_t x);
-
- /// 向缓冲区追加一个8位整数
- void appendInt8(int8_t x);
-
- /// 从缓冲区中读取一个64位整数
- int64_t readInt64();
-
- /// 从缓冲区中读取一个32位整数
- int32_t readInt32();
-
- /// 从缓冲区中读取一个16位整数
- int16_t readInt16();
-
- /// 从缓冲区中读取一个8位整数
- int8_t readInt8();
-
- /// 查看缓冲区中的一个64位整数
- int64_t peekInt64() const;
-
- /// 查看缓冲区中的一个32位整数
- int32_t peekInt32() const;
-
- /// 查看缓冲区中的一个16位整数
- int16_t peekInt16() const;
-
- /// 查看缓冲区中的一个8位整数
- int8_t peekInt8() const;
-
- /// 在缓冲区前面添加一个64位整数
- void prependInt64(int64_t x);
-
- /// 在缓冲区前面添加一个32位整数
- void prependInt32(int32_t x);
-
- /// 在缓冲区前面添加一个16位整数
- void prependInt16(int16_t x);
-
- /// 在缓冲区前面添加一个8位整数
- void prependInt8(int8_t x);
-
- /// 在缓冲区前面添加指定数据和长度
- void prepend(const void * /*restrict*/ data, size_t len);
-
- private:
- std::vector<char> buffer_; // 缓冲区
- size_t readerIndex_; // 读索引
- size_t writerIndex_; // 写索引
- static const char kCRLF[]; // 换行符常量
- };

接下来就是通过上面的接口来使用muduo库通过protobuf实现网络通信,这里我们简单写一个计算器
- #include "muduo/proto/dispatcher.h"
- #include "muduo/proto/codec.h"
- #include "muduo/base/Logging.h"
- #include "muduo/base/Mutex.h"
- #include "muduo/net/EventLoop.h"
- #include "muduo/net/TcpClient.h"
- #include "muduo/net/EventLoopThread.h"
- #include "muduo/base/CountDownLatch.h"
-
- #include "request.pb.h"
- #include <memory>
- #include <iostream>
-
-
- class Client
- {
- public:
- typedef std::shared_ptr<sslx::AddResponse> AddResponsePtr;
- typedef std::shared_ptr<sslx::SubResponse> SubResponsePtr;
- typedef std::shared_ptr<sslx::MulResponse> MulResponsePtr;
- typedef std::shared_ptr<sslx::DivResponse> DivResponsePtr;
- typedef std::shared_ptr<sslx::ErrorResponse> ErrorResponsePtr;
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
-
- Client(const std::string& sip, int port)
- :_latch(1),
- _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, port), "Client"),
- _dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
- _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
- {
- // 注册业务回复处理函数
- _dispatcher.registerMessageCallback<sslx::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::SubResponse>(std::bind(&Client::onSub, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::MulResponse>(std::bind(&Client::onMul, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::DivResponse>(std::bind(&Client::onDiv, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::ErrorResponse>(std::bind(&Client::onError, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
-
- _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1));
-
- Connect();
- }
-
- void Connect()
- {
- _client.connect();
- _latch.wait();
- }
-
- void Add(int num1 , int num2)
- {
- sslx::AddRequest req;
- req.set_num1(num1);
- req.set_num2(num2);
- send(req);
- }
-
- void Sub(int num1 , int num2)
- {
- sslx::SubRequest req;
- req.set_num1(num1);
- req.set_num2(num2);
- send(req);
- }
-
-
- void Mul(int num1 , int num2)
- {
- sslx::MulRequest req;
- req.set_num1(num1);
- req.set_num2(num2);
- send(req);
- }
-
- void Div(int num1 , int num2)
- {
- sslx::DivRequest req;
- req.set_num1(num1);
- req.set_num2(num2);
- send(req);
- }
- private:
- bool send(const google::protobuf::Message& msg)
- {
- //连接状态正常,再发送,否则就返回false
- if(_conn->connected())
- {
- _codec.send(_conn, msg);
- return true;
- }
- return false;
- }
-
- void onConnection(const muduo::net::TcpConnectionPtr& conn)
- {
- if(conn->connected())
- {
- _conn = conn;
- _latch.countDown();
- }
- else
- {
- _conn.reset(); // 连接关闭时的操作
- }
- }
-
-
- void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& msg, muduo::Timestamp)
- {
- LOG_INFO << "onUnknowMessage" << msg->GetTypeName();
- conn->shutdown();
- }
-
- void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& msg, muduo::Timestamp)
- {
- std::cout << "加法结果 : " << msg->result() << std::endl;
- }
-
- void onSub(const muduo::net::TcpConnectionPtr& conn, const SubResponsePtr& msg, muduo::Timestamp)
- {
- std::cout << "减法结果 : " << msg->result() << std::endl;
- }
-
- void onMul(const muduo::net::TcpConnectionPtr& conn, const MulResponsePtr& msg, muduo::Timestamp)
- {
- std::cout << "乘法结果 : " << msg->result() << std::endl;
- }
-
- void onDiv(const muduo::net::TcpConnectionPtr& conn, const DivResponsePtr& msg, muduo::Timestamp)
- {
- std::cout << "除法结果 : " << msg->result() << std::endl;
- }
-
- void onError(const muduo::net::TcpConnectionPtr& conn, const ErrorResponsePtr& msg, muduo::Timestamp)
- {
- std::cout << "出现除零错误" << std::endl;
- }
-
- private:
- muduo::CountDownLatch _latch; //主要同步主线程和网络线程,确保在连接建立之前不会发送请求
- muduo::net::EventLoopThread _loopthread; // 提供一个独立的事件循环,用于处理网络事件
- muduo::net::TcpConnectionPtr _conn; // 保存当前的TcpConnectionPtr, 用于在连接建立之后发送消息
- muduo::net::TcpClient _client; // 管理客户端的连接和重连
- ProtobufDispatcher _dispatcher; // 用于根据消息的类型调用相应的处理函数
- ProtobufCodec _codec; // 负责Protobuf消息的编解码
-
- };
-
- int main()
- {
- Client client("127.0.0.1", 8085);
- client.Add(11, 11);
- client.Sub(11, 11);
-
- sleep(1);
- return 0;
- }

- #include <muduo/proto/codec.h>
- #include "muduo/proto/dispatcher.h"
- #include "muduo/base/Logging.h"
- #include "muduo/base/Mutex.h"
- #include "muduo/net/EventLoop.h"
- #include "muduo/net/TcpServer.h"
-
- #include "request.pb.h"
-
- #include <memory>
- #include <string>
- #include <iostream>
- #include <unordered_map>
-
- class Server
- {
- public:
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
- typedef std::shared_ptr<sslx::AddRequest> AddRequestPtr;
- typedef std::shared_ptr<sslx::SubRequest> SubRequestPtr;
- typedef std::shared_ptr<sslx::MulRequest> MulRequestPtr;
- typedef std::shared_ptr<sslx::DivRequest> DivRequestPtr;
-
- Server(int port)
- :_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),
- _dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
- _codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
- {
- _dispatcher.registerMessageCallback<sslx::AddRequest>(std::bind(&Server::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::SubRequest>(std::bind(&Server::onSub, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::MulRequest>(std::bind(&Server::onMul, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _dispatcher.registerMessageCallback<sslx::DivRequest>(std::bind(&Server::onDiv, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
-
- _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
- }
-
- void start()
- {
- _server.start();
- _baseLoop.loop();
- }
- private:
-
- int Add(int num1, int num2)
- {
- return num1 + num2;
- }
-
- int Sub(int num1, int num2)
- {
- return num1 - num2;
- }
-
- int Mul(int num1, int num2)
- {
- return num1 * num2;
- }
-
- int Div(int num1, int num2)
- {
- if(num2 == 0)
- {
- return INT_MIN;
- }
- return num1 / num2;
- }
-
- void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr& msg, muduo::Timestamp)
- {
- sslx::AddResponse resp;
- int ret = Add(msg->num1(), msg->num2());
- resp.set_result(ret);
- _codec.send(conn, resp);
- }
-
- void onSub(const muduo::net::TcpConnectionPtr& conn, const SubRequestPtr& msg, muduo::Timestamp)
- {
- sslx::SubResponse resp;
- int ret = Sub(msg->num1(), msg->num2());
- resp.set_result(ret);
- _codec.send(conn, resp);
- }
-
- void onMul(const muduo::net::TcpConnectionPtr& conn, const MulRequestPtr& msg, muduo::Timestamp)
- {
- sslx::MulResponse resp;
- int ret = Mul(msg->num1(), msg->num2());
- resp.set_result(ret);
- _codec.send(conn, resp);
- }
-
- void onDiv(const muduo::net::TcpConnectionPtr& conn, const DivRequestPtr& msg, muduo::Timestamp)
- {
- sslx::DivResponse resp1;
- sslx::ErrorResponse resp2;
- int ret = Div(msg->num1(), msg->num2());
- if(ret ==INT_MIN)
- {
- resp2.set_msg(std::to_string(ret));
- _codec.send(conn, resp2);
- }
- else
- {
- resp1.set_result(ret);
- _codec.send(conn, resp1);
- }
- }
-
- void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp)
- {
- LOG_INFO << "onUNknownMessage : " << message->GetTypeName();
- conn->shutdown();
- }
-
- void onConnection(const muduo::net::TcpConnectionPtr& conn)
- {
- if(conn->connected())
- {
- LOG_INFO << "新连接建立成功";
- }
- else
- {
- LOG_INFO << "连接已关闭";
- }
- }
-
- private:
- muduo::net::EventLoop _baseLoop;
- muduo::net::TcpServer _server; //服务器对象
- ProtobufDispatcher _dispatcher; // 请求分发对象 --要向其中注册请求处理函数
- ProtobufCodec _codec; // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
-
- };
-
- int main()
- {
- Server server(8085);
- server.start();
- return 0;
- }

这里简单梳理一下网路的流程:在进行初始化的时候,客户端和服务端之间会对_codec和_dispatcher还有事件监控循环线程,在建立连接的时候,异步线程会通过setConnectionCallback的回调函数,onConnection函数中的countDown() 函数唤醒主线程开始进行业务处理,在进行业务处理的时候,会将请求通过send发到缓冲区中,响应报文回来之后会给setMessageCallback函数传递信号,然后由loop线程来接收到响应,调用相关的函数来对结果进行处理。
- syntax = "proto3";
-
- package sslx;
-
- message AddRequest
- {
- int32 num1 = 1;
- int32 num2 = 2;
- }
-
- message AddResponse
- {
- int32 result = 1;
- }
-
- message SubRequest
- {
- int32 num1 = 1;
- int32 num2 = 2;
- }
-
- message SubResponse
- {
- int32 result = 1;
- }
-
- message MulRequest
- {
- int32 num1 = 1;
- int32 num2 = 2;
- }
-
- message MulResponse
- {
- int32 result = 1;
- }
-
- message DivRequest
- {
- int32 num1 = 1;
- int32 num2 = 2;
- }
-
- message DivResponse
- {
- int32 result = 1;
- }
-
- message ErrorResponse
- {
- string msg = 1;
- }

- all: client server
- client: client.cc request.pb.cc mqthird/include/muduo/proto/codec.cc
- g++ -g -o $@ $^ -std=c++11 -I./mqthird/include -L./mqthird/lib -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz
- server: server.cc request.pb.cc mqthird/include/muduo/proto/codec.cc
- g++ -g -o $@ $^ -std=c++11 -I./mqthird/include -L./mqthird/lib -lmuduo_net -lmuduo_base -lpthread -lprotobuf -lz
- .PHONY:clean
- clean:
- rm -rf server client
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。