Rust异步实战:从0到1,用Tokio打造一个高性能并发聊天室你是否曾对Discord、Slack这类高并发即时通讯应用的底层技术感到好奇?或者在学习Rust时,面对强大的Tokio异步运行时,感觉理论知识丰富,却不知如何下手实践?别担心!本文将是一篇极致的实战指南,我们将告
你是否曾对 Discord、Slack 这类高并发即时通讯应用的底层技术感到好奇?或者在学习 Rust 时,面对强大的 Tokio 异步运行时,感觉理论知识丰富,却不知如何下手实践?
别担心!本文将是一篇极致的实战指南,我们将告别枯燥的理论。通过从零开始、一步步构建一个功能完善的 TCP 聊天服务器,你不仅能深入理解 Tokio 的核心工作模式,还将学会如何利用 tokio-console
对异步任务进行可视化调试,甚至使用 loom
这一并发测试神器来验证代码的线程安全性。
准备好了吗?让我们一起动手,用代码真正“看见”并征服 Rust 异步世界!
use std::{fmt, net::SocketAddr, sync::Arc};
use anyhow::Result;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt, stream::SplitStream};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{info, level_filters::LevelFilter, warn};
use tracing_subscriber::{Layer as _, fmt::Layer, layer::SubscriberExt, util::SubscriberInitExt};
const MAX_MESSAGES: usize = 128;
#[derive(Debug, Default)]
struct State {
peers: DashMap<SocketAddr, mpsc::Sender<Arc<Message>>>,
}
#[derive(Debug)]
struct Peer {
username: String,
stream: SplitStream<Framed<TcpStream, LinesCodec>>,
}
#[derive(Debug)]
enum Message {
UserJoined(String),
UserLeft(String),
Chat { sender: String, content: String },
}
#[tokio::main]
async fn main() -> Result<()> {
let layer = Layer::new().with_filter(LevelFilter::INFO);
tracing_subscriber::registry().with(layer).init();
let addr = "0.0.0.0:8080";
let listener = TcpListener::bind(addr).await?;
info!("Listening on {}", addr);
let state = Arc::new(State::default());
loop {
let (stream, addr) = listener.accept().await?;
info!("Accepted connection from {}", addr);
let state_cloned = state.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(state_cloned, addr, stream).await {
warn!("Failed to handle client {}: {}", addr, e);
}
});
}
}
async fn handle_client(state: Arc<State>, addr: SocketAddr, stream: TcpStream) -> Result<()> {
let mut stream = Framed::new(stream, LinesCodec::new());
// 按帧发送的, LinesCodec 会在每行末尾加上 \n
stream
.send("Welcome to the chat! Please enter your username:")
.await?;
let username = match stream.next().await {
Some(Ok(username)) => username,
Some(Err(e)) => {
warn!("Failed to receive username from {}: {}", addr, e);
return Err(e.into());
}
None => {
warn!("Client {} disconnected before sending username", addr);
return Ok(());
}
};
let mut peer = state.add(addr, username, stream).await;
// notify others that a new peer has joined
let message = Arc::new(Message::user_joined(&peer.username));
state.broadcast(addr, message).await;
while let Some(line) = peer.stream.next().await {
let line = match line {
Ok(line) => line,
Err(err) => {
warn!("Failed to receive message from {}: {}", addr, err);
break;
}
};
let message = Arc::new(Message::chat(&peer.username, line));
state.broadcast(addr, message).await;
}
// when while loop exit, peer has left the chat or line reading failed
// remove peer from state
state.peers.remove(&addr);
// notify others that peer has left the chat
let message = Arc::new(Message::user_left(&peer.username));
state.broadcast(addr, message).await;
Ok(())
}
impl State {
async fn broadcast(&self, addr: SocketAddr, message: Arc<Message>) {
for peer in self.peers.iter() {
if peer.key() == &addr {
continue;
}
if let Err(e) = peer.value().send(message.clone()).await {
warn!("Failed to send message to {}: {}", peer.key(), e);
// Remove the peer from the state if it's no longer reachable
self.peers.remove(peer.key());
}
}
}
async fn add(
&self,
addr: SocketAddr,
username: String,
stream: Framed<TcpStream, LinesCodec>,
) -> Peer {
let (tx, mut rx) = mpsc::channel(MAX_MESSAGES);
self.peers.insert(addr, tx);
// split the stream into a sender and a receiver
let (mut stream_sender, stream_receiver) = stream.split();
// receive messages from others, and send them to the client
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if let Err(e) = stream_sender.send(message.to_string()).await {
warn!("Failed to send message to {}: {}", addr, e);
break;
}
}
});
// return peer
Peer {
username,
stream: stream_receiver,
}
}
}
impl Message {
fn user_joined(username: &str) -> Self {
let content = format!("{} has joined the chat", username);
Self::UserJoined(content)
}
fn user_left(username: &str) -> Self {
let content = format!("{} has left the chat", username);
Self::UserLeft(content)
}
fn chat(sender: impl Into<String>, content: impl Into<String>) -> Self {
Self::Chat {
sender: sender.into(),
content: content.into(),
}
}
}
impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Message::UserJoined(content) => write!(f, "Server: {}", content),
Message::UserLeft(content) => write!(f, "Server: {}", content),
Message::Chat { sender, content } => write!(f, "{}: {}", sender, content),
}
}
}
这段 Rust 代码实现了一个基于 Tokio 的异步 TCP 聊天服务器。
它的核心逻辑是:
main
函数中,服务器启动并监听 8080
端口,等待客户端连接。tokio::spawn
) 进行处理,这样可以高效地并发管理多个客户端。handle_client
函数负责与单个客户端的完整交互:首先提示客户端输入用户名,然后将其信息(地址和消息发送通道)存入一个全局共享的、线程安全的 State
(使用 DashMap
) 中。broadcast
方法将新用户加入和离开的通知以及聊天消息广播给所有其他连接的客户端。State
结构中的 add
方法巧妙地利用 mpsc
channel(多生产者,单消费者通道)和 stream.split()
,将读写操作分离:一个任务负责从客户端接收消息,另一个任务负责将广播消息发送给该客户端。当客户端断开连接时,服务器会清理其状态并通知其他用户。brew install telnet
rust-ecosystem-learning on main [!?] is 📦 0.1.0 via 🦀 1.88.0 took 2m 42.4s
➜ cargo run --example chat
Compiling rust-ecosystem-learning v0.1.0 (/Users/qiaopengjun/Code/Rust/rust-ecosystem-learning)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.21s
Running `target/debug/examples/chat`
2025-07-13T04:33:58.059142Z INFO chat: Listening on 0.0.0.0:8080
2025-07-13T04:39:47.784622Z INFO chat: Accepted connection from 127.0.0.1:58259
2025-07-13T04:40:19.174428Z INFO chat: Accepted connection from 127.0.0.1:58394
2025-07-13T04:42:32.433305Z INFO chat: Accepted connection from 127.0.0.1:58959
# client qiao
rust-ecosystem-learning on main [!?] is 📦 0.1.0 via 🦀 1.88.0
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
qiao
Server: li has joined the chat
hello world
li: hi qiao
Server: Alice has joined the chat
## li
rust-ecosystem-learning on main [!?] is 📦 0.1.0 via 🦀 1.88.0
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
li
qiao: hello world
hi qiao
Server: Alice has joined the chat
# Alice
rust-ecosystem-learning on main [!?] is 📦 0.1.0 via 🦀 1.88.0
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
Alice
这段运行结果表明,你成功启动了 Rust 聊天服务器,并且它能够正确处理多个客户端的并发连接和消息交互。
测试中,三个客户端(用户名为 qiao、li 和 Alice)通过 telnet
命令连接到了在 8080
端口上监听的服务器。交互日志显示,服务器的核心功能运行正常:
这证明了该聊天程序成功实现了基本的多人实时通信功能。
use std::{fmt, net::SocketAddr, sync::Arc};
use anyhow::Result;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt, stream::SplitStream};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{info, level_filters::LevelFilter, warn};
use tracing_subscriber::{Layer as _, fmt::Layer, layer::SubscriberExt, util::SubscriberInitExt};
const MAX_MESSAGES: usize = 128;
#[derive(Debug, Default)]
struct State {
peers: DashMap<SocketAddr, mpsc::Sender<Arc<Message>>>,
}
#[derive(Debug)]
struct Peer {
username: String,
stream: SplitStream<Framed<TcpStream, LinesCodec>>,
}
#[derive(Debug)]
enum Message {
UserJoined(String),
UserLeft(String),
Chat { sender: String, content: String },
}
#[tokio::main]
async fn main() -> Result<()> {
let layer = Layer::new().with_filter(LevelFilter::INFO);
tracing_subscriber::registry().with(layer).init();
let addr = "0.0.0.0:8080";
let listener = TcpListener::bind(addr).await?;
info!("Listening on {}", addr);
let state = Arc::new(State::default());
loop {
let (stream, addr) = listener.accept().await?;
info!("Accepted connection from {}", addr);
let state_cloned = state.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(state_cloned, addr, stream).await {
warn!("Failed to handle client {}: {}", addr, e);
}
});
}
}
async fn handle_client(state: Arc<State>, addr: SocketAddr, stream: TcpStream) -> Result<()> {
let mut stream = Framed::new(stream, LinesCodec::new());
// 按帧发送的, LinesCodec 会在每行末尾加上 \n
stream
.send("Welcome to the chat! Please enter your username:")
.await?;
let username = match stream.next().await {
Some(Ok(username)) => username,
Some(Err(e)) => {
warn!("Failed to receive username from {}: {}", addr, e);
return Err(e.into());
}
None => {
warn!("Client {} disconnected before sending username", addr);
return Ok(());
}
};
let mut peer = state.add(addr, username, stream).await;
// notify others that a new peer has joined
let message = Arc::new(Message::user_joined(&peer.username));
info!("\x1b[32m🟢 用户加入: {:?}\x1b[0m", message);
state.broadcast(addr, message).await;
while let Some(line) = peer.stream.next().await {
let line = match line {
Ok(line) => line,
Err(err) => {
warn!("Failed to receive message from {}: {}", addr, err);
break;
}
};
let message = Arc::new(Message::chat(&peer.username, line));
info!("\x1b[34m💬 聊天消息: {:?}\x1b[0m", message);
state.broadcast(addr, message).await;
}
// when while loop exit, peer has left the chat or line reading failed
// remove peer from state
state.peers.remove(&addr);
// notify others that peer has left the chat
let message = Arc::new(Message::user_left(&peer.username));
info!("\x1b[31m🔴 用户离开: {:?}\x1b[0m", message);
state.broadcast(addr, message).await;
Ok(())
}
impl State {
async fn broadcast(&self, addr: SocketAddr, message: Arc<Message>) {
for peer in self.peers.iter() {
if peer.key() == &addr {
continue;
}
if let Err(e) = peer.value().send(message.clone()).await {
warn!("Failed to send message to {}: {}", peer.key(), e);
// Remove the peer from the state if it's no longer reachable
self.peers.remove(peer.key());
}
}
}
async fn add(
&self,
addr: SocketAddr,
username: String,
stream: Framed<TcpStream, LinesCodec>,
) -> Peer {
let (tx, mut rx) = mpsc::channel(MAX_MESSAGES);
self.peers.insert(addr, tx);
// split the stream into a sender and a receiver
let (mut stream_sender, stream_receiver) = stream.split();
// receive messages from others, and send them to the client
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
if let Err(e) = stream_sender.send(message.to_string()).await {
warn!("Failed to send message to {}: {}", addr, e);
break;
}
}
});
// return peer
Peer {
username,
stream: stream_receiver,
}
}
}
impl Message {
fn user_joined(username: &str) -> Self {
let content = format!("{} has joined the chat", username);
Self::UserJoined(content)
}
fn user_left(username: &str) -> Self {
let content = format!("{} has left the chat", username);
Self::UserLeft(content)
}
fn chat(sender: impl Into<String>, content: impl Into<String>) -> Self {
Self::Chat {
sender: sender.into(),
content: content.into(),
}
}
}
impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Message::UserJoined(content) => write!(f, "\x1b[32m🟢 [系统] {}\x1b[0m", content),
Message::UserLeft(content) => write!(f, "\x1b[31m🔴 [系统] {}\x1b[0m", content),
Message::Chat { sender, content } => {
write!(f, "\x1b[34m[{}]\x1b[0m {}", sender, content)
}
}
}
}
这项优化主要集中在提升程序的可观察性(Observability)和终端用户体验(UX),而非性能。
它通过两方面的修改实现:
handle_client
函数中,针对用户加入、离开和发送消息等关键事件,增加了带有 ANSI 颜色代码和表情符号的 info!
日志。这使得在监控服务器后台时,不同类型的事件一目了然,极大地提升了调试和监控的效率。Message
类型的 Display
trait 实现,将颜色和格式化信息(如 [系统]
标签)直接编码到发送给客户端的字符串中。这样,用户在自己的终端(如 telnet
)里看到的聊天内容不再是单调的文本,而是色彩分明、重点突出的富文本信息,显著改善了可读性和交互体验。
rust-ecosystem-learning on main [!?] is 📦 0.1.0 via 🦀 1.88.0 took 9m 11.7s
➜ cargo run --example chat
Compiling rust-ecosystem-learning v0.1.0 (/Users/qiaopengjun/Code/Rust/rust-e...
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!