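A Scenario-Based Walkthrough of the Rust mio Source Code
mio (Metal IO) is a fairly low-level I/O library in the Rust ecosystem. The official description reads: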
Mio is a lightweight I/O library for Rust with a focus on adding as little overhead as possible over the OS abstractions.
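At the time of writing, mio has released v0.6.19. This analysis uses the master branch at commit 14f37f283576040c8763f45de6c2b2bbcb82436d.
We trace the source code starting from the example that ships with the repository.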
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::str::from_utf8;
use mio::event::Event;
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interests, Poll, Token};
// Setup some tokens to allow us to identify which event is for which socket.
const SERVER: Token = Token(0);
// Some data we'll send over the connection.
const DATA: &[u8] = b"Hello world!\n";
fn main() -> io::Result<()> {
env_logger::init();
// Create a poll instance.
let mut poll = Poll::new()?;
// Create storage for events.
let mut events = Events::with_capacity(128);
// Setup the TCP server socket.
let addr = "127.0.0.1:13265".parse().unwrap();
let server = TcpListener::bind(addr)?;
// Register the server with poll we can receive events for it.
poll.registry()
.register(&server, SERVER, Interests::READABLE)?;
// Map of `Token` -> `TcpStream`.
let mut connections = HashMap::new();
// Unique token for each incoming connection.
let mut unique_token = Token(SERVER.0 + 1);
println!("You can connect to the server using `nc`:");
println!(" $ nc 127.0.0.1 13265");
println!("You'll see our welcome message and anything you type we'll be printed here.");
loop {
poll.poll(&mut events, None)?;
for event in events.iter() {
match event.token() {
SERVER => {
// Received an event for the TCP server socket.
// Accept an connection.
let (connection, address) = server.accept()?;
println!("Accepted connection from: {}", address);
let token = next(&mut unique_token);
poll.registry().register(
&connection,
token,
Interests::READABLE.add(Interests::WRITABLE),
)?;
connections.insert(token, connection);
}
token => {
// (maybe) received an event for a TCP connection.
let done = if let Some(connection) = connections.get_mut(&token) {
handle_connection_event(&mut poll, connection, event)?
} else {
// Sporadic events happen.
false
};
if done {
connections.remove(&token);
}
}
}
}
}
}
fn next(current: &mut Token) -> Token {
let next = current.0;
current.0 += 1;
Token(next)
}
/// Returns `true` if the connection is done.
fn handle_connection_event(
poll: &mut Poll,
connection: &mut TcpStream,
event: &Event,
) -> io::Result<bool> {
if event.is_writable() {
// We can (maybe) write to the connection.
match connection.write(DATA) {
// We want to write the entire `DATA` buffer in a single go. If we
// write less we'll return a short write error (same as
// `io::Write::write_all` does).
Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
Ok(_) => {
// After we've written something we'll reregister the connection
// to only respond to readable events.
poll.registry()
.reregister(&connection, event.token(), Interests::READABLE)?
}
// Would block "errors" are the OS's way of saying that the
// connection is not actually ready to perform this I/O operation.
Err(ref err) if would_block(err) => {}
// Got interrupted (how rude!), we'll try again.
Err(ref err) if interrupted(err) => {
return handle_connection_event(poll, connection, event)
}
// Other errors we'll consider fatal.
Err(err) => return Err(err),
}
}
if event.is_readable() {
let mut connection_closed = false;
let mut received_data = Vec::with_capacity(4096);
// We can (maybe) read from the connection.
loop {
let mut buf = [0; 256];
match connection.read(&mut buf) {
Ok(0) => {
// Reading 0 bytes means the other side has closed the
// connection or is done writing, then so are we.
connection_closed = true;
break;
}
Ok(n) => received_data.extend_from_slice(&buf[..n]),
// Would block "errors" are the OS's way of saying that the
// connection is not actually ready to perform this I/O operation.
Err(ref err) if would_block(err) => break,
Err(ref err) if interrupted(err) => continue,
// Other errors we'll consider fatal.
Err(err) => return Err(err),
}
}
if let Ok(str_buf) = from_utf8(&received_data) {
println!("Received data: {}", str_buf.trim_end());
} else {
println!("Received (none UTF-8) data: {:?}", &received_data);
}
if connection_closed {
println!("Connection closed");
return Ok(true);
}
}
Ok(false)
}
fn would_block(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::WouldBlock
}
fn interrupted(err: &io::Error) -> bool {
err.kind() == io::ErrorKind::Interrupted
}
Let's start from the body of main():
Poll::new()
pub fn new() -> io::Result<Poll> {
sys::Selector::new().map(|selector| Poll {
registry: Registry { selector },
})
}
Here is the Poll struct:
pub struct Poll {
registry: Registry,
}
/// Registers I/O resources.
pub struct Registry {
selector: sys::Selector,
}
sys::Selector has a different implementation on each operating system:
/// `Poll` is backed by the selector provided by the operating system.
///
/// | OS | Selector |
/// |---------------|-----------|
/// | Android | [epoll] |
/// | DragonFly BSD | [kqueue] |
/// | FreeBSD | [kqueue] |
/// | Linux | [epoll] |
/// | NetBSD | [kqueue] |
/// | OpenBSD | [kqueue] |
/// | Solaris | [epoll] |
/// | Windows | [IOCP] |
/// | iOS | [kqueue] |
/// | macOS | [kqueue] |
///
We follow the Linux epoll implementation. The source file is /src/sys/unix/epoll.rs
impl Selector {
pub fn new() -> io::Result<Selector> {
// According to libuv `EPOLL_CLOEXEC` is not defined on Android API <
// 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on all platforms,
// so we use that instead.
syscall!(epoll_create1(libc::O_CLOEXEC)).map(|ep| Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
})
}
...
}
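This code calls the Linux API epoll_create1(), which returns an int: a file descriptor referring to the new epoll instance. That is the ep field in the code. In debug builds the selector also gets an auto-incremented id. NEXT_ID is defined as: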
/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
Now the syscall! macro, defined in /src/sys/unix/mod.rs:
// Macro must be defined before any modules that uses them.
macro_rules! syscall {
($fn: ident ( $($arg: expr),* $(,)* ) ) => {{
let res = unsafe { libc::$fn($($arg, )*) };
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
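Given a function name and its arguments, the macro calls that function in libc. A return value of -1 becomes Err(io::Error::last_os_error()); any other value is returned as Ok.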
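The error-mapping convention that syscall! applies can be sketched without libc. In the minimal sketch below, cvt is a hypothetical helper name (it is not part of mio) that takes the raw return value as a plain integer. It assumes the usual C convention that -1 signals failure and that the OS error code can then be read through io::Error::last_os_error().

```rust
use std::io;

/// Hypothetical helper (not mio's API): maps a C-style return value
/// to `io::Result`. -1 means failure; anything else is passed through.
pub fn cvt(res: i32) -> io::Result<i32> {
    if res == -1 {
        // On a real failed syscall this picks up the current `errno`.
        Err(io::Error::last_os_error())
    } else {
        Ok(res)
    }
}
```

Wrapping every raw call this way is what lets the rest of the code chain system calls with map and and_then, as the bind implementation does.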
TcpListener
Next comes let server = TcpListener::bind(addr)?;
Here is the bind method:
/// 1. Create a new TCP socket.
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
/// 3. Bind the socket to the specified address.
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
sys::TcpListener::bind(addr).map(|sys| TcpListener {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
})
}
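The doc comment says this method performs the four usual steps of setting up a listening socket. It delegates to sys::TcpListener::bind(), so let's follow that: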
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
new_ip_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
// Set SO_REUSEADDR (mirrors what libstd does).
syscall!(setsockopt(
socket,
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&1 as *const libc::c_int as *const libc::c_void,
size_of::<libc::c_int>() as libc::socklen_t,
))
.and_then(|_| {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(bind(socket, raw_addr, raw_addr_length))
})
.and_then(|_| syscall!(listen(socket, 1024)))
.map_err(|err| {
// Close the socket if we hit an error, ignoring the error
// from closing since we can't pass back two errors.
let _ = unsafe { libc::close(socket) };
err
})
.map(|_| TcpListener {
inner: unsafe { net::TcpListener::from_raw_fd(socket) },
})
})
}
Back in main(), we continue with registration.
poll.registry().register()
pub fn registry(&self) -> &Registry {
&self.registry
}
The registration itself is done by the Registry returned here (&self.registry):
pub fn register<S>(&self, source: &S, token: Token, interests: Interests) -> io::Result<()>
where
S: event::Source + ?Sized,
{
trace!(
"registering event source with poller: token={:?}, interests={:?}",
token,
interests
);
source.register(self, token, interests)
}
This in turn calls source.register(). Here is the definition of the Source trait:
pub trait Source {
/// Register `self` with the given `Registry` instance.
///
/// This function should not be called directly. Use [`Registry::register`]
/// instead. Implementors should handle registration by delegating the call
/// to another `Source` type.
///
/// [`Registry::register`]: crate::Registry::register
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()>;
/// Re-register `self` with the given `Registry` instance.
///
/// This function should not be called directly. Use
/// [`Registry::reregister`] instead. Implementors should handle
/// re-registration by either delegating the call to another `Source` type.
///
/// [`Registry::reregister`]: crate::Registry::reregister
fn reregister(&self, registry: &Registry, token: Token, interests: Interests)
-> io::Result<()>;
/// Deregister `self` from the given `Registry` instance.
///
/// This function should not be called directly. Use
/// [`Registry::deregister`] instead. Implementors should handle
/// deregistration by delegating the call to another `Source` type.
///
/// [`Registry::deregister`]: crate::Registry::deregister
fn deregister(&self, registry: &Registry) -> io::Result<()>;
}
It has three methods: register, reregister, and deregister.
In the example, the source passed in is &server, a TcpListener.
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}
register() consists of two calls:
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
Start with the first one.
impl SelectorId {
pub fn new() -> SelectorId {
SelectorId {
id: AtomicUsize::new(0),
}
}
pub fn associate_selector(&self, registry: &Registry) -> io::Result<()> {
let selector_id = self.id.load(Ordering::SeqCst);
if selector_id != 0 && selector_id != registry.selector.id() {
Err(io::Error::new(
io::ErrorKind::Other,
"socket already registered",
))
} else {
self.id.store(registry.selector.id(), Ordering::SeqCst);
Ok(())
}
}
}
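This stores registry.selector.id() into SelectorId.id, unless the source is already associated with a different selector, in which case it returns a "socket already registered" error. Note that the id field is an AtomicUsize, and that 0 means "not associated yet" (NEXT_ID starts at 1).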
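The association check can be exercised in isolation. The sketch below is a simplified stand-in for the debug-only SelectorId: the method name associate and its registry_selector_id parameter are assumptions for illustration, replacing the &Registry argument of the real associate_selector.

```rust
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative stand-in for mio's debug-only `SelectorId`.
pub struct SelectorId {
    id: AtomicUsize, // 0 means "not associated with any selector yet"
}

impl SelectorId {
    pub fn new() -> SelectorId {
        SelectorId { id: AtomicUsize::new(0) }
    }

    /// `registry_selector_id` stands in for `registry.selector.id()`.
    pub fn associate(&self, registry_selector_id: usize) -> io::Result<()> {
        let current = self.id.load(Ordering::SeqCst);
        if current != 0 && current != registry_selector_id {
            // Already tied to another selector: refuse the registration.
            Err(io::Error::new(io::ErrorKind::Other, "socket already registered"))
        } else {
            self.id.store(registry_selector_id, Ordering::SeqCst);
            Ok(())
        }
    }
}
```

Registering twice with the same selector id succeeds; only a second, different selector is rejected.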
Now the second call. First, the type of sys:
pub struct TcpListener {
sys: sys::TcpListener,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}
The register method of sys::TcpListener:
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
}
SourceFd()
First look at the argument passed to SourceFd(), &self.as_raw_fd():
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
Here inner is of type net::TcpListener. Following it into the std::net::TcpListener implementation:
#[stable(feature = "rust1", since = "1.0.0")]
impl AsRawFd for net::TcpListener {
fn as_raw_fd(&self) -> RawFd { *self.as_inner().socket().as_inner() }
}
impl AsInner<net_imp::TcpListener> for TcpListener {
fn as_inner(&self) -> &net_imp::TcpListener { &self.0 }
}
Note the import use crate::sys_common::net as net_imp;
To summarize the TcpListener wrapper chain: mio's TcpListener --> sys::TcpListener --> net::TcpListener (std) --> sys_common::net::TcpListener
The socket() method of sys_common::net::TcpListener:
pub fn socket(&self) -> &Socket { &self.inner }
The definition of Socket is in the standard library, in /src/libstd/sys/unix/net.rs:
pub struct Socket(FileDesc);
impl AsInner<c_int> for Socket {
fn as_inner(&self) -> &c_int { self.0.as_inner() }
}
pub struct FileDesc {
fd: c_int,
}
impl AsInner<c_int> for FileDesc {
fn as_inner(&self) -> &c_int { &self.fd }
}
Back to SourceFd(&self.as_raw_fd()).register(registry, token, interests):
/// Adapter for [`RawFd`] providing an [`event::Source`] implementation.
///
/// `SourceFd` enables registering any type with an FD with [`Poll`].
///
/// While only implementations for TCP and UDP are provided, Mio supports
/// registering any FD that can be registered with the underlying OS selector.
/// `SourceFd` provides the necessary bridge.
/// ...
pub struct SourceFd<'a>(pub &'a RawFd);
impl<'a> event::Source for SourceFd<'a> {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
poll::selector(registry).register(*self.0, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
poll::selector(registry).reregister(*self.0, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
poll::selector(registry).deregister(*self.0)
}
}
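SourceFd also implements the event::Source trait. As its doc comment explains in detail, SourceFd is a bridge that lets any file descriptor be registered with the OS selector.
Why the indirection? Because sys::Selector works on a raw file descriptor.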
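The bridging role of SourceFd is an adapter over a borrowed fd. Here is a minimal sketch of that shape, using stand-ins throughout: MockSelector, Listener, the Fd alias and the reduced one-method Source trait are hypothetical and only record what would be registered, without touching epoll.

```rust
use std::cell::RefCell;
use std::io;

/// Stand-in for a raw file descriptor (`RawFd` is an `i32` on Unix).
pub type Fd = i32;

/// Hypothetical selector that only records registered (fd, token) pairs.
#[derive(Default)]
pub struct MockSelector {
    pub registered: RefCell<Vec<(Fd, usize)>>,
}

impl MockSelector {
    pub fn register(&self, fd: Fd, token: usize) -> io::Result<()> {
        self.registered.borrow_mut().push((fd, token));
        Ok(())
    }
}

/// Reduced version of the `Source` trait: only `register`.
pub trait Source {
    fn register(&self, selector: &MockSelector, token: usize) -> io::Result<()>;
}

/// Adapter: borrows an fd and gives it a `Source` implementation.
pub struct SourceFd<'a>(pub &'a Fd);

impl<'a> Source for SourceFd<'a> {
    fn register(&self, selector: &MockSelector, token: usize) -> io::Result<()> {
        selector.register(*self.0, token)
    }
}

/// A higher-level type delegates to the adapter, as `sys::TcpListener` does.
pub struct Listener {
    pub fd: Fd,
}

impl Source for Listener {
    fn register(&self, selector: &MockSelector, token: usize) -> io::Result<()> {
        SourceFd(&self.fd).register(selector, token)
    }
}
```

With this layout, the selector only ever sees integers, and every socket type reuses the same few lines of delegation.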
The definition of poll::selector():
// ===== Accessors for internal usage =====
pub fn selector(registry: &Registry) -> &sys::Selector {
&registry.selector
}
From the earlier code we know that &registry.selector is a sys::Selector. Let's look at its register method:
pub fn register(&self, fd: RawFd, token: Token, interests: Interests) -> io::Result<()> {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
u64: usize::from(token) as u64,
};
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
}
This finally calls the Linux system call:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
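The arguments are:
- self.ep is the int returned by int epoll_create1(int flags);, which sys::Selector::new() calls. It is the file descriptor of the epoll instance.
- op is libc::EPOLL_CTL_ADD, meaning this call adds an fd to the interest list.
- fd is the RawFd held by the SourceFd analyzed above.
- epoll_event * is &mut event, a libc::epoll_event struct: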
pub struct epoll_event {
pub events: uint32_t,
pub u64: uint64_t,
}
The corresponding Linux definitions:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
The bits of the uint32_t events field encode the event types, and epoll_data_t holds user-defined data, which here is the Token.
Now let's see which interests are registered with the kernel:
fn interests_to_epoll(interests: Interests) -> u32 {
let mut kind = EPOLLET;
if interests.is_readable() {
kind = kind | EPOLLIN | EPOLLRDHUP;
}
if interests.is_writable() {
kind |= EPOLLOUT;
}
kind as u32
}
EPOLLET selects edge-triggered mode.
By default epoll is level-triggered. For more detail on the two modes, see http://man7.org/linux/man-pages/man7/epoll.7.html
In the example, the interests argument is Interests::READABLE, so this branch is taken:
if interests.is_readable() {
kind = kind | EPOLLIN | EPOLLRDHUP;
}
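This adds the EPOLLIN | EPOLLRDHUP events.
EPOLLIN means the file is readable. EPOLLRDHUP means the peer socket closed the connection or shut down its writing half (TCP is full duplex: either side can read and write at the same time, so either side can also shut down only its reading or its writing direction).
So besides the file becoming readable, the peer closing the connection also triggers a readiness event.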
At this point we have finally seen the socket behind the TcpListener being added to the epoll interest list.
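To make the registered bit pattern concrete, the sketch below rebuilds the event passed to epoll_ctl from plain values. The flag constants are the Linux values from <sys/epoll.h>; EpollEvent and make_event are hypothetical simplifications of libc::epoll_event and of the register/interests_to_epoll pair, with two booleans standing in for Interests.

```rust
// Flag values as defined by Linux in <sys/epoll.h>.
pub const EPOLLIN: u32 = 0x001;
pub const EPOLLOUT: u32 = 0x004;
pub const EPOLLRDHUP: u32 = 0x2000;
pub const EPOLLET: u32 = 1 << 31;

/// Simplified stand-in for `libc::epoll_event`.
#[derive(Debug, PartialEq)]
pub struct EpollEvent {
    pub events: u32, // bitmask of event types
    pub data: u64,   // user data; mio stores the token here
}

/// Builds the event the way the selector's `register` does:
/// always edge-triggered, plus the bits for the requested interests.
pub fn make_event(readable: bool, writable: bool, token: usize) -> EpollEvent {
    let mut kind = EPOLLET;
    if readable {
        kind |= EPOLLIN | EPOLLRDHUP;
    }
    if writable {
        kind |= EPOLLOUT;
    }
    EpollEvent { events: kind, data: token as u64 }
}
```

For the listener in the example (readable only, token 0) this yields events = 0x80002001. The token comes back unchanged in the data field when epoll_wait reports the event, which is how event.token() can identify the source.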
poll.poll()
Now we reach the core method:
poll.poll(&mut events, None)?;
/// Wait for readiness events
///
/// Blocks the current thread and waits for readiness events for any of the
/// [`event::Source`]s that have been registered with this `Poll` instance.
/// The function will block until either at least one readiness event has
/// been received or `timeout` has elapsed. A `timeout` of `None` means that
/// `poll` will block until a readiness event has been received.
///
pub fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
self.registry.selector.select(events.sys(), timeout)
}
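As the doc comment says, the method blocks the current thread while there are no readiness events. The timeout argument is None, so it blocks until an event arrives. poll() returns after each batch of events, which is why the example wraps it in a loop {}.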
Let's step into select(), which does the real work:
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
let timeout = timeout
.map(|to| cmp::min(to.as_millis(), libc::c_int::max_value() as u128) as libc::c_int)
.unwrap_or(-1);
events.clear();
syscall!(epoll_wait(
self.ep,
events.as_mut_ptr(),
events.capacity() as i32,
timeout,
))
.map(|n_events| {
// This is safe because `epoll_wait` ensures that `n_events` are
// assigned.
unsafe { events.set_len(n_events as usize) };
})
}
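It first converts the timeout to milliseconds (clamped to the range of c_int, with None mapped to -1, which means wait indefinitely), calls events.clear(), and then calls the system epoll_wait. When events are ready, epoll_wait returns their count and writes them into events. The system call's signature: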
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
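The timeout conversion at the top of select() is easy to check on its own. In this minimal sketch, timeout_to_millis is a hypothetical name for that expression, with i32 standing in for libc::c_int.

```rust
use std::time::Duration;

/// Converts an optional timeout into the millisecond argument of
/// `epoll_wait`: `None` becomes -1 (block indefinitely), and large
/// durations are clamped to the largest value an `i32` can hold.
pub fn timeout_to_millis(timeout: Option<Duration>) -> i32 {
    timeout
        .map(|to| std::cmp::min(to.as_millis(), i32::MAX as u128) as i32)
        .unwrap_or(-1)
}
```

Note that as_millis() truncates, so in this sketch a sub-millisecond timeout becomes 0, which epoll_wait treats as "return immediately".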
Consuming events
Back in the example: when a client connects to the server, events is populated.
for event in events.iter() {
match event.token() {
SERVER => {
...
},
token =>{
...
}
}
Matching on event.token() enters the SERVER arm (the listener was registered with the SERVER token). Its logic:
let (connection, address) = server.accept()?;
println!("Accepted connection from: {}", address);
let token = next(&mut unique_token);
poll.registry().register(
&connection,
token,
Interests::READABLE.add(Interests::WRITABLE),
)?;
connections.insert(token, connection);
- Obtain the connection instance and the client address via server.accept()
- Register the new connection with poll
- Insert the connection into the HashMap for later use
Let's take these one at a time. First, TcpListener's accept():
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.inner.accept().and_then(|(inner, addr)| {
inner
.set_nonblocking(true)
.map(|()| (TcpStream::new(inner), addr))
})
}
The accepted stream is switched to non-blocking mode before being wrapped. Next, the accept() method of its inner (net::TcpListener):
#[stable(feature = "rust1", since = "1.0.0")]
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
// On WASM, `TcpStream` is uninhabited (as it's unsupported) and so
// the `a` variable here is technically unused.
#[cfg_attr(target_arch = "wasm32", allow(unused_variables))]
self.0.accept().map(|(a, b)| (TcpStream(a), b))
}
As before, this ends up in a sys_common::net::TcpListener method, so we won't trace it again.
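Next a new token is created and the connection is registered with Poll. Note the arguments to register(): the event::Source is &connection (a TcpStream), and the Interests are Interests::READABLE.add(Interests::WRITABLE), i.e. both readable and writable readiness. Here is TcpStream's implementation of event::Source: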
impl event::Source for TcpStream {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}
register() sets self.selector_id (in debug builds) and then registers with the selector through self.sys.register().
pub struct TcpStream {
sys: sys::TcpStream,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}
Stepping into sys::TcpStream::register():
impl event::Source for TcpStream {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).deregister(registry)
}
}
This goes through SourceFd(), which was analyzed above, so we won't repeat it.
Inserting the connection into the HashMap for later use needs no explanation.
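The token bookkeeping used by the example can be tried without any sockets. In the sketch below, Token is a local stand-in for mio::Token, next is copied from the example, and route is a hypothetical helper that only describes where an event would be dispatched; the map stores names instead of TcpStream values.

```rust
use std::collections::HashMap;

/// Local stand-in for `mio::Token`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Token(pub usize);

pub const SERVER: Token = Token(0);

/// Returns the current token and advances the counter, as in the example.
pub fn next(current: &mut Token) -> Token {
    let next = current.0;
    current.0 += 1;
    Token(next)
}

/// Describes where an event carrying `token` would be routed.
pub fn route(token: Token, connections: &HashMap<Token, &'static str>) -> String {
    match token {
        SERVER => "accept".to_string(),
        t => match connections.get(&t) {
            Some(name) => format!("handle {}", name),
            // Sporadic event for a token that is no longer (or not yet) known.
            None => "ignore".to_string(),
        },
    }
}
```

Because the counter starts at SERVER.0 + 1, connection tokens can never collide with the listener's token.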
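With the TcpStream registered with Poll, readiness events fire once the connection is established: writable as soon as the socket's send buffer has room, readable when data arrives or the peer closes. event.token() routes them to the following arm: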
token => {
// (maybe) received an event for a TCP connection.
let done = if let Some(connection) = connections.get_mut(&token) {
handle_connection_event(&mut poll, connection, event)?
} else {
// Sporadic events happen.
false
};
if done {
connections.remove(&token);
}
}
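if let Some(connection) = connections.get_mut(&token) looks up the saved connection in the HashMap by &token and passes it to handle_connection_event(). That function checks event.is_writable() and event.is_readable() and handles each case. Which event arrives first for a given TcpStream? That is not guaranteed, and a single event can report both. The two if blocks do not depend on each other's order.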
/// Returns `true` if the connection is done.
fn handle_connection_event(
poll: &mut Poll,
connection: &mut TcpStream,
event: &Event,
) -> io::Result<bool> {
if event.is_writable() {
...
}
if event.is_readable() {
...
}
Ok(false)
}
Start with if event.is_writable():
if event.is_writable() {
// We can (maybe) write to the connection.
match connection.write(DATA) {
// We want to write the entire `DATA` buffer in a single go. If we
// write less we'll return a short write error (same as
// `io::Write::write_all` does).
Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
Ok(_) => {
// After we've written something we'll reregister the connection
// to only respond to readable events.
poll.registry()
.reregister(&connection, event.token(), Interests::READABLE)?
}
// Would block "errors" are the OS's way of saying that the
// connection is not actually ready to perform this I/O operation.
Err(ref err) if would_block(err) => {}
// Got interrupted (how rude!), we'll try again.
Err(ref err) if interrupted(err) => {
return handle_connection_event(poll, connection, event)
}
// Other errors we'll consider fatal.
Err(err) => return Err(err),
}
}
When the connection is writable, it immediately calls connection.write(DATA), and the client receives the string "Hello world!\n". The signature of write:
fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
If fewer than DATA.len() bytes are written, it returns an Err (io::ErrorKind::WriteZero).
If the write succeeds, the connection is reregistered with only Interests::READABLE:
// After we've written something we'll reregister the connection
// to only respond to readable events.
poll.registry()
.reregister(&connection, event.token(), Interests::READABLE)?
If the error is WouldBlock, nothing is done.
If the error is Interrupted, handle_connection_event() is called recursively to retry.
Next, if event.is_readable():
if event.is_readable() {
let mut connection_closed = false;
let mut received_data = Vec::with_capacity(4096);
// We can (maybe) read from the connection.
loop {
let mut buf = [0; 256];
match connection.read(&mut buf) {
Ok(0) => {
// Reading 0 bytes means the other side has closed the
// connection or is done writing, then so are we.
connection_closed = true;
break;
}
Ok(n) => received_data.extend_from_slice(&buf[..n]),
// Would block "errors" are the OS's way of saying that the
// connection is not actually ready to perform this I/O operation.
Err(ref err) if would_block(err) => break,
Err(ref err) if interrupted(err) => continue,
// Other errors we'll consider fatal.
Err(err) => return Err(err),
}
}
if let Ok(str_buf) = from_utf8(&received_data) {
println!("Received data: {}", str_buf.trim_end());
} else {
println!("Received (none UTF-8) data: {:?}", &received_data);
}
if connection_closed {
println!("Connection closed");
return Ok(true);
}
}
There is a loop {} here. The read method called by connection.read(&mut buf) is:
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.sys).read(buf)
}
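If it returns 0, the peer has closed the connection (or is done writing), so connection_closed = true; is set and the loop ends.
If it returns n, n bytes were read from the client and are appended to received_data.
If the error is WouldBlock, the loop is exited.
If the error is Interrupted, the loop continues.
The received data is then printed. If connection_closed is true, the function returns Ok(true); otherwise it falls through to the final Ok(false).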
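In most cases the loop ends through the WouldBlock branch: the socket was readable, everything available has been read, and the client has sent nothing more. Because the registration is edge-triggered, reading until WouldBlock is what ensures no pending data is left behind.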
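The read-until-WouldBlock pattern can be tested against any Read implementation. In this sketch, drain is a hypothetical extraction of the loop above, and Scripted is a made-up reader that replays a fixed sequence of results, so no socket is needed. It assumes every scripted chunk fits in the 256-byte buffer.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Reads until `WouldBlock` or EOF.
/// Returns the bytes received and whether the peer closed the connection.
pub fn drain<R: Read>(reader: &mut R) -> io::Result<(Vec<u8>, bool)> {
    let mut received = Vec::new();
    loop {
        let mut buf = [0u8; 256];
        match reader.read(&mut buf) {
            Ok(0) => return Ok((received, true)),
            Ok(n) => received.extend_from_slice(&buf[..n]),
            // Nothing more to read right now: stop and wait for the next event.
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                return Ok((received, false))
            }
            // Interrupted by a signal: simply try again.
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
}

/// Hypothetical reader that replays a scripted sequence of `read` results.
/// Each chunk must be at most as long as the caller's buffer.
pub struct Scripted(pub VecDeque<io::Result<Vec<u8>>>);

impl Read for Scripted {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop_front() {
            Some(Ok(chunk)) => {
                buf[..chunk.len()].copy_from_slice(&chunk);
                Ok(chunk.len())
            }
            Some(Err(err)) => Err(err),
            None => Ok(0), // script exhausted: behave like EOF
        }
    }
}
```

A script of "hel", Interrupted, "lo", WouldBlock drains to "hello" with the connection still open; a second call on the exhausted script reports the close.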
Back in the caller:
let done = ... {
handle_connection_event(&mut poll, connection, event)?
} else {
// Sporadic events happen.
false
};
if done {
connections.remove(&token);
}
If the call returned Ok(true), the connection is removed with connections.remove(&token); dropping the TcpStream closes the socket.
Waker
One piece not covered above deserves a mention: /src/waker.rs. A Waker allows a Poll to be woken up from another thread.
#[derive(Debug)]
pub struct Waker {
inner: sys::Waker,
}
impl Waker {
/// Create a new `Waker`.
pub fn new(registry: &Registry, token: Token) -> io::Result<Waker> {
sys::Waker::new(poll::selector(&registry), token).map(|inner| Waker { inner })
}
/// Wake up the [`Poll`] associated with this `Waker`.
///
/// [`Poll`]: crate::Poll
pub fn wake(&self) -> io::Result<()> {
self.inner.wake()
}
}
The definition is simple, with only two methods. Here is the Linux implementation:
#[cfg(any(target_os = "linux", target_os = "android"))]
mod eventfd {
use crate::sys::Selector;
use crate::{Interests, Token};
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::FromRawFd;
/// Waker backed by `eventfd`.
///
/// `eventfd` is effectively an 64 bit counter. All writes must be of 8
/// bytes (64 bits) and are converted (native endian) into an 64 bit
/// unsigned integer and added to the count. Reads must also be 8 bytes and
/// reset the count to 0, returning the count.
#[derive(Debug)]
pub struct Waker {
fd: File,
}
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| {
// Turn the file descriptor into a file first so we're ensured
// it's closed when dropped, e.g. when register below fails.
let file = unsafe { File::from_raw_fd(fd) };
selector
.register(fd, token, Interests::READABLE)
.map(|()| Waker { fd: file })
})
}
pub fn wake(&self) -> io::Result<()> {
let buf: [u8; 8] = 1u64.to_ne_bytes();
match (&self.fd).write(&buf) {
Ok(_) => Ok(()),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
// Writing only blocks if the counter is going to overflow.
// So we'll reset the counter to 0 and wake it again.
self.reset()?;
self.wake()
}
Err(err) => Err(err),
}
}
/// Reset the eventfd object, only need to call this if `wake` fails.
fn reset(&self) -> io::Result<()> {
let mut buf: [u8; 8] = 0u64.to_ne_bytes();
match (&self.fd).read(&mut buf) {
Ok(_) => Ok(()),
// If the `Waker` hasn't been awoken yet this will return a
// `WouldBlock` error which we can safely ignore.
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
Err(err) => Err(err),
}
}
}
}
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use self::eventfd::Waker;
As shown, Waker's new() calls the system eventfd() function:
int eventfd(unsigned int initval, int flags);
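eventfd() creates a file descriptor for event notification. It can be used for notification between user-space applications, and by the kernel to notify user-space applications. The underlying "eventfd object" is an unsigned 64-bit integer counter maintained by the kernel.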
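The initval argument is the counter's initial value. flags can include:
- EFD_CLOEXEC sets the close-on-exec flag on the new descriptor, so it is closed when the process calls execve() (it is still inherited across fork()).
- EFD_NONBLOCK sets O_NONBLOCK on the new descriptor. Without it, read (and write) may block.
- EFD_SEMAPHORE gives reads semaphore-like semantics.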
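The operations on an "eventfd object" are simple: write() adds the written value to the counter, and read() returns the counter value and resets it to zero (in semaphore mode, read() returns 1 and decrements the counter by 1).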
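These read and write semantics can be modelled in memory. EventFdModel below is a hypothetical type, not a real file descriptor: it ignores overflow, and it uses None to represent the case where a real non-blocking eventfd would fail a read with EAGAIN because the counter is zero.

```rust
/// In-memory model of an eventfd counter (not a real file descriptor).
pub struct EventFdModel {
    counter: u64,
    semaphore: bool, // models the EFD_SEMAPHORE flag
}

impl EventFdModel {
    pub fn new(initval: u64, semaphore: bool) -> Self {
        EventFdModel { counter: initval, semaphore }
    }

    /// `write` adds `value` to the counter (overflow is ignored in this model).
    pub fn write(&mut self, value: u64) {
        self.counter += value;
    }

    /// `read` returns `None` when the counter is zero. Otherwise it returns
    /// the whole counter and resets it, or, in semaphore mode, returns 1
    /// and decrements the counter by 1.
    pub fn read(&mut self) -> Option<u64> {
        if self.counter == 0 {
            return None;
        }
        if self.semaphore {
            self.counter -= 1;
            Some(1)
        } else {
            let value = self.counter;
            self.counter = 0;
            Some(value)
        }
    }
}
```

In the default mode, several wake-ups collapse into a single read, which suits a waker: the reader only needs to know that at least one wake-up happened.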
In the source above, eventfd() creates the fd, which is then registered with sys::Selector (epoll, in practice) with a readable interest.
selector
.register(fd, token, Interests::READABLE)
.map(|()| Waker { fd: file })
Now its wake():
pub fn wake(&self) -> io::Result<()> {
let buf: [u8; 8] = 1u64.to_ne_bytes();
match (&self.fd).write(&buf) {
Ok(_) => Ok(()),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
// Writing only blocks if the counter is going to overflow.
// So we'll reset the counter to 0 and wake it again.
self.reset()?;
self.wake()
}
Err(err) => Err(err),
}
}
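It simply writes a 64-bit 1, which is added to the current counter.
write(&buf) normally returns Ok. But if the addition would push the unsigned 64-bit counter past its maximum value 0xfffffffffffffffe, the write blocks until a read is performed, or, if the descriptor is non-blocking (libc::EFD_NONBLOCK, as set in the source above), fails with EAGAIN.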
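The overflow path of wake() can be reproduced with a small model. Counter below is a hypothetical in-memory stand-in for a non-blocking eventfd; its wake method follows the same write, reset, retry structure as the Waker source, with reset simplified to clearing the counter directly instead of performing a read.

```rust
use std::io;

/// Largest value an eventfd counter can hold.
pub const MAX_COUNTER: u64 = 0xffff_ffff_ffff_fffe;

/// In-memory model of a non-blocking eventfd counter.
pub struct Counter(pub u64);

impl Counter {
    /// Adds `value`, or fails with `WouldBlock` if the sum would exceed
    /// the maximum (a real non-blocking eventfd fails with EAGAIN).
    pub fn write(&mut self, value: u64) -> io::Result<()> {
        match self.0.checked_add(value) {
            Some(sum) if sum <= MAX_COUNTER => {
                self.0 = sum;
                Ok(())
            }
            _ => Err(io::ErrorKind::WouldBlock.into()),
        }
    }

    /// Resets the counter to zero, which is the effect of a read.
    pub fn reset(&mut self) {
        self.0 = 0;
    }

    /// Write 1; on `WouldBlock`, reset the counter and try again.
    pub fn wake(&mut self) -> io::Result<()> {
        match self.write(1) {
            Ok(()) => Ok(()),
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                self.reset();
                self.wake()
            }
            Err(err) => Err(err),
        }
    }
}
```

Starting from a saturated counter, one wake() call leaves the counter at 1: the failed write triggers the reset, and the retry succeeds.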
The code only handles io::ErrorKind::WouldBlock, while the kernel reports EAGAIN. Is that a problem?
No. The Linux documentation of the POSIX error names says:
All the error names specified by POSIX.1 must have distinct values, with the exception of EAGAIN and EWOULDBLOCK, which may be the same. On Linux, these two have the same value on all architectures.
They are the same error value, and Rust currently defines a single io::ErrorKind::WouldBlock to represent both EWOULDBLOCK and EAGAIN, which can be confusing in some situations.
On FreeBSD and macOS, Waker is implemented with kqueue() and kevent().
On OpenBSD and Solaris, Waker is implemented with a pipe.
On Windows, it is implemented with IOCP (I/O Completion Port).
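Conclusion
This article gave an overview of mio's core flow. Because it focuses on the core flow, many details were left out.
mio's code is small and lean overall; most of it is a very thin wrapper over the underlying operating system, which matches its stated goal: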
with a focus on adding as little overhead as possible
The core logic revolves around sys::Selector and event::Source. Poll acts as a facade (the Facade pattern).
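This analysis did not cover UDP; the code is in /src/net/udp.rs if you want to explore it yourself.
The Windows-specific code is in /src/sys/windows.
Having worked through mio, we have a solid base for analyzing the tokio code.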