[dependencies]
salvo-websocket = "0.0.4"
/// A chat participant, deserialized from the request's query parameters by
/// `user_connected`.
#[derive(Debug, Clone, Deserialize)]
struct User {
/// Display name included in the messages broadcast to the room.
name: String,
/// Group key passed to `WS_CONTROLLER` when joining and broadcasting.
room: String,
}
impl WebSocketHandler for User {
// Connection event
async fn on_connected(&self, ws_id: usize, sender: UnboundedSender<Result<Message, Error>>) {
tracing::info!("{} connected", ws_id);
WS_CONTROLLER.write().await.join_group(self.room.clone(), sender).unwrap();
WS_CONTROLLER.write().await.send_group(
self.room.clone(),
Message::text(format!("{:?} joined!", self.name)
),
).unwrap();
}
// Disconnection event
async fn on_disconnected(&self, ws_id: usize) {
tracing::info!("{} disconnected", ws_id);
}
// Receive message event
async fn on_receive_message(&self, msg: Message) {
tracing::info!("{:?} received", msg);
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", self.name, msg);
WS_CONTROLLER.write().await.send_group(self.room.clone(), Message::text(new_msg.clone())).unwrap();
}
async fn on_send_message(&self, msg: Message) -> Result<Message, Error> {
tracing::info!("{:?} sending", msg);
Ok(msg)
}
}
/// Entry point: initializes tracing output, mounts the chat handler at the
/// `chat` path, and serves the router on `127.0.0.1:5800`.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // The only route is `chat`, handled by `user_connected`.
    let chat_route = Router::with_path("chat").handle(user_connected);
    let routes = Router::new().push(chat_route);

    tracing::info!("Listening on http://127.0.0.1:5800");

    let listener = TcpListener::new("127.0.0.1:5800").bind().await;
    Server::new(listener).serve(routes).await;
}
/// Handles a request on the chat route.
///
/// Parses a `User` from the request's query parameters and, on success,
/// upgrades the connection and hands the socket to `handle_socket` together
/// with that user.
///
/// # Errors
///
/// Returns `StatusError::bad_request()` when the query parameters cannot be
/// parsed into a `User`; otherwise returns whatever the upgrade returns.
#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    let parsed: Result<User, ParseError> = req.parse_queries();
    // The parse error itself is discarded; the caller only sees a 400.
    let user = parsed.map_err(|_err| StatusError::bad_request())?;

    WebSocketUpgrade::new()
        .upgrade(req, res, |ws| async move {
            handle_socket(ws, user).await;
        })
        .await
}
For more details, please refer to the example.