Third-party WebSocket Plugins

salvo-websocket

Add Project Dependencies

[dependencies]
salvo-websocket = "0.0.4"

Define Query Params for WebSocket Connection

#[derive(Debug, Clone, Deserialize)]
struct User {
    name: String,
    room: String,
}

Implement the WebSocketHandler Trait

impl WebSocketHandler for User {
    // Connection event
    async fn on_connected(&self, ws_id: usize, sender: UnboundedSender<Result<Message, Error>>) {
        tracing::info!("{} connected", ws_id);
        WS_CONTROLLER.write().await.join_group(self.room.clone(), sender).unwrap();
        WS_CONTROLLER.write().await.send_group(
            self.room.clone(),
            Message::text(format!("{:?} joined!", self.name)
            ),
        ).unwrap();
    }

    // Disconnection event
    async fn on_disconnected(&self, ws_id: usize) {
        tracing::info!("{} disconnected", ws_id);
    }

    // Receive message event
    async fn on_receive_message(&self, msg: Message) {
        tracing::info!("{:?} received", msg);
        let msg = if let Ok(s) = msg.to_str() {
            s
        } else {
            return;
        };
        let new_msg = format!("<User#{}>: {}", self.name, msg);
        WS_CONTROLLER.write().await.send_group(self.room.clone(), Message::text(new_msg.clone())).unwrap();
    }

    async fn on_send_message(&self, msg: Message) -> Result<Message, Error> {
        tracing::info!("{:?} sending", msg);
        Ok(msg)
    }
}

Write Connection Handling Method

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let router = Router::new()
        .push(Router::with_path("chat").handle(user_connected));
    tracing::info!("Listening on http://127.0.0.1:5800");
    let acceptor = TcpListener::new("127.0.0.1:5800").bind().await; Server::new(acceptor).serve(router).await;
}

#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    let user: Result<User, ParseError> = req.parse_queries();
    match user {
        Ok(user) => {
            WebSocketUpgrade::new().upgrade(req, res, |ws| async move {
                handle_socket(ws, user).await;
            }).await
        }
        Err(_err) => {
            Err(StatusError::bad_request())
        }
    }
}

For more details, please refer directly to the example