Rust 實戰 - 使用套接字聯網API(二)

上一節,咱們已經實現了一個最小可運行版本。之因此使用Rust而不是C,是由於Rust具有了必要的抽象能力,還能得到跟C差很少的性能。這一節,咱們對上一節的代碼作必要的封裝,順便還能把unsafe的代碼包裝成safe的API。html

我將上一節的源碼放到了這裏,你能夠去查看。linux

還記得上一節,咱們把使用到的libc中的函數socketbindconnect和結構體sockaddrsockaddr_inin_addr等,在Rust這邊定義了出來。實際上,幾乎libc中的函數,libc這個crate都幫咱們定義好了。你能夠去這裏查看。編譯器和標準庫自己也使用了這個crate,咱們也使用這個。git

首先在Cargo.toml文件的[dependencies]下面加入libc = "0.2":github

[dependencies]
libc = "0.2"

接着在main.rs文件上方加入use libc;,也能夠use libc as c;。或者你直接簡單粗暴use libc::*,並不推薦這樣,除非你明確知道你使用的函數來自哪裏。並將咱們定義的與libc中對用的常量、函數、結構體刪除。再添加libc::c::到咱們使用那些常量、結構體、函數的地方。若是你是直接use libc::*,除了直接刪除那部分代碼外,幾乎什麼都不用作。目前的代碼:api

use std::ffi::c_void;
use libc as c;

fn main() {
    use std::io::Error;
    use std::mem;
    use std::thread;
    use std::time::Duration;

    thread::spawn(|| {

        // server
        unsafe {
            let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP);
            if socket < 0 {
                panic!("last OS error: {:?}", Error::last_os_error());
            }

            let servaddr = c::sockaddr_in {
                sin_family: c::AF_INET as u16,
                sin_port: 8080u16.to_be(),
                sin_addr: c::in_addr {
                    s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be()
                },
                sin_zero: mem::zeroed()
            };

            let result = c::bind(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32);
            if result < 0 {
                println!("last OS error: {:?}", Error::last_os_error());
                c::close(socket);
            }

            c::listen(socket, 128);

            loop {
                let mut cliaddr: c::sockaddr_storage = mem::zeroed();
                let mut len = mem::size_of_val(&cliaddr) as u32;

                let client_socket = c::accept(socket, &mut cliaddr as *mut c::sockaddr_storage as *mut c::sockaddr, &mut len);
                if client_socket < 0 {
                    println!("last OS error: {:?}", Error::last_os_error());
                    break;
                }

                thread::spawn(move || {
                    loop {
                        let mut buf = [0u8; 64];
                        let n = c::read(client_socket, &mut buf as *mut _ as *mut c_void, buf.len());
                        if n <= 0 {
                            break;
                        }

                        println!("{:?}", String::from_utf8_lossy(&buf[0..n as usize]));

                        let msg = b"Hi, client!";
                        let n = c::write(client_socket, msg as *const _ as *const c_void, msg.len());
                        if n <= 0 {
                            break;
                        }
                    }

                    c::close(client_socket);
                });
            }

            c::close(socket);
        }

    });

    thread::sleep(Duration::from_secs(1));

    // client
    unsafe {
        let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP);
        if socket < 0 {
            panic!("last OS error: {:?}", Error::last_os_error());
        }

        let servaddr = c::sockaddr_in {
            sin_family: c::AF_INET as u16,
            sin_port: 8080u16.to_be(),
            sin_addr: c::in_addr {
                s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be()
            },
            sin_zero: mem::zeroed()
        };

        let result = c::connect(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32);
        if result < 0 {
            println!("last OS error: {:?}", Error::last_os_error());
            c::close(socket);
        }

        let msg = b"Hello, server!";
        let n = c::write(socket, msg as *const _ as *const c_void, msg.len());
        if n <= 0 {
            println!("last OS error: {:?}", Error::last_os_error());
            c::close(socket);
        }

        let mut buf = [0u8; 64];
        let n = c::read(socket, &mut buf as *mut _ as *mut c_void, buf.len());
        if n <= 0 {
            println!("last OS error: {:?}", Error::last_os_error());
        }

        println!("{:?}", String::from_utf8_lossy(&buf[0..n as usize]));

        c::close(socket);
    }
}

你編譯運行,應該能獲得與上一節一樣的結果。socket

接下來,咱們嘗試把上面代碼中函數,封裝成更具Rust風格的API,除了TCP外,也還要考慮以後把UDP、UNIX域和SCTP也增長進來。同時,咱們跟標準庫裏 net相關的API保持一致的風格。咱們暫時不考慮跨平臺,只考慮Linux,所以能夠大膽的將一些linux獨有的API添加進來。ide

UNIX中一切皆文件,套接字也不例外。字節流套接字上的read和write函數所表現出來的行爲,不一樣於一般的文件I/O。字節流套接字上調用read和write輸入或輸出字節數可能比請求的要少,這個現象的緣由在於內核中用於套接字的緩衝區可能已經達到了極限。不過,這並非咱們正真關心的。咱們來看看標準庫中 File的實現:函數

pub struct File(FileDesc);

impl File {
    ...
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
            self.0.read(buf)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
    }

    pub fn duplicate(&self) -> io::Result<File> {
            self.0.duplicate().map(File)
    }
    ...
}

File 是一個元組結構體,標準庫已經實現了readwrite,以及duplicateduplicate頗有用,用於複製出一個新的描述符。咱們繼續看File中"包裹的FileDesc:oop

pub struct FileDesc {
    fd: c_int,
}

impl File {
    ...
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
            let ret = cvt(unsafe {
               libc::read(self.fd,
                       buf.as_mut_ptr() as *mut c_void,
                       cmp::min(buf.len(), max_len()))
            })?;
            Ok(ret as usize)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
            let ret = cvt(unsafe {
                    libc::write(self.fd,
                        buf.as_ptr() as *const c_void,
                        cmp::min(buf.len(), max_len()))
            })?;
            Ok(ret as usize)
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
            unsafe {
                    cvt(libc::ioctl(self.fd, libc::FIOCLEX))?;
                    Ok(())
            }
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
            unsafe {
                    let v = nonblocking as c_int;
                    cvt(libc::ioctl(self.fd, libc::FIONBIO, &v))?;
                    Ok(())
            }
    }
}

這一層應該是到頭了,你能夠看到,Rust中的File也是直接對libc的封裝,不過你不用擔憂,一開始就提到,Rust 的ABI與C的ABI是兼容的,也就意味着Rust和C互相調用是幾乎是零開銷的。FileDescreadwrite中的實現,與咱們以前對sockfdreadwrite基本是同樣的。除了readwrite外,還有兩個頗有用的方法set_cloexecset_nonblocking性能

我把「依附於」某個類型的函數叫作方法,與普通函數不一樣的是,依附於某個類型的函數,必須經過它所依附的類型調用。Rust經過這種方式來實現OOP,可是與某些語言的OOP不一樣的是,Rust的這種實現是零開銷的。也就是,你將一些函數依附到某個類型上,並不會對運行時形成額外的開銷,這些都在編譯時去處理。

set_cloexec方法會對描述符設置FD_CLOEXEC。咱們常常會碰到須要fork子進程的狀況,並且子進程極可能會繼續exec新的程序。對描述符設置FD_CLOEXEC,就意味着,咱們fork子進程時,父子進程中相同的文件描述符指向系統文件表的同一項,可是,咱們若是調用exec執行另外一個程序,此時會用全新的程序替換子進程的正文。爲了較少沒必要要的麻煩,咱們之後要對打開的描述符設置FD_CLOEXEC,除非遇到特殊狀況。

set_nonblocking用於將描述符設置爲非阻塞模式,若是咱們要使用poll、epoll等api的話。

既然標準庫已經封裝好了FileDesc,我想直接使用的,然而FileDesc在標準庫以外是不可見的。若是使用File的話,set_cloexecset_nonblocking 仍是要咱們再寫一次,可是File並非「我本身」的類型,我無法直接給File附加方法,爲此還須要一個額外的Tarit或者用一個「我本身」的類型,去包裹它。挺繞的。那既然這樣,咱們仍是本身來吧。不過咱們已經有了參考,能夠將標準庫裏的FileDecs直接複製出來,而後去掉與Linux無關的代碼,固然你也能夠自由發揮一下。

要注意的是,這段代碼中還調用了一個函數cvt,咱們把相關代碼也複製過來:

use std::io::{self, ErrorKind};

#[doc(hidden)]
pub trait IsMinusOne {
    fn is_minus_one(&self) -> bool;
}

macro_rules! impl_is_minus_one {
    ($($t:ident)*) => ($(impl IsMinusOne for $t {
        fn is_minus_one(&self) -> bool {
            *self == -1
        }
    })*)
}

impl_is_minus_one! { i8 i16 i32 i64 isize }

pub fn cvt<T: IsMinusOne>(t: T) -> io::Result<T> {
    if t.is_minus_one() {
        Err(io::Error::last_os_error())
    } else {
        Ok(t)
    }
}

pub fn cvt_r<T, F>(mut f: F) -> io::Result<T>
    where T: IsMinusOne,
          F: FnMut() -> T
{
    loop {
        match cvt(f()) {
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            other => return other,
        }
    }
}

還記得上一節咱們使用過的last_os_error()方法麼,這段代碼經過宏impl_is_minus_onei32等常見類型實現了IsMinusOne這個Tarit,而後咱們就可使用cvt函數更便捷得調用last_os_error()取得錯誤。 我將這段代碼放到util.rs文件中,並在main.rs文件上方加入pub mod util;

而後再來看FileDesc最終的實現:

use std::mem;
use std::io;
use std::cmp;
use std::os::unix::io::FromRawFd;

use libc as c;

use crate::util::cvt;

#[derive(Debug)]
pub struct FileDesc(c::c_int);

pub fn max_len() -> usize {
    <c::ssize_t>::max_value() as usize
}

impl FileDesc {
    pub fn raw(&self) -> c::c_int {
        self.0
    }

    pub fn into_raw(self) -> c::c_int {
        let fd = self.0;
        mem::forget(self);
        fd
    }

    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let ret = cvt(unsafe {
            c::read(
                self.0,
                buf.as_mut_ptr() as *mut c::c_void,
                cmp::min(buf.len(), max_len())
            )
        })?;

        Ok(ret as usize)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        let ret = cvt(unsafe {
            c::write(
                self.0,
                buf.as_ptr() as *const c::c_void,
                cmp::min(buf.len(), max_len())
            )
        })?;

        Ok(ret as usize)
    }

    pub fn get_cloexec(&self) -> io::Result<bool> {
        unsafe {
            Ok((cvt(libc::fcntl(self.0, c::F_GETFD))? & libc::FD_CLOEXEC) != 0)
        }
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
        unsafe {
            cvt(c::ioctl(self.0, c::FIOCLEX))?;
            Ok(())
        }
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
        unsafe {
            let v = nonblocking as c::c_int;
            cvt(c::ioctl(self.0, c::FIONBIO, &v))?;
            Ok(())
        }
    }

    pub fn duplicate(&self) -> io::Result<FileDesc> {
        cvt(unsafe { c::fcntl(self.0, c::F_DUPFD_CLOEXEC, 0) }).and_then(|fd| {
            let fd = FileDesc(fd);
            Ok(fd)
        })
    }
}

impl FromRawFd for FileDesc {
    unsafe fn from_raw_fd(fd: c::c_int) -> FileDesc {
        FileDesc(fd)
    }
}

impl Drop for FileDesc {
    fn drop(&mut self) {
        let _ = unsafe { c::close(self.0) };
    }
}

我已經將與Linux不相關的代碼刪除掉了。之因此原有duplicate那麼冗長,是由於舊的Linux內核不支持F_DUPFD_CLOEXEC這個設置。fcntl這個函數,用來設置控制文件描述符的選項,咱們稍後還會遇到用來設置和獲取套接字的getsockoptsetsockopt。還有read_atwrite_at等實現比較複雜的函數,咱們用不到,也將他們刪除。還有impl<'a> Read for &'a FileDesc ,由於內部使了一個Unstable的API,我也將其去掉了。

我自由發揮了一下,把:

pub struct FileDesc {
    fd: c_int,
}

替換成了:

pub struct FileDesc(c::c_int);

它們是等效的。不知你注意到沒有,我把pub fn new(...)函數給去掉了,由於這個函數是unsafe的----若是咱們從此將這些代碼做爲庫讓別人使用的話,他可能傳入了一個不存在的描述符,並由此可能引發程序崩潰----但他們並不必定知道。咱們能夠經過在這個函數前面加unsafe來告訴使用者這個函數是unsafe的: pub unsafe fn new(...)。不過,Rust的開發者們已經考慮到了這一點,咱們用約定俗成的from_raw_fd來代替pub unsafe fn new(...),因而纔有了下面這一段:

impl FromRawFd for FileDesc {
    unsafe fn from_raw_fd(fd: c::c_int) -> FileDesc {
        FileDesc(fd)
    }
}

最後,還利用Rust的drop實現了close函數,也就意味着,描述符離開做用域後,會自動close,就再也不須要咱們手動close了。與之先關的是into_raw方法,意思是把FileDesc轉換爲「未加工的」或者說是「裸的」描述符,也就是C的描述符。這個方法裏面調用了forget,以後變量離開做用域後,就不會調用drop了。當你使用這個方法拿到描述符,使用完請不要忘記手動close或者再次from_raw_fd

pub fn into_raw(self) -> c::c_int {
        let fd = self.0;
        mem::forget(self);
        fd
}

我將這段代碼放到了一個新的文件fd.rs中,並在main.rs文件上方加入pub mod fd;

接着,咱們還需一個Socket類型,將socketbindconnect等函數附加上去。這一步應該簡單多了。同時你也會發現,咱們已經把unsafe的代碼,封裝成了safe的代碼。

use std::io;
use std::mem;
use std::os::unix::io::{RawFd, AsRawFd, FromRawFd};

use libc as c;

use crate::fd::FileDesc;
use crate::util::cvt;

pub struct Socket(FileDesc);

impl Socket {
    pub fn new(family: c::c_int, ty: c::c_int, protocol: c::c_int) -> io::Result<Socket> {
        unsafe {
            cvt(c::socket(family, ty | c::SOCK_CLOEXEC, protocol))
                .map(|fd| Socket(FileDesc::from_raw_fd(fd)))
        }
    }

    pub fn bind(&self, storage: *const c::sockaddr, len: c::socklen_t) -> io::Result<()> {
        self.setsockopt(c::SOL_SOCKET, c::SO_REUSEADDR, 1)?;

        cvt(unsafe { c::bind(self.0.raw(), storage, len) })?;

        Ok(())
    }

    pub fn listen(&self, backlog: c::c_int) -> io::Result<()> {
        cvt(unsafe { c::listen(self.0.raw(), backlog) })?;
        Ok(())
    }

    pub fn accept(&self, storage: *mut c::sockaddr, len: *mut c::socklen_t) -> io::Result<Socket> {
        let fd = cvt(unsafe { c::accept4(self.0.raw(), storage, len, c::SOCK_CLOEXEC) })?;
        Ok(Socket(unsafe { FileDesc::from_raw_fd(fd) }))
    }

    pub fn connect(&self, storage: *const c::sockaddr, len: c::socklen_t) -> io::Result<()> {
        cvt(unsafe { c::connect(self.0.raw(), storage, len) })?;
        Ok(())
    }

    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
        self.0.set_nonblocking(nonblocking)
    }

    pub fn get_cloexec(&self) -> io::Result<bool> {
        self.0.get_cloexec()
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
        self.0.set_cloexec()
    }

    pub fn setsockopt<T>(&self, opt: libc::c_int, val: libc::c_int, payload: T) -> io::Result<()> {
        unsafe {
            let payload = &payload as *const T as *const libc::c_void;

            cvt(libc::setsockopt(
                self.0.raw(),
                opt,
                val,
                payload,
                mem::size_of::<T>() as libc::socklen_t
            ))?;

            Ok(())
        }
    }

    pub fn getsockopt<T: Copy>(&self, opt: libc::c_int, val: libc::c_int) -> io::Result<T> {
        unsafe {
            let mut slot: T = mem::zeroed();
            let mut len = mem::size_of::<T>() as libc::socklen_t;

            cvt(libc::getsockopt(
                self.0.raw(),
                opt,
                val,
                &mut slot as *mut T as *mut libc::c_void,
                &mut len
            ))?;

            assert_eq!(len as usize, mem::size_of::<T>());
            Ok(slot)
        }
    }
}

impl FromRawFd for Socket {
    unsafe fn from_raw_fd(fd: RawFd) -> Socket {
        Socket(FileDesc::from_raw_fd(fd))
    }
}

impl AsRawFd for Socket {
    fn as_raw_fd(&self) -> RawFd {
        self.0.raw()
    }
}

我已經將上一節中咱們使用到的socket相關的主要的5個函數,外加readwrite,等幾個描述符設置的函數,「依附」到了Socket上。保存在 socket.rs 文件裏。

要說明的是,我在newaccept方法中,經過flags直接爲新建立的描述符設置了SOCK_CLOEXEC選項,若是不想一步設置的話,就須要建立出描述符後,再調用set_cloexec方法。bind中,在調用c::bind以前,我給套接字設置了個選項SO_REUSEADDR,意爲容許重用本地地址,這裏不展開講,若是你細心的話就會發現,上一節的例子,若是沒有正常關閉socket的話,就可能會出現error:98,Address already in use,等一下子纔會好。accept4不是個標準的方法,只有Linux才支持,咱們暫時不考慮兼容性。setsockoptgetsockopt方法中涉及到了類型轉換,結合前面的例子,這裏應該難不倒你了。除了from_raw_fd,我還又給Socket實現了又一個約定俗成的方法as_raw_fd

我已經將遠嗎放到了這裏,你能夠去查看。你還能夠嘗試將上一節的例子,修改爲咱們今天封裝的Socket。這一節到這裏就結束了。

相關文章
相關標籤/搜索