
mio: a Lightweight I/O Library for Rust

mio is a lightweight I/O library implemented in Rust. It is essentially a wrapper over the OS-specific low-level APIs, abstracting them behind a unified interface for the layers above: epoll on Linux, IOCP on Windows, and kqueue on OS X.

I. About mio

1. Key features

  • Non-blocking TCP and UDP
  • I/O event notification backed by epoll, kqueue, and IOCP
  • Zero allocations at runtime
  • Extensible to other platforms

2. Basic usage

Its usage is much like epoll on Linux, which mio wraps underneath. The basic workflow is (a minimal sketch follows the list):

  1. Create a Poll
  2. Register event sources
  3. Loop: wait for events and handle them
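
A minimal sketch of these three steps, using the mio 0.6-era API this article is based on (Section III below gives fuller examples):

```rust
extern crate mio;

use mio::*;
use mio::tcp::TcpListener;

fn main() {
    // 1. Create the Poll
    let poll = Poll::new().unwrap();

    // 2. Register an event source under a Token that identifies it later
    let addr = "127.0.0.1:12345".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();
    poll.register(&listener, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

    // 3. Event loop: wait for readiness events and dispatch on the token
    let mut events = Events::with_capacity(1024);
    loop {
        poll.poll(&mut events, None).unwrap();
        for event in events.iter() {
            if event.token() == Token(0) {
                let _ = listener.accept(); // a connection is ready to be accepted
            }
        }
    }
}
```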

mio provides cross-platform access to the system selector; on every platform in the table below the same API applies, although the cost of the underlying calls differs. Because mio exposes a readiness-based API, modeled closely on Linux epoll, most of its calls map one-to-one onto epoll on Linux. Windows IOCP, by contrast, is a completion-based rather than readiness-based API, so considerably more bridging is needed there. mio also provides its own TcpListener, TcpStream, and UdpSocket; these wrap the platform-specific APIs, set the sockets to non-blocking, and implement the Evented trait.

| OS        | Selector |
|-----------|----------|
| Linux     | epoll    |
| OS X, iOS | kqueue   |
| Windows   | IOCP     |
| FreeBSD   | kqueue   |
| Android   | epoll    |
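
For reference, the Evented trait that these types implement looks roughly like this in the mio 0.6 sources (the signatures can be read off the handle.register()/reregister()/deregister() calls shown in the Poll code later in this article):

```rust
extern crate mio;

use std::io;
use mio::{Poll, PollOpt, Ready, Token};

pub trait Evented {
    /// Register `self` with the given `Poll` instance.
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>;

    /// Re-register `self` with the given `Poll` instance.
    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>;

    /// Remove `self` from the given `Poll` instance.
    fn deregister(&self, poll: &Poll) -> io::Result<()>;
}
```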

mio implements a single-threaded event loop only; it ships neither a thread pool nor a multi-threaded event loop. If you need those, you must build them on top of mio yourself, for example by handing work off to worker threads as sketched below.
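
A minimal sketch of that pattern, using only the standard library (the Job type and spawn_workers() are illustrative names, not part of mio): the event-loop thread keeps the Poll to itself and pushes decoded work onto a channel shared by worker threads.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Illustrative job type: whatever Handler::ready() wants workers to process.
enum Job {
    Process(Vec<u8>),
    Shutdown,
}

// Spawn `n` workers sharing one receiver. The single-threaded mio event loop
// would call the returned sender from its ready() handler.
fn spawn_workers(n: usize) -> mpsc::Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    let rx = Arc::new(Mutex::new(rx));
    for _ in 0..n {
        let rx = Arc::clone(&rx);
        thread::spawn(move || loop {
            // Take one job off the shared queue; the lock is released
            // as soon as this statement finishes.
            let job = rx.lock().unwrap().recv();
            match job {
                Ok(Job::Process(buf)) => {
                    // CPU-heavy work happens here, off the event-loop thread.
                    let _ = buf;
                }
                Ok(Job::Shutdown) | Err(_) => break,
            }
        });
    }
    tx
}

fn main() {
    let tx = spawn_workers(4);
    tx.send(Job::Process(b"request bytes".to_vec())).unwrap();
    tx.send(Job::Shutdown).unwrap();
}
```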

II. Source Code Analysis

First, the layout of mio's source tree, showing only the key parts:

```
mio
|---->test
|---->src
|-------->deprecated                  // event-loop code
|-------------->event_loop.rs         // EventLoop implementation, wraps Poll   【1】
|-------------->handler.rs            // interface for the upper layer to implement
|-------->net
|------------>mod.rs
|------------>tcp.rs
|------------>udp.rs
|-------->sys                         // per-OS implementations
|------------>mod.rs
|------------>fuchsia
|------------>unix                    // epoll wrapper for Linux
|------------------>mod.rs
|------------------>epoll.rs          【3】
|------------------>awakener.rs
|------------>windows                 // IOCP wrapper for Windows
|-------->lib.rs
|-------->poll.rs                     // Poll definition   【2】
|-------->channel.rs                  【4】
|-------->event_imp.rs
|-------->timer.rs                    【5】
|-------->......
```

For the platform-dependent parts, this article uses Linux as the example; on Linux, mio wraps epoll. The corresponding code is given below.

【1】EventLoop code analysis

Here is the key code, matching the overview above. The EventLoop type wraps a Poll and, taking Linux as the example, Poll in turn wraps epoll. When using Poll (or raw epoll on Linux), the heart of the program is epoll_wait(): wait for Events and handle each one differently. EventLoop wraps epoll_create(), epoll_wait(), and epoll_ctl() one level further and abstracts event handling into a Handler, which the upper layer implements with its concrete logic.

```rust
// Single threaded IO event loop.
// Note: since this is single-threaded, in practice you often add a thread pool
// and wrap this once more before exposing it to upper layers.
pub struct EventLoop<H: Handler> {
    run: bool,
    poll: Poll,
    events: Events, // corresponds to epoll_event in epoll
    timer: Timer<H::Timeout>,
    notify_tx: channel::SyncSender<H::Message>,
    notify_rx: channel::Receiver<H::Message>,
    config: Config,
}
```

The Handler trait is the interface the application implements to handle the different kinds of events. It works much like a set of callbacks: the user writes the business logic here, hands the implementation to the event loop, and the running loop invokes these methods. (Calling this a "function pointer" is not quite precise in Rust, but it is a fair mental model.)

```rust
pub trait Handler: Sized {
    type Timeout;
    type Message;

    /// Invoked when the socket represented by `token` is ready to be operated
    /// on. `events` indicates the specific operations that are
    /// ready to be performed.
    /// This function will only be invoked a single time per socket per event
    /// loop tick.
    fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
    } // 【1】

    /// Invoked when a message has been received via the event loop's channel.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
    } // 【2】

    /// Invoked when a timeout has completed.
    fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
    } // 【3】

    /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
    fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
    } // 【4】

    /// Invoked at the end of an event loop tick.
    fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
    } // 【5】
}
```

EventLoop wraps Poll one level further. The main call chains are EventLoop::new() → Poll::new() → epoll_create() and EventLoop::run() → Selector::select() → epoll_wait(), along with register(), reregister(), deregister(), and so on:

```rust
impl<H: Handler> EventLoop<H> {
    /// Constructs a new `EventLoop` using the default configuration values.
    /// The `EventLoop` will not be running.
    pub fn new() -> io::Result<EventLoop<H>> {
        EventLoop::configured(Config::default())
    }

    fn configured(config: Config) -> io::Result<EventLoop<H>> {
        // Create the IO poller
        let poll = Poll::new()?; // calls epoll_create() internally on Linux

        let timer = timer::Builder::default()
            .tick_duration(config.timer_tick)
            .num_slots(config.timer_wheel_size)
            .capacity(config.timer_capacity)
            .build();

        // Create cross thread notification queue. This is a sync channel; the
        // bound of its internal buffer queue is configurable.
        let (tx, rx) = channel::sync_channel(config.notify_capacity);

        // Register the notification wakeup FD with the IO poller.
        // NOTIFY and TIMER are implemented by mio itself.
        poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
        poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;

        Ok(EventLoop {
            run: true,
            poll: poll,
            timer: timer,
            notify_tx: tx,
            notify_rx: rx,
            config: config,
            events: Events::with_capacity(1024),
        })
    }

    /// Keep spinning the event loop indefinitely, and notify the handler whenever
    /// any of the registered handles are ready.
    pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
        self.run = true;

        while self.run {
            // Execute ticks as long as the event loop is running
            self.run_once(handler, None)?; // calls epoll_wait() on Linux
        }

        Ok(())
    }

    pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
        trace!("event loop tick");

        // Check the registered IO handles for any new events. Each poll
        // is for one second, so a shutdown request can last as long as
        // one second before it takes effect.
        let events = match self.io_poll(timeout) {
            Ok(e) => e,
            Err(err) => {
                if err.kind() == io::ErrorKind::Interrupted {
                    handler.interrupted(self); // calls Handler::interrupted() 【4】
                    0
                } else {
                    return Err(err);
                }
            }
        };

        self.io_process(handler, events); // handle the ready events; `handler` decides how
        handler.tick(self); // after one round of events, call Handler::tick() 【5】
        Ok(())
    }

    #[inline]
    fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll.poll(&mut self.events, timeout)
    }

    // Process IO events that have been previously polled
    fn io_process(&mut self, handler: &mut H, cnt: usize) {
        let mut i = 0;

        trace!("io_process(..); cnt={}; len={}", cnt, self.events.len());

        // Iterate over the notifications. Each event provides the token
        // it was registered with (which usually represents, at least, the
        // handle that the event is about) as well as information about
        // what kind of event occurred (readable, writable, signal, etc.)
        while i < cnt { // walk all ready events and dispatch them
            let evt = self.events.get(i).unwrap();

            trace!("event={:?}; idx={:?}", evt, i);

            // On top of epoll, mio adds NOTIFY and TIMER
            match evt.token() {
                NOTIFY => self.notify(handler),       // channel handling; not in epoll, added by mio
                TIMER => self.timer_process(handler), // timer handling; not in epoll, added by mio
                _ => self.io_event(handler, evt)      // IO event handling; this is what epoll provides
            }

            i += 1;
        }
    }

    fn io_event(&mut self, handler: &mut H, evt: Event) {
        handler.ready(self, evt.token(), evt.readiness()); // calls Handler::ready() 【1】
    }

    fn notify(&mut self, handler: &mut H) {
        for _ in 0..self.config.messages_per_tick {
            // Receive from the channel; internally std::sync::mpsc::sync_channel()
            match self.notify_rx.try_recv() {
                Ok(msg) => handler.notify(self, msg), // calls Handler::notify() 【2】
                _ => break,
            }
        }

        // Re-register: with PollOpt::oneshot(), reregister is mandatory
        let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
    }

    fn timer_process(&mut self, handler: &mut H) {
        while let Some(t) = self.timer.poll() {
            handler.timeout(self, t); // calls Handler::timeout() 【3】
        }
    }

    /// Registers an IO handle with the event loop.
    pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.register(io, token, interest, opt)
    }

    /// Re-registers an IO handle with the event loop.
    pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.reregister(io, token, interest, opt)
    }

    /// Deregisters an IO handle with the event loop.
    pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
        self.poll.deregister(io)
    }

    /// Returns a sender that allows sending messages to the event loop in a
    /// thread-safe way, waking up the event loop if needed.
    pub fn channel(&self) -> Sender<H::Message> {
        Sender::new(self.notify_tx.clone())
    }

    /// Schedules a timeout after the requested time interval. When the
    /// duration has been reached,
    pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
        self.timer.set_timeout(delay, token)
    }

    /// If the supplied timeout has not been triggered, cancel it such that it
    /// will not be triggered in the future.
    pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {
        self.timer.cancel_timeout(&timeout).is_some()
    }

    /// Tells the event loop to exit after it is done handling all events in the current iteration.
    pub fn shutdown(&mut self) {
        self.run = false;
    }

    /// Indicates whether the event loop is currently running. If it's not it has either
    /// stopped or is scheduled to stop on the next tick.
    pub fn is_running(&self) -> bool {
        self.run
    }
}
```
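
Since run_once() is public, the loop can also be driven one tick at a time. A small sketch (run_with_ticks() is an illustrative helper, not part of mio):

```rust
extern crate mio;

use std::io;
use std::time::Duration;
use mio::deprecated::{EventLoop, Handler};

// Drive the loop manually with a 100 ms poll timeout per tick, so the caller
// can interleave other work between ticks instead of blocking inside run().
fn run_with_ticks<H: Handler>(event_loop: &mut EventLoop<H>, handler: &mut H) -> io::Result<()> {
    while event_loop.is_running() {
        event_loop.run_once(handler, Some(Duration::from_millis(100)))?;
        // ...per-tick bookkeeping could go here...
    }
    Ok(())
}
```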

【2】Poll code analysis

Poll hides the differences between operating systems behind a single abstraction. Only the most important parts of the implementation are shown here; some code is elided. See mio/src/poll.rs for the full source.

```rust
pub struct Poll {
    // Platform specific IO selector
    selector: sys::Selector,

    // Custom readiness queue
    // The second readiness queue is implemented in user space by
    // `ReadinessQueue`. It provides a way to implement purely user space
    // `Evented` types. Unlike the system readiness queue (sys::Selector),
    // this one is implemented by mio itself in user space.
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}

impl Poll {
    /// Return a new `Poll` handle.
    pub fn new() -> io::Result<Poll> {
        is_send::<Poll>();
        is_sync::<Poll>();

        let poll = Poll {
            selector: sys::Selector::new()?,
            readiness_queue: ReadinessQueue::new()?,
            lock_state: AtomicUsize::new(0),
            lock: Mutex::new(()),
            condvar: Condvar::new(),
        };

        // Register the notification wakeup FD with the IO poller
        poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?;

        Ok(poll)
    }

    /// Wait for readiness events
    ///
    /// Blocks the current thread and waits for readiness events for any of the
    /// `Evented` handles that have been registered with this `Poll` instance.
    /// The function will block until either at least one readiness event has
    /// been received or `timeout` has elapsed. A `timeout` of `None` means that
    /// `poll` will block until a readiness event has been received.
    pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
        // Poll::poll() is the single most important method:
        // poll() -> poll1() -> poll2()
        self.poll1(events, timeout, false)
    }

    fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
        let zero = Some(Duration::from_millis(0));
        let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);

        if 0 != curr { ... } // { ... } marks elided code

        let ret = self.poll2(events, timeout, interruptible);

        // Release the lock
        if 1 != self.lock_state.fetch_and(!1, Release) { ... } // { ... } marks elided code

        ret
    }

    #[inline]
    fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> {
        // Compute the timeout value passed to the system selector. If the
        // readiness queue has pending nodes, we still want to poll the system
        // selector for new events, but we don't want to block the thread to
        // wait for new events.
        if timeout == Some(Duration::from_millis(0)) {
            // If blocking is not requested, then there is no need to prepare
            // the queue for sleep
            //
            // The sleep_marker should be removed by readiness_queue.poll().
        } else if self.readiness_queue.prepare_for_sleep() {
            // The readiness queue is empty. The call to `prepare_for_sleep`
            // inserts `sleep_marker` into the queue. This signals to any
            // threads setting readiness that the `Poll::poll` is going to
            // sleep, so the awakener should be used.
        } else {
            // The readiness queue is not empty, so do not block the thread.
            timeout = Some(Duration::from_millis(0));
        }

        // Poll the system readiness queue
        loop {
            let now = Instant::now();

            // First get selector events. On Linux this calls epoll_wait();
            // ready events are placed into `events`.
            let res = self.selector.select(&mut events.inner, AWAKEN, timeout);

            match res {
                Ok(true) => {
                    // Some awakeners require reading from a FD.
                    self.readiness_queue.inner.awakener.cleanup();
                    break;
                }
                Ok(false) => break,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => {
                    // Interrupted by a signal; update timeout if necessary and retry
                    if let Some(to) = timeout {
                        let elapsed = now.elapsed();
                        if elapsed >= to {
                            break;
                        } else {
                            timeout = Some(to - elapsed);
                        }
                    }
                }
                Err(e) => return Err(e),
            }
        }

        // Poll custom event queue (the user-space readiness queue)
        self.readiness_queue.poll(&mut events.inner);

        // Return number of polled events
        Ok(events.inner.len())
    }

    /// Register an `Evented` handle with the `Poll` instance.
    pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
        where E: Evented
    {
        validate_args(token)?;

        // Register interests for this socket
        handle.register(self, token, interest, opts)?;

        Ok(())
    }

    /// Re-register an `Evented` handle with the `Poll` instance.
    pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
        where E: Evented
    {
        validate_args(token)?;

        // Register interests for this socket
        handle.reregister(self, token, interest, opts)?;

        Ok(())
    }

    /// Deregister an `Evented` handle with the `Poll` instance.
    pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
        where E: Evented
    {
        // Deregister interests for this socket
        handle.deregister(self)?;

        Ok(())
    }
}
```

【3】Selector code analysis

The following code, from mio/src/sys/unix/epoll.rs, wraps the underlying Linux epoll. Selector::new() calls epoll_create() internally, Selector::select() calls epoll_wait(), and register()/reregister()/deregister() call epoll_ctl(). If you know epoll well, this code will feel very familiar:

```rust
pub struct Selector {
    id: usize,
    epfd: RawFd,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        let epfd = unsafe {
            // Emulate `epoll_create` by using `epoll_create1` if it's available
            // and otherwise falling back to `epoll_create` followed by a call to
            // set the CLOEXEC flag.
            dlsym!(fn epoll_create1(c_int) -> c_int);

            match epoll_create1.get() {
                Some(epoll_create1_fn) => {
                    cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?
                }
                None => {
                    let fd = cvt(libc::epoll_create(1024))?;
                    drop(set_cloexec(fd));
                    fd
                }
            }
        };

        // offset by 1 to avoid choosing 0 as the id of a selector
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;

        Ok(Selector {
            id: id,
            epfd: epfd,
        })
    }

    pub fn id(&self) -> usize {
        self.id
    }

    /// Wait for events from the OS
    pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
        let timeout_ms = timeout
            .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)
            .unwrap_or(-1);

        // Wait for epoll events for at most timeout_ms milliseconds
        evts.clear();
        unsafe {
            let cnt = cvt(libc::epoll_wait(self.epfd,
                                           evts.events.as_mut_ptr(),
                                           evts.events.capacity() as i32,
                                           timeout_ms))?;
            let cnt = cnt as usize;
            evts.events.set_len(cnt);

            for i in 0..cnt {
                if evts.events[i].u64 as usize == awakener.into() {
                    evts.events.remove(i);
                    return Ok(true);
                }
            }
        }

        Ok(false)
    }

    /// Register event interests for the given IO handle with the OS
    pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        let mut info = libc::epoll_event {
            events: ioevent_to_epoll(interests, opts),
            u64: usize::from(token) as u64
        };

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;
            Ok(())
        }
    }

    /// Register event interests for the given IO handle with the OS
    pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        let mut info = libc::epoll_event {
            events: ioevent_to_epoll(interests, opts),
            u64: usize::from(token) as u64
        };

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;
            Ok(())
        }
    }

    /// Deregister event interests for the given IO handle with the OS
    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        // The &info argument should be ignored by the system,
        // but linux < 2.6.9 required it to be not null.
        // For compatibility, we provide a dummy EpollEvent.
        let mut info = libc::epoll_event {
            events: 0,
            u64: 0,
        };

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;
            Ok(())
        }
    }
}
```
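
The ioevent_to_epoll() helper used above is not shown in this excerpt. Roughly, it translates mio's Ready interest set and PollOpt options into epoll flag bits; the sketch below is a reconstruction for illustration, not a copy of the mio source:

```rust
extern crate libc;
extern crate mio;

use mio::{PollOpt, Ready};

// Illustrative reconstruction of the Ready/PollOpt -> epoll flags mapping.
fn ioevent_to_epoll(interest: Ready, opts: PollOpt) -> u32 {
    let mut kind = 0;

    if interest.is_readable() {
        kind |= libc::EPOLLIN;
    }
    if interest.is_writable() {
        kind |= libc::EPOLLOUT;
    }
    if opts.is_edge() {
        kind |= libc::EPOLLET; // edge-triggered
    }
    if opts.is_oneshot() {
        kind |= libc::EPOLLONESHOT; // deliver once, then require reregister()
    }

    kind as u32
}
```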

【4】Notify channel code analysis

This is the part that touches the most code; it is scattered and comparatively hard to follow.

```rust
// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked
// list nodes. This significantly reduces the number of required allocations.
// Each `Registration` / `SetReadiness` pair allocates a single readiness node
// that is used for the lifetime of the registration.
//
// The readiness node also includes a single atomic variable, `state` that
// tracks most of the state associated with the registration. This includes the
// current readiness, interest, poll options, and internal state. When the node
// state is mutated, it is queued in the MPSC channel. A call to
// `ReadinessQueue::poll` will dequeue and process nodes. The node state can
// still be mutated while it is queued in the channel for processing.
// Intermediate state values do not matter as long as the final state is
// included in the call to `poll`. This is the eventually consistent nature of
// the readiness queue.
//
// The readiness node is ref counted using the `ref_count` field. On creation,
// the ref_count is initialized to 3: one `Registration` handle, one
// `SetReadiness` handle, and one for the readiness queue. Since the readiness queue
// doesn't *always* hold a handle to the node, we don't use the Arc type for
// managing ref counts (this is to avoid constantly incrementing and
// decrementing the ref count when pushing & popping from the queue). When the
// `Registration` handle is dropped, the `dropped` flag is set on the node, then
// the node is pushed into the registration queue. When Poll::poll pops the
// node, it sees the drop flag is set, and decrements its ref count.
//
// The MPSC queue is a modified version of the intrusive MPSC node based queue
// described by 1024cores [1].
#[derive(Clone)]
struct ReadinessQueue {
    inner: Arc<ReadinessQueueInner>,
}

struct ReadinessQueueInner {
    // Used to wake up `Poll` when readiness is set in another thread.
    awakener: sys::Awakener,

    // Head of the MPSC queue used to signal readiness to `Poll::poll`.
    head_readiness: AtomicPtr<ReadinessNode>,

    // Tail of the readiness queue.
    //
    // Only accessed by Poll::poll. Coordination will be handled by the poll fn
    tail_readiness: UnsafeCell<*mut ReadinessNode>,

    // Fake readiness node used to punctuate the end of the readiness queue.
    // Before attempting to read from the queue, this node is inserted in order
    // to partition the queue between nodes that are "owned" by the dequeue end
    // and nodes that will be pushed on by producers.
    end_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but this node signals to producers that `Poll`
    // has gone to sleep and must be woken up.
    sleep_marker: Box<ReadinessNode>,

    // Similar to `end_marker`, but the node signals that the queue is closed.
    // This happens when `ReadyQueue` is dropped and signals to producers that
    // the nodes should no longer be pushed into the queue.
    closed_marker: Box<ReadinessNode>,
}
```
```rust
/// Node shared by a `Registration` / `SetReadiness` pair as well as the node
/// queued into the MPSC channel.
struct ReadinessNode {
    // Node state, see struct docs for `ReadinessState`
    //
    // This variable is the primary point of coordination between all the
    // various threads concurrently accessing the node.
    state: AtomicState,

    // The registration token cannot fit into the `state` variable, so it is
    // broken out here. In order to atomically update both the state and token
    // we have to jump through a few hoops.
    //
    // First, `state` includes `token_read_pos` and `token_write_pos`. These can
    // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is
    // the token slot that contains the most up to date registration token.
    // `token_read_pos` is the token slot that `poll` is currently reading from.
    //
    // When a call to `update` includes a different token than the one currently
    // associated with the registration (token_write_pos), first an unused token
    // slot is found. The unused slot is the one not represented by
    // `token_read_pos` OR `token_write_pos`. The new token is written to this
    // slot, then `state` is updated with the new `token_write_pos` value. This
    // requires that there is only a *single* concurrent call to `update`.
    //
    // When `poll` reads a node state, it checks that `token_read_pos` matches
    // `token_write_pos`. If they do not match, then it atomically updates
    // `state` such that `token_read_pos` is set to `token_write_pos`. It will
    // then read the token at the newly updated `token_read_pos`.
    token_0: UnsafeCell<Token>,
    token_1: UnsafeCell<Token>,
    token_2: UnsafeCell<Token>,

    // Used when the node is queued in the readiness linked list. Accessing
    // this field requires winning the "queue" lock
    next_readiness: AtomicPtr<ReadinessNode>,

    // Ensures that there is only one concurrent call to `update`.
    //
    // Each call to `update` will attempt to swap `update_lock` from `false` to
    // `true`. If the CAS succeeds, the thread has obtained the update lock. If
    // the CAS fails, then the `update` call returns immediately and the update
    // is discarded.
    update_lock: AtomicBool,

    // Pointer to Arc<ReadinessQueueInner>
    readiness_queue: AtomicPtr<()>,

    // Tracks the number of `ReadyRef` pointers
    ref_count: AtomicUsize,
}
```
```rust
/// Handle to a user space `Poll` registration.
///
/// `Registration` allows implementing [`Evented`] for types that cannot work
/// with the [system selector]. A `Registration` is always paired with a
/// `SetReadiness`, which allows updating the registration's readiness state.
/// When [`set_readiness`] is called and the `Registration` is associated with a
/// [`Poll`] instance, a readiness event will be created and eventually returned
/// by [`poll`].
pub struct Registration {
    inner: RegistrationInner,
}

/// Updates the readiness state of the associated `Registration`.
#[derive(Clone)]
pub struct SetReadiness {
    inner: RegistrationInner,
}
```
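
As a usage sketch of this pair (based on the mio 0.6-era Registration::new2() API; the token value here is arbitrary): a Registration registers with a Poll like any other Evented handle, and the paired SetReadiness can then mark it ready from any thread, flowing through the user-space readiness queue and waking Poll::poll() via the awakener:

```rust
extern crate mio;

use mio::*;
use std::thread;

fn main() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);

    // A purely user-space event source: no file descriptor involved.
    let (registration, set_readiness) = Registration::new2();
    poll.register(&registration, Token(42), Ready::readable(), PollOpt::edge()).unwrap();

    // Another thread flags readiness; Poll::poll() below wakes up and
    // reports an event carrying Token(42).
    thread::spawn(move || {
        set_readiness.set_readiness(Ready::readable()).unwrap();
    });

    poll.poll(&mut events, None).unwrap();
    for event in events.iter() {
        assert_eq!(event.token(), Token(42));
    }
}
```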

To be continued...

Reference [1]: Intrusive MPSC node-based queue

【5】Timer code analysis

```rust
pub struct Timer<T> {
    // Size of each tick in milliseconds
    tick_ms: u64,

    // Slab of timeout entries
    entries: Slab<Entry<T>>,

    // Timeout wheel. Each tick, the timer will look at the next slot for
    // timeouts that match the current tick.
    wheel: Vec<WheelEntry>,

    // Tick 0's time instant
    start: Instant,

    // The current tick
    tick: Tick,

    // The next entry to possibly timeout
    next: Token,

    // Masks the target tick to get the slot
    mask: u64,

    // Set on registration with Poll
    inner: LazyCell<Inner>,
}
```
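
A short usage sketch (mio 0.6-era timer API, the same one EventLoop uses above): the timer wheel is itself an Evented type, so it registers with a Poll like a socket, and set_timeout()/poll() arm and drain it:

```rust
extern crate mio;

use mio::*;
use mio::timer::{Builder, Timer};
use std::time::Duration;

fn main() {
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(16);

    // Build a timer wheel and register it like any other event source.
    let mut timer: Timer<&str> = Builder::default().build();
    poll.register(&timer, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

    // Arm a timeout carrying arbitrary user state (here a &str).
    timer.set_timeout(Duration::from_millis(200), "ping").unwrap();

    // poll() wakes once the timeout fires; Timer::poll() then yields the state.
    poll.poll(&mut events, None).unwrap();
    assert_eq!(timer.poll(), Some("ping"));
}
```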

To be continued...

III. mio Usage Examples

The two examples below are both very simple; in fact, reading mio's own test code under mio/test/ works just as well as reading them.

1. Example 1

Using Poll directly:

```rust
#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate mio;

use mio::*;
use mio::tcp::{TcpListener, TcpStream};

fn main() {
    simple_logger::init().unwrap();

    // Setup some tokens to allow us to identify which event is for which socket.
    const SERVER: Token = Token(0);
    const CLIENT: Token = Token(1);

    let addr = "127.0.0.1:12345".parse().unwrap();

    // Setup the server socket
    let server = TcpListener::bind(&addr).unwrap();

    // Create a poll instance
    let poll = Poll::new().unwrap();

    // Start listening for incoming connections
    poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap();

    // Setup the client socket
    let sock = TcpStream::connect(&addr).unwrap();

    // Register the socket
    poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();

    // Create storage for events
    let mut events = Events::with_capacity(1024);

    loop {
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            match event.token() {
                SERVER => {
                    // Accept and drop the socket immediately, this will close
                    // the socket and notify the client of the EOF.
                    let (_stream, addr) = server.accept().unwrap();
                    info!("Listener accept {:?}", addr);
                },
                CLIENT => {
                    // The server just shuts down the socket, let's just exit
                    // from our event loop.
                    info!("client response.");
                    return;
                },
                _ => unreachable!(),
            }
        }
    }
}
```

As example 1 shows, the usage is very similar to raw epoll.

2. Example 2

Programming directly against Poll as above is somewhat tedious. Using the EventLoop style below, the code reads (relatively) more clearly:

```rust
#[macro_use]
extern crate log;
extern crate simple_logger;
extern crate mio;

use mio::*;
use mio::deprecated::{EventLoop, Handler};
use std::thread;
use std::time::Duration;

fn main() {
    simple_logger::init().unwrap();

    let mut event_loop = EventLoop::new().unwrap();
    let channel_sender = event_loop.channel();

    thread::spawn(move || {
        let _ = channel_sender.send(IoMessage::Notify);
        thread::sleep(Duration::from_secs(5));
        let _ = channel_sender.send(IoMessage::End);
    });

    let _timeout = event_loop.timeout(Token(123), Duration::from_millis(3000)).unwrap();

    let mut handler = MioHandler::new();
    event_loop.run(&mut handler).unwrap();
}

pub enum IoMessage {
    Notify,
    End,
}

pub struct MioHandler {
}

impl MioHandler {
    pub fn new() -> Self {
        MioHandler {}
    }
}

impl Handler for MioHandler {
    type Timeout = Token;
    type Message = IoMessage;

    /// Invoked when the socket represented by `token` is ready to be operated on.
    fn ready(&mut self, _event_loop: &mut EventLoop<Self>, _token: Token, _events: Ready) {
    }

    /// Invoked when a message has been received via the event loop's channel.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
        match msg {
            IoMessage::Notify => info!("channel notify"),
            IoMessage::End => {
                info!("shutdown eventloop.");
                event_loop.shutdown();
            }
        }
    }

    /// Invoked when a timeout has completed.
    fn timeout(&mut self, _event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
        match timeout {
            Token(123) => info!("time out."),
            Token(_) => {},
        }
    }

    /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
    fn interrupted(&mut self, _event_loop: &mut EventLoop<Self>) {
    }

    /// Invoked at the end of an event loop tick.
    fn tick(&mut self, _event_loop: &mut EventLoop<Self>) {
    }
}
```

This example demonstrates timeouts and the channel. Programming against EventLoop is not really different from the previous example; EventLoop simply wraps Poll.

References:
[Translated] Tokio internals: Understanding Rust's asynchronous I/O framework from the bottom up
Developing a web framework with mio - base
My Basic Understanding of mio and Asynchronous IO
MIO for Rust
mio on GitHub

 