SSE

Middleware that provides support for Server-Sent Events (SSE).

Example

// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse.rs

use std::convert::Infallible;
use std::time::Duration;

use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

// create server-sent event
// Build the server-sent event for one tick: its text payload is the decimal
// rendering of `counter`. The error type is `Infallible`, so this always
// returns `Ok`.
fn sse_counter(counter: u64) -> Result<SseEvent, Infallible> {
    let event = SseEvent::default().text(counter.to_string());
    Ok(event)
}

// Streams an ever-increasing counter to the client as server-sent events,
// producing one event per tick of a one-second interval timer.
#[handler]
async fn handle_tick(res: &mut Response) {
    // Owned by the closure below; incremented before each event is built, so
    // the first event carries `1`.
    let mut ticks_seen: u64 = 0;
    let ticker = IntervalStream::new(interval(Duration::from_secs(1)));
    let events = ticker.map(move |_| {
        ticks_seen += 1;
        sse_counter(ticks_seen)
    });
    sse::stream(res, events);
}

// Entry point: initialise logging, expose the ticker at `GET /ticks`, and
// serve on 127.0.0.1:5800.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let ticks = Router::with_path("ticks").get(handle_tick);

    let listener = TcpListener::new("127.0.0.1:5800").bind().await;
    Server::new(listener).serve(ticks).await;
}
# Cargo manifest for the SSE ticker example.
[package]
name = "example-sse"
version = "0.1.0"
edition = "2021"
# Example crate only; never published to a registry.
publish = false


[dependencies]
# Provides `StreamExt` (used for `.map` on the interval stream).
futures-util = { version = "0.3", default-features = false }
# The example imports `salvo::sse`, hence the `sse` feature.
salvo = {  workspace = true, features = ["sse"] }
# `macros` is needed for the `#[tokio::main]` attribute.
tokio = { version = "1", features = ["macros"] }
# Provides the `IntervalStream` wrapper.
tokio-stream = { version = "0.1", features = ["net"] }
tracing = "0.1"
tracing-subscriber = "0.3"

SSE Chat

// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

use futures_util::StreamExt;
use once_cell::sync::Lazy;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use salvo::prelude::*;
use salvo::sse::{SseEvent, SseKeepAlive};

/// Registry of connected users: maps a user id to the sending half of that
/// user's message channel, guarded by a mutex.
type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;

/// Source of user ids; the first id handed out is 1.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/// Process-wide registry of currently connected users, created empty on first use.
static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);

// Entry point: initialise logging, wire up the routes, and serve on
// 127.0.0.1:5800.
//
// Routes:
//   (root)          -> `index`
//   GET  /chat      -> `user_connected`
//   POST /chat/<id> -> `chat_send`
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let send_route = Router::with_path("<id>").post(chat_send);
    let chat_route = Router::with_path("chat")
        .get(user_connected)
        .push(send_route);
    let router = Router::new().goal(index).push(chat_route);

    let listener = TcpListener::new("127.0.0.1:5800").bind().await;
    Server::new(listener).serve(router).await;
}

/// A message queued on a user's channel, later converted into a server-sent event.
#[derive(Debug)]
enum Message {
    /// The id assigned to the user; sent to them once when they connect.
    UserId(usize),
    /// A formatted chat line originating from another user.
    Reply(String),
}

// Handles `POST /chat/<id>`: broadcasts the request body as a chat message
// sent by user `<id>`.
//
// Everything read here is client-controlled, so malformed input is answered
// with `400 Bad Request` rather than a panic. That covers an `id` path
// segment that is not a valid `usize` (e.g. the page posting to
// `/chat/undefined` before it has received its id), a body that cannot be
// read, and a body that is not valid UTF-8. On success the reply is `200 OK`.
#[handler]
async fn chat_send(req: &mut Request, res: &mut Response) {
    let my_id = match req.param::<usize>("id") {
        Some(id) => id,
        None => {
            res.status_code(StatusCode::BAD_REQUEST);
            return;
        }
    };
    let payload = match req.payload().await {
        Ok(bytes) => bytes,
        Err(_) => {
            res.status_code(StatusCode::BAD_REQUEST);
            return;
        }
    };
    let msg = match std::str::from_utf8(payload) {
        Ok(text) => text,
        Err(_) => {
            res.status_code(StatusCode::BAD_REQUEST);
            return;
        }
    };
    user_message(my_id, msg);
    res.status_code(StatusCode::OK);
}

#[handler]
async fn user_connected(res: &mut Response) {
    // Use a counter to assign a new unique ID for this user.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);

    tracing::info!("new chat user: {}", my_id);

    // Use an unbounded channel to handle buffering and flushing of messages
    // to the event source...
    let (tx, rx) = mpsc::unbounded_channel();
    let rx = UnboundedReceiverStream::new(rx);

    tx.send(Message::UserId(my_id))
        // rx is right above, so this cannot fail
        .unwrap();

    // Save the sender in our list of connected users.
    ONLINE_USERS.lock().unwrap().insert(my_id, tx);

    // Convert messages into Server-Sent Events and returns resulting stream.
    let stream = rx.map(|msg| match msg {
        Message::UserId(my_id) => {
            Ok::<_, salvo::Error>(SseEvent::default().name("user").text(my_id.to_string()))
        }
        Message::Reply(reply) => Ok(SseEvent::default().text(reply)),
    });
    SseKeepAlive::new(stream).stream(res);
}

// Broadcast `msg`, attributed to user `my_id`, to every other connected user.
//
// The broadcast doubles as garbage collection: any recipient whose channel
// rejects the message (its SSE stream is gone) is dropped from the registry.
fn user_message(my_id: usize, msg: &str) {
    let new_msg = format!("<User#{}>: {}", my_id, msg);

    let mut users = ONLINE_USERS.lock().unwrap();
    // Short-circuit: the sender itself is always kept and never sent to;
    // everyone else is kept only if the send succeeds.
    users.retain(|&uid, sender| {
        uid == my_id || sender.send(Message::Reply(new_msg.clone())).is_ok()
    });
}

// Serves the chat page (`INDEX_HTML`) as an HTML response.
#[handler]
async fn index(res: &mut Response) {
    let page = Text::Html(INDEX_HTML);
    res.render(page);
}

/// The chat page served by `index`.
///
/// The embedded script opens an `EventSource` on `/chat`, remembers the id
/// delivered by the named `user` event, and POSTs outgoing messages to
/// `/chat/<id>`. Both URLs are same-origin relative paths, so the page keeps
/// working whichever scheme (plain or TLS) it was loaded over.
static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>SSE Chat</title>
    </head>
    <body>
        <h1>SSE Chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="msg" />
        <button type="button" id="submit">Send</button>
        <script>
        const chat = document.getElementById('chat');
        const msg = document.getElementById('msg');
        const submit = document.getElementById('submit');
        let sse = new EventSource('/chat');
        sse.onopen = function() {
            chat.innerHTML = "<p><em>Connected!</em></p>";
        }
        let userId;
        sse.addEventListener("user", function(msg) {
            userId = msg.data;
        });
        sse.onmessage = function(msg) {
            showMessage(msg.data);
        };
        document.getElementById('submit').onclick = function() {
            var txt = msg.value;
            var xhr = new XMLHttpRequest();
            xhr.open("POST", `/chat/${userId}`, true);
            xhr.send(txt);
            msg.value = '';
            showMessage('<You>: ' + txt);
        };
        function showMessage(data) {
            const line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }
        </script>
    </body>
</html>
"#;
# Cargo manifest for the SSE chat example.
[package]
name = "example-sse-chat"
version = "0.1.0"
edition = "2021"
# Example crate only; never published to a registry.
publish = false


[dependencies]
# Provides `StreamExt` (used for `.map` on the receiver stream).
futures-util = { version = "0.3", default-features = false }
# Provides `Lazy` for the global user registry.
once_cell = "1"
# The example imports `salvo::sse`, hence the `sse` feature.
salvo = { workspace = true, features = ["sse"] }
# `macros` is needed for the `#[tokio::main]` attribute.
tokio = { version = "1", features = ["macros"] }
# Provides the `UnboundedReceiverStream` wrapper.
tokio-stream = { version = "0.1", features = ["net"] }
tracing = "0.1"
tracing-subscriber = "0.3"