Eventi Inviati dal Server (SSE)

Cos'è SSE

Gli Eventi Inviati dal Server (SSE) sono una tecnologia web che consente ai server di inviare dati ai client. A differenza di WebSocket, SSE è un metodo di comunicazione unidirezionale, in cui i dati possono essere inviati solo dal server al client. È adatto per scenari in cui il server deve inviare dati in tempo reale, ma il client non ha bisogno di inviarli frequentemente. SSE si basa sul protocollo HTTP, è facile da implementare e include meccanismi di riconnessione integrati.

Fornisce supporto middleware per SSE.

Codice di Esempio

main.rs
Cargo.toml
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//port from https://github.com/seanmonstar/warp/blob/master/examples/sse.rs

use std::convert::Infallible;
use std::time::Duration;

use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

// create server-sent event
fn sse_counter(counter: u64) -> Result<SseEvent, Infallible> {
    Ok(SseEvent::default().text(counter.to_string()))
}

#[handler]
async fn handle_tick(res: &mut Response) {
    let event_stream = {
        let mut counter: u64 = 0;
        let interval = interval(Duration::from_secs(1));
        let stream = IntervalStream::new(interval);
        stream.map(move |_| {
            counter += 1;
            sse_counter(counter)
        })
    };
    sse::stream(res, event_stream);
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::with_path("ticks").get(handle_tick);

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}

Esempio di Applicazione di Chat

main.rs
Cargo.toml
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs

use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};

use futures_util::StreamExt;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use salvo::prelude::*;
use salvo::sse::{SseEvent, SseKeepAlive};

type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;

static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().goal(index).push(
        Router::with_path("chat")
            .get(user_connected)
            .push(Router::with_path("{id}").post(chat_send)),
    );

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}

#[derive(Debug)]
enum Message {
    UserId(usize),
    Reply(String),
}

#[handler]
async fn chat_send(req: &mut Request, res: &mut Response) {
    let my_id = req.param::<usize>("id").unwrap();
    let msg = std::str::from_utf8(req.payload().await.unwrap()).unwrap();
    user_message(my_id, msg);
    res.status_code(StatusCode::OK);
}

#[handler]
async fn user_connected(res: &mut Response) {
    // Use a counter to assign a new unique ID for this user.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);

    tracing::info!("new chat user: {}", my_id);

    // Use an unbounded channel to handle buffering and flushing of messages
    // to the event source...
    let (tx, rx) = mpsc::unbounded_channel();
    let rx = UnboundedReceiverStream::new(rx);

    tx.send(Message::UserId(my_id))
        // rx is right above, so this cannot fail
        .unwrap();

    // Save the sender in our list of connected users.
    ONLINE_USERS.lock().insert(my_id, tx);

    // Convert messages into Server-Sent Events and returns resulting stream.
    let stream = rx.map(|msg| match msg {
        Message::UserId(my_id) => {
            Ok::<_, salvo::Error>(SseEvent::default().name("user").text(my_id.to_string()))
        }
        Message::Reply(reply) => Ok(SseEvent::default().text(reply)),
    });
    SseKeepAlive::new(stream).stream(res);
}

fn user_message(my_id: usize, msg: &str) {
    let new_msg = format!("<User#{my_id}>: {msg}");

    // New message from this user, send it to everyone else (except same uid)...
    //
    // We use `retain` instead of a for loop so that we can reap any user that
    // appears to have disconnected.
    ONLINE_USERS.lock().retain(|uid, tx| {
        if my_id == *uid {
            // don't send to same user, but do retain
            true
        } else {
            // If not `is_ok`, the SSE stream is gone, and so don't retain
            tx.send(Message::Reply(new_msg.clone())).is_ok()
        }
    });
}

#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}

static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>SSE Chat</title>
    </head>
    <body>
        <h1>SSE Chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="msg" />
        <button type="button" id="submit">Send</button>
        <script>
        const chat = document.getElementById('chat');
        const msg = document.getElementById('msg');
        const submit = document.getElementById('submit');
        let sse = new EventSource(`http://${location.host}/chat`);
        sse.onopen = function() {
            chat.innerHTML = "<p><em>Connected!</em></p>";
        }
        let userId;
        sse.addEventListener("user", function(msg) {
            userId = msg.data;
        });
        sse.onmessage = function(msg) {
            showMessage(msg.data);
        };
        document.getElementById('submit').onclick = function() {
            var txt = msg.value;
            var xhr = new XMLHttpRequest();
            xhr.open("POST", `http://${window.location.host}/chat/${userId}`, true);
            xhr.send(txt);
            msg.value = '';
            showMessage('<You>: ' + txt);
        };
        function showMessage(data) {
            const line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }
        </script>
    </body>
</html>
"#;

Integrazione con Modelli Linguistici di Grande Scala

Quando si integra l'IA utilizzando l'SDK di OpenAI o altri SDK di modelli linguistici di grande scala, le risposte in streaming (Stream) sono un requisito comune. SSE fornisce una soluzione perfetta, consentendo la consegna in tempo reale del contenuto generato dall'IA al frontend, offrendo così una migliore esperienza utente.

Ad esempio, quando si utilizza l'API di completamento chat di OpenAI con risposte in streaming, la funzionalità SSE di Salvo può essere sfruttata per inviare gradualmente il contenuto generato dall'IA al client, eliminando il ritardo dovuto all'attesa di una risposta completa.