目标:

tcp 服务器和客户端udp服务器和客户端多线程实现服务器,异步并发channel

tcp server & client

use std::{prelude::*, net, thread, env};

use std::io::{Read, Write};

fn main() {

let args = env::args().into_iter().collect::>();

if args.len()>1 {

match args[1].as_str() {

"-s" => tcp_server(),

"-c" => tcp_client(),

_ => println!("unknown cmd {}", args[1]),

}

}else{

println!("Usage:\r\n\t-s Open Tcp Server.\r\n\t-c Open Tcp client to connect the server.")

}

}

fn tcp_server() {

let s = net::TcpListener::bind("0.0.0.0:8000").unwrap();

println!("Listen on addr: {}", s.local_addr().unwrap().to_string());

for req in s.incoming() {

if let Ok(req_s) = req {

_ = req_s.set_nodelay(true);

thread::spawn( move || {

handler_tcp(req_s);

});

}

}

}

fn handler_tcp(mut c: net::TcpStream) {

let mut buf = [0u8;1024];

let info = format!("[{:?}] => client in: {}", thread::current().id(), c.peer_addr().unwrap().to_string());

let n = c.read(&mut buf).unwrap();

println!("{} {}", n, String::from_utf8_lossy(&buf[..n]));

println!("{}", info);

_ = c.write(format!("HTTP/1.1 200 OK\r\n\r\n{}\r\n", info).as_bytes());

}

fn tcp_client(){

let mut c = net::TcpStream::connect("127.0.0.1:8000").unwrap();

c.set_nodelay(true).unwrap();

_ = c.write("GET / HTTP/1.1\r\nAccept: */*\r\n\r\n".as_bytes());

let mut strbuf = String::new();

_ = c.read_to_string(&mut strbuf);

println!("resp: {}", strbuf);

}

异步版本

use std::fs;

use std::io::prelude::*;

use std::net::TcpListener;

use std::net::TcpStream;

fn main() {

// Listen for incoming TCP connections on localhost port 7878

let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

// Block forever, handling each request that arrives at this IP address

for stream in listener.incoming() {

let stream = stream.unwrap();

handle_connection(stream);

}

}

fn handle_connection(mut stream: TcpStream) {

// Read the first 1024 bytes of data from the stream

let mut buffer = [0; 1024];

stream.read(&mut buffer).unwrap();

let get = b"GET / HTTP/1.1\r\n";

// Respond with greetings or a 404,

// depending on the data in the request

let (status_line, filename) = if buffer.starts_with(get) {

("HTTP/1.1 200 OK\r\n\r\n", "hello.html")

} else {

("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")

};

let contents = fs::read_to_string(filename).unwrap();

// Write response back to the stream,

// and flush the stream to ensure the response is sent back to the client

let response = format!("{status_line}{contents}");

stream.write_all(response.as_bytes()).unwrap();

stream.flush().unwrap();

}

改写

[package]

name = "d5"

version = "0.1.0"

edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

futures="*"

[dependencies.async-std]

version = "*"

features = ["attributes"]

use async_std::net::TcpListener;

use async_std::net::TcpStream;

use futures::stream::StreamExt;

use async_std::task::spawn;

use async_std::prelude::*;

use std::fs;

#[async_std::main]

async fn main() {

let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

listener

.incoming()

.for_each_concurrent(/* limit */ None, |tcpstream| async move {

let tcpstream = tcpstream.unwrap();

//spawn(handle_connection(stream));

handle_connection(tcpstream).await;

})

.await;

}

async fn handle_connection(mut stream: TcpStream) {

// Read the first 1024 bytes of data from the stream

let mut buffer = [0; 1024];

stream.read(&mut buffer).await.unwrap();

let get = b"GET / HTTP/1.1\r\n";

// Respond with greetings or a 404,

// depending on the data in the request

let (status_line, filename) = if buffer.starts_with(get) {

("HTTP/1.1 200 OK\r\n\r\n", "hello.html")

} else {

("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")

};

let contents = fs::read_to_string(filename).unwrap();

// Write response back to the stream,

// and flush the stream to ensure the response is sent back to the client

let response = format!("{status_line}{contents}");

stream.write_all(response.as_bytes()).await.unwrap();

stream.flush().await.unwrap();

}

UDP

广播使用广播地址255.255.255.255,将消息发送到在同一广播网络上的每个主机,广播仅仅在同一局域网上才能进行,但是广播还是要指明接收者的端口号的 多播,也称为“组播”,与单播一样,多播是允许在广域网即Internet上进行传输的,多播的地址是特定的,D类地址用于多播。即224.0.0.0至239.255.255.255之间的IP地址。 1、局部多播地址:在224.0.0.0~224.0.0.255之间,这是为路由协议和其他用途保留的地址,路由器并不转发属于此范围的IP包。 2、预留多播地址:在224.0.1.0~238.255.255.255之间,可用于全球范围(如Internet)或网络协议。 3、管理权限多播地址:在239.0.0.0~239.255.255.255之间,可供组织内部使用,类似于私有IP地址,不能用于Internet,可限制多播范围。 单播的数据只是收发数据的特定主机进行处理,根据地址不同,可以跨越局域网进行internet通讯。

单播流程:主机A向主机B发送UDP数据报,发送的目的IP为192.168.1.151,端口为 80,目的MAC地址为00:00:00:00:00:02。广播的流程:主机A向整个网络发送广播数据,发送的目的IP为192.168.1.255,端口为 80,目的MAC地址为FF:FF:FF:FF:FF:FF。

use std::{prelude::*, net, thread, env};

use std::io::{Read, Write};

fn main() {

let args = env::args().into_iter().collect::>();

if args.len()>1 {

match args[1].as_str() {

"-s" => udp_server(),

"-c" => udp_client(),

_ => println!("unknown cmd {}", args[1]),

}

}else{

println!("Usage:\r\n\t-s Open UDP Server.\r\n\t-c Open UDP client to connect the server.")

}

}

fn udp_server() {

let s = net::UdpSocket::bind("0.0.0.0:8000").unwrap();

println!("Listen on addr: {}", s.local_addr().unwrap().to_string());

let mut buf = [0u8; 1024];

loop{

if let Ok((n, addr)) = s.recv_from(&mut buf) {

println!("addr: {}, content: {}", addr.to_string(), String::from_utf8_lossy(&buf[..n]));

_ = s.send_to(format!("ok! you are: {}", addr.to_string()).as_bytes(), addr);

}

}

}

fn udp_client(){

// 不指定地址

let mut c = net::UdpSocket::bind("0.0.0.0:0").unwrap();

println!("client addr: {}", c.local_addr().unwrap().to_string());

let s_addr = "127.0.0.1:8000";

let mut buf = [0u8; 1024];

for i in 0..10 {

_ = c.send_to(format!("client: I'm coming! on {}!", i).as_bytes(), s_addr);

if let Ok((n, addr)) = c.recv_from(&mut buf) {

println!("client: s addr: {}, content: {}", addr.to_string(), String::from_utf8_lossy(&buf[..n]));

}

}

}

Channel

通道(Channel)是一种用于在多个线程之间传递数据的并发原语。通道提供了一种安全且高效的方式,允许线程之间进行通信和同步。

我们可以使用 std::sync::mpsc 模块提供的 channel 函数来创建一个通道。mpsc 是“多个生产者,单个消费者”(Multiple Producers, Single Consumer)的缩写,意味着多个线程可以同时向通道发送数据,但只有一个线程可以从通道接收数据。

use std::sync::mpsc;

use std::thread;

use std::time::Duration;

fn main() {

// 创建通道,返回发送者和接收者

// 默认是异步的,不阻塞对方。

// mpsc::sync_channel(0) 就是同步的,缓冲区满了就会阻塞

let (tx, rx) = mpsc::channel();

// thread 1

let tx2 = tx.clone();

thread::spawn(move || {

for i in 0..30 {

let message = format!("2 - {}. Hello from the sender!", i);

tx2.send(message).unwrap();

thread::sleep(Duration::from_secs(1));

}

println!("Thread 2 End");

// drop(tx2);

});

// thread 2

thread::spawn(move || {

for i in 0..20 {

let message = format!("1 - {}. Hello from the sender!", i);

tx.send(message).unwrap();

thread::sleep(Duration::from_secs(1));

}

println!("Thread 1 End");

// drop(tx);

});

// 在主线程接收数据

// for received in rx {

// println!("Got: {}", received);

// }

loop {

if let Ok(r) = rx.recv() {

println!("Received: {}", r);

}else{

println!("Receiced error!");

break;

}

}

println!("End");

}

一个常见的坑

use std::sync::mpsc;

fn main() {

use std::thread;

let (send, recv) = mpsc::channel();

let num_threads = 3;

for i in 0..num_threads {

let thread_send = send.clone();

thread::spawn(move || {

thread_send.send(i).unwrap();

println!("thread {:?} finished", i);

});

}

// 在这里如果不drop,会因为Send的生命周期一直到main结束,所以recv也不会结束,所以无法退出

// drop(send);

for x in recv {

println!("Got: {}", x);

}

println!("finished iterating");

}

精彩链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。