目标:
tcp 服务器和客户端udp服务器和客户端多线程实现服务器,异步并发channel
tcp server & client
use std::{prelude::*, net, thread, env};
use std::io::{Read, Write};
fn main() {
let args = env::args().into_iter().collect::
if args.len()>1 {
match args[1].as_str() {
"-s" => tcp_server(),
"-c" => tcp_client(),
_ => println!("unknown cmd {}", args[1]),
}
}else{
println!("Usage:\r\n\t-s Open Tcp Server.\r\n\t-c Open Tcp client to connect the server.")
}
}
fn tcp_server() {
let s = net::TcpListener::bind("0.0.0.0:8000").unwrap();
println!("Listen on addr: {}", s.local_addr().unwrap().to_string());
for req in s.incoming() {
if let Ok(req_s) = req {
_ = req_s.set_nodelay(true);
thread::spawn( move || {
handler_tcp(req_s);
});
}
}
}
fn handler_tcp(mut c: net::TcpStream) {
let mut buf = [0u8;1024];
let info = format!("[{:?}] => client in: {}", thread::current().id(), c.peer_addr().unwrap().to_string());
let n = c.read(&mut buf).unwrap();
println!("{} {}", n, String::from_utf8_lossy(&buf[..n]));
println!("{}", info);
_ = c.write(format!("HTTP/1.1 200 OK\r\n\r\n{}\r\n", info).as_bytes());
}
fn tcp_client(){
let mut c = net::TcpStream::connect("127.0.0.1:8000").unwrap();
c.set_nodelay(true).unwrap();
_ = c.write("GET / HTTP/1.1\r\nAccept: */*\r\n\r\n".as_bytes());
let mut strbuf = String::new();
_ = c.read_to_string(&mut strbuf);
println!("resp: {}", strbuf);
}
异步版本
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main() {
// Listen for incoming TCP connections on localhost port 7878
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// Block forever, handling each request that arrives at this IP address
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// Read the first 1024 bytes of data from the stream
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
// Respond with greetings or a 404,
// depending on the data in the request
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
// Write response back to the stream,
// and flush the stream to ensure the response is sent back to the client
let response = format!("{status_line}{contents}");
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
改写
[package]
name = "d5"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures="*"
[dependencies.async-std]
version = "*"
features = ["attributes"]
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
use async_std::task::spawn;
use async_std::prelude::*;
use std::fs;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
//spawn(handle_connection(stream));
handle_connection(tcpstream).await;
})
.await;
}
async fn handle_connection(mut stream: TcpStream) {
// Read the first 1024 bytes of data from the stream
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let get = b"GET / HTTP/1.1\r\n";
// Respond with greetings or a 404,
// depending on the data in the request
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
// Write response back to the stream,
// and flush the stream to ensure the response is sent back to the client
let response = format!("{status_line}{contents}");
stream.write_all(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
UDP
广播使用广播地址255.255.255.255,将消息发送到在同一广播网络上的每个主机,广播仅仅在同一局域网上才能进行,但是广播还是要指明接收者的端口号的 多播,也称为“组播”,与单播一样,多播是允许在广域网即Internet上进行传输的,多播的地址是特定的,D类地址用于多播。即224.0.0.0至239.255.255.255之间的IP地址。 1、局部多播地址:在224.0.0.0~224.0.0.255之间,这是为路由协议和其他用途保留的地址,路由器并不转发属于此范围的IP包。 2、预留多播地址:在224.0.1.0~238.255.255.255之间,可用于全球范围(如Internet)或网络协议。 3、管理权限多播地址:在239.0.0.0~239.255.255.255之间,可供组织内部使用,类似于私有IP地址,不能用于Internet,可限制多播范围。 单播的数据只是收发数据的特定主机进行处理,根据地址不同,可以跨越局域网进行internet通讯。
单播流程:主机A向主机B发送UDP数据报,发送的目的IP为192.168.1.151,端口为 80,目的MAC地址为00:00:00:00:00:02。广播的流程:主机A向整个网络发送广播数据,发送的目的IP为192.168.1.255,端口为 80,目的MAC地址为FF:FF:FF:FF:FF:FF。
use std::{prelude::*, net, thread, env};
use std::io::{Read, Write};
fn main() {
let args = env::args().into_iter().collect::
if args.len()>1 {
match args[1].as_str() {
"-s" => udp_server(),
"-c" => udp_client(),
_ => println!("unknown cmd {}", args[1]),
}
}else{
println!("Usage:\r\n\t-s Open UDP Server.\r\n\t-c Open UDP client to connect the server.")
}
}
fn udp_server() {
let s = net::UdpSocket::bind("0.0.0.0:8000").unwrap();
println!("Listen on addr: {}", s.local_addr().unwrap().to_string());
let mut buf = [0u8; 1024];
loop{
if let Ok((n, addr)) = s.recv_from(&mut buf) {
println!("addr: {}, content: {}", addr.to_string(), String::from_utf8_lossy(&buf[..n]));
_ = s.send_to(format!("ok! you are: {}", addr.to_string()).as_bytes(), addr);
}
}
}
fn udp_client(){
// 不指定地址
let mut c = net::UdpSocket::bind("0.0.0.0:0").unwrap();
println!("client addr: {}", c.local_addr().unwrap().to_string());
let s_addr = "127.0.0.1:8000";
let mut buf = [0u8; 1024];
for i in 0..10 {
_ = c.send_to(format!("client: I'm coming! on {}!", i).as_bytes(), s_addr);
if let Ok((n, addr)) = c.recv_from(&mut buf) {
println!("client: s addr: {}, content: {}", addr.to_string(), String::from_utf8_lossy(&buf[..n]));
}
}
}
Channel
通道(Channel)是一种用于在多个线程之间传递数据的并发原语。通道提供了一种安全且高效的方式,允许线程之间进行通信和同步。
我们可以使用 std::sync::mpsc 模块提供的 channel 函数来创建一个通道。mpsc 是“多个生产者,单个消费者”(Multiple Producers, Single Consumer)的缩写,意味着多个线程可以同时向通道发送数据,但只有一个线程可以从通道接收数据。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// 创建通道,返回发送者和接收者
// 默认是异步的,不阻塞对方。
// mpsc::sync_channel(0) 就是同步的,缓冲区满了就会阻塞
let (tx, rx) = mpsc::channel();
// thread 1
let tx2 = tx.clone();
thread::spawn(move || {
for i in 0..30 {
let message = format!("2 - {}. Hello from the sender!", i);
tx2.send(message).unwrap();
thread::sleep(Duration::from_secs(1));
}
println!("Thread 2 End");
// drop(tx2);
});
// thread 2
thread::spawn(move || {
for i in 0..20 {
let message = format!("1 - {}. Hello from the sender!", i);
tx.send(message).unwrap();
thread::sleep(Duration::from_secs(1));
}
println!("Thread 1 End");
// drop(tx);
});
// 在主线程接收数据
// for received in rx {
// println!("Got: {}", received);
// }
loop {
if let Ok(r) = rx.recv() {
println!("Received: {}", r);
}else{
println!("Receiced error!");
break;
}
}
println!("End");
}
一个常见的坑
use std::sync::mpsc;
fn main() {
use std::thread;
let (send, recv) = mpsc::channel();
let num_threads = 3;
for i in 0..num_threads {
let thread_send = send.clone();
thread::spawn(move || {
thread_send.send(i).unwrap();
println!("thread {:?} finished", i);
});
}
// 在这里如果不drop,会因为Send的生命周期一直到main结束,所以recv也不会结束,所以无法退出
// drop(send);
for x in recv {
println!("Got: {}", x);
}
println!("finished iterating");
}
精彩链接
发表评论