# WebSocket
Un middleware qui offre la prise en charge de WebSocket.
# Introduction à WebSocket
WebSocket est un protocole qui permet une communication en duplex intégral sur une seule connexion TCP. Il simplifie l'échange de données entre clients et serveurs en permettant au serveur de transmettre activement des données au client. Dans le protocole WebSocket, le navigateur et le serveur n'ont besoin d'effectuer qu'une seule poignée de main pour établir une connexion persistante et permettre une transmission bidirectionnelle des données.
Contrairement aux requêtes HTTP traditionnelles, une connexion WebSocket reste ouverte une fois établie, ce qui évite d'ouvrir et de fermer fréquemment des connexions et réduit considérablement la surcharge de communication. WebSocket est ainsi particulièrement adapté aux applications en temps réel, telles que les jeux en ligne, les applications de chat, les tableaux de bord boursiers et d'autres applications nécessitant des mises à jour en temps réel.
Salvo fournit une API concise et puissante via son middleware WebSocket, permettant aux développeurs d'intégrer facilement la fonctionnalité WebSocket dans leurs applications.
# Exemple de code
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
use serde::{Deserialize, Serialize};
/// Identity of a connecting client, deserialized from the request's query
/// string by `connect` (the demo page connects with `?id=123&name=chris`).
#[derive(Clone, Debug, Deserialize, Serialize)]
struct User {
/// Value of the `id` query parameter.
id: usize,
/// Value of the `name` query parameter.
name: String,
}
/// Upgrades the request to a WebSocket connection and echoes every received
/// message back to the sender.
///
/// The query string is parsed into a [`User`] before the upgrade; the parse
/// result (success or failure alike) is only printed once the socket is open.
/// Returns whatever `WebSocketUpgrade::upgrade` yields.
#[handler]
async fn connect(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    let user = req.parse_queries::<User>();
    WebSocketUpgrade::new()
        .upgrade(req, res, |mut ws| async move {
            println!("{user:#?} ");
            // The loop ends when the stream is exhausted or yields an error;
            // either way the client is gone.
            while let Some(Ok(msg)) = ws.recv().await {
                if ws.send(msg).await.is_err() {
                    // The echo could not be delivered: client disconnected.
                    break;
                }
            }
        })
        .await
}
/// Serves the static demo page stored in `INDEX_HTML`.
#[handler]
async fn index(res: &mut Response) {
    let page = Text::Html(INDEX_HTML);
    res.render(page);
}
/// Entry point: installs the tracing subscriber, builds the routes and serves
/// them on `0.0.0.0:8698`.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // `/ws` performs the WebSocket upgrade; the root path serves the demo page.
    let ws_route = Router::with_path("ws").goal(connect);
    let router = Router::new().get(index).push(ws_route);

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}
/// Demo page rendered by `index`.
///
/// Its script opens a WebSocket to `/ws` with `id` and `name` query
/// parameters and swaps the status text once the connection is open. The page
/// has no input or button, so the script only looks up the `#status` element.
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
<head>
<title>WS</title>
</head>
<body>
<h1>WS</h1>
<div id="status">
<p><em>Connecting...</em></p>
</div>
<script>
const status = document.getElementById('status');
const ws = new WebSocket(`ws://${location.host}/ws?id=123&name=chris`);
ws.onopen = function() {
status.innerHTML = '<p><em>Connected!</em></p>';
};
</script>
</body>
</html>
"#;
[package]
name = "example-websocket"
version.workspace = true
edition.workspace = true
publish.workspace = true
rust-version.workspace = true
[dependencies]
futures-util.workspace = true
salvo = { workspace = true, features = ["websocket"] }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
# Exemple d'application de chat
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/websocket_chat.rs
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::{FutureExt, StreamExt};
use tokio::sync::{RwLock, mpsc};
use tokio_stream::wrappers::UnboundedReceiverStream;
use salvo::prelude::*;
use salvo::websocket::{Message, WebSocket, WebSocketUpgrade};
/// Registry of connected users: maps a user id to the unbounded sender that
/// queues outgoing messages for that user's socket.
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, salvo::Error>>>>;
/// Counter from which `handle_socket` draws each new user id; starts at 1.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// Lazily initialized, process-wide registry of the users currently online.
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);
/// Entry point: installs the tracing subscriber, builds the routes and serves
/// them on `0.0.0.0:8698`.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // `/chat` performs the WebSocket upgrade; the root path serves the chat page.
    let chat_route = Router::with_path("chat").goal(user_connected);
    let router = Router::new().goal(index).push(chat_route);

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}
/// Upgrades the request to a WebSocket connection and hands the socket to
/// `handle_socket`. Returns whatever `WebSocketUpgrade::upgrade` yields.
#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    let upgrade = WebSocketUpgrade::new();
    upgrade.upgrade(req, res, handle_socket).await
}
/// Wires up one chat connection after the WebSocket upgrade.
///
/// Two tasks are spawned and the function returns immediately:
/// * one forwards everything queued on this user's channel to the socket;
/// * one registers the user in `ONLINE_USERS`, relays each incoming message
///   through `user_message`, and calls `user_disconnected` once the incoming
///   stream ends or yields an error.
async fn handle_socket(ws: WebSocket) {
    // Draw a fresh id for this connection from the shared counter.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
    tracing::info!("new chat user: {}", my_id);

    // Separate the socket into its sending and receiving halves.
    let (user_ws_tx, mut user_ws_rx) = ws.split();

    // Outgoing messages are queued on an unbounded channel whose receiving
    // end is drained into the websocket by a dedicated task.
    let (tx, rx) = mpsc::unbounded_channel();
    let outbound = UnboundedReceiverStream::new(rx)
        .forward(user_ws_tx)
        .map(|outcome| {
            if let Err(e) = outcome {
                tracing::error!(error = ?e, "websocket send error");
            }
        });
    tokio::task::spawn(outbound);

    tokio::task::spawn(async move {
        ONLINE_USERS.write().await.insert(my_id, tx);

        while let Some(result) = user_ws_rx.next().await {
            match result {
                Ok(msg) => user_message(my_id, msg).await,
                Err(e) => {
                    eprintln!("websocket error(uid={my_id}): {e}");
                    break;
                }
            }
        }

        user_disconnected(my_id).await;
    });
}
/// Relays a message from user `my_id` to every other registered user.
///
/// Messages for which `as_str` fails are dropped silently. The text is
/// prefixed with `<User#ID>: ` before being queued on each recipient's channel.
async fn user_message(my_id: usize, msg: Message) {
    let msg = match msg.as_str() {
        Ok(text) => text,
        Err(_) => return,
    };
    let new_msg = format!("<User#{my_id}>: {msg}");

    // The read guard is held for the whole broadcast.
    let users = ONLINE_USERS.read().await;
    for (&uid, tx) in users.iter() {
        // The sender already has its own copy on screen.
        if uid == my_id {
            continue;
        }
        if tx.send(Ok(Message::text(new_msg.clone()))).is_err() {
            // The receiving side of this channel is gone; cleanup of that
            // user is expected to happen in `user_disconnected` from its own
            // task, so there is nothing more to do here.
            tracing::info!("the tx is disconnected, uid={uid}");
        }
    }
}
/// Logs the departure of user `my_id` and removes it from `ONLINE_USERS`.
async fn user_disconnected(my_id: usize) {
    eprintln!("good bye user: {my_id}");

    // The stream is finished, so the user no longer belongs in the registry.
    let mut users = ONLINE_USERS.write().await;
    users.remove(&my_id);
}
/// Serves the static chat page stored in `INDEX_HTML`.
#[handler]
async fn index(res: &mut Response) {
    let page = Text::Html(INDEX_HTML);
    res.render(page);
}
/// Chat page rendered by `index`.
///
/// Its script opens a WebSocket to `/chat`, appends every received message to
/// the `#chat` container, and sends the content of the `#text` input when the
/// `#submit` button is clicked. The input is looked up explicitly by id
/// rather than through the browser's implicit named-element globals.
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
<head>
<title>WS Chat</title>
</head>
<body>
<h1>WS Chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="submit">Submit</button>
<script>
const chat = document.getElementById('chat');
const text = document.getElementById('text');
const submit = document.getElementById('submit');
const ws = new WebSocket(`ws://${location.host}/chat`);
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
showMessage(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
submit.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
showMessage('<You>: ' + msg);
};
function showMessage(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
</script>
</body>
</html>
"#;
[package]
name = "example-websocket-chat"
version.workspace = true
edition.workspace = true
publish.workspace = true
rust-version.workspace = true
[dependencies]
futures-util.workspace = true
salvo = { workspace = true, features = ["websocket"] }
tokio = { workspace = true, features = ["macros"] }
tokio-stream = { workspace = true, features = ["net"] }
tracing.workspace = true
tracing-subscriber.workspace = true