diff --git a/tcp/src/bin/server.rs b/tcp/src/bin/server.rs index 59a4388..237420a 100644 --- a/tcp/src/bin/server.rs +++ b/tcp/src/bin/server.rs @@ -1,11 +1,9 @@ // src/main.rs +use io_uring::{opcode, types, IoUring}; + use nix::sys::socket; -use nix::poll; -use nix::sys::socket::MsgFlags; -use std::os::fd::FromRawFd; use std::os::fd::AsRawFd; -use std::os::fd::AsFd; fn main() { let tcp_soc = socket::socket( @@ -27,43 +25,61 @@ fn main() { ).unwrap(); println!("Listening: {:?}", listening); - // waiting in here until socket becomes readable - let pollfd = poll::PollFd::new(tcp_soc.as_fd(), poll::PollFlags::POLLIN); - let mut fds = [pollfd]; + // ASYNC + let mut ring = IoUring::new(8).unwrap(); + let (submitter, mut sq, mut cq) = ring.split(); + let read_e = opcode::Accept::new(types::Fd(rawfd), std::ptr::null_mut(), std::ptr::null_mut()) + .build() + .user_data(0); + + unsafe { + sq.push(&read_e).unwrap(); + } + sq.sync(); + submitter.submit().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); let accepted; loop { - poll::poll(&mut fds, poll::PollTimeout::NONE).unwrap(); - let revents = fds[0].revents(); // what happened? - - if revents.unwrap().contains(poll::PollFlags::POLLIN) { - accepted = socket::accept(rawfd).unwrap(); + cq.sync(); + if let Some(cqe) = cq.next() { + accepted = cqe.result(); println!("Accepted: {:?}", accepted); break; } + std::thread::sleep(std::time::Duration::from_millis(10)); } - let client_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(accepted) }; - let pollfd2 = poll::PollFd::new(client_fd.as_fd(), poll::PollFlags::POLLIN); - let mut fds2 = [pollfd2]; - + let mut buf = [0u8; 7]; loop { - poll::poll(&mut fds2, poll::PollTimeout::NONE).unwrap(); - let mut buf = [0u8; 7]; - let received = socket::recv( - accepted, - &mut buf, - MsgFlags::empty() - ).unwrap(); - if received != 0 { - // println!("Recv: {}", received); - // println!("Bytes: {:?}", buf); - // println!("Recv msg: {:?}", std::str::from_utf8(&buf).unwrap_or("")); - socket::send( - accepted, - &buf, - MsgFlags::empty() - ); - } - } + let recv_e = opcode::Recv::new(types::Fd(accepted), buf.as_mut_ptr(), buf.len() as u32) + .build() + .user_data(1); + unsafe { + sq.push(&recv_e).unwrap(); + } + + sq.sync(); + submitter.submit().unwrap(); + loop { + cq.sync(); + if let Some(cqe) = cq.next() { + if cqe.user_data() == 1 { + break; + } + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + let send_e = opcode::Send::new(types::Fd(accepted), buf.as_ptr(), buf.len() as u32) + .build() + .user_data(2); + + unsafe { + sq.push(&send_e).unwrap(); + } + + sq.sync(); + submitter.submit().unwrap(); + } }