Server-Sent Events (SSE)

Was ist SSE?

Server-Sent Events (SSE) ist eine Webtechnologie, die es Servern ermöglicht, Daten an Clients zu pushen. Im Gegensatz zu WebSocket handelt es sich bei SSE um eine unidirektionale Kommunikationsmethode, bei der Daten nur vom Server zum Client gesendet werden können. Sie eignet sich für Szenarien, in denen der Server Daten in Echtzeit pushen muss, der Client jedoch keine Daten häufig senden muss. SSE basiert auf dem HTTP-Protokoll, ist einfach zu implementieren und verfügt über integrierte Wiederherstellungsmechanismen.

Bietet Middleware-Unterstützung für SSE.

Beispielcode

main.rs
Cargo.toml
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//port from https://github.com/seanmonstar/warp/blob/master/examples/sse.rs

use std::convert::Infallible;
use std::time::Duration;

use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

// create server-sent event
fn sse_counter(counter: u64) -> Result<SseEvent, Infallible> {
    Ok(SseEvent::default().text(counter.to_string()))
}

#[handler]
async fn handle_tick(res: &mut Response) {
    let event_stream = {
        let mut counter: u64 = 0;
        let interval = interval(Duration::from_secs(1));
        let stream = IntervalStream::new(interval);
        stream.map(move |_| {
            counter += 1;
            sse_counter(counter)
        })
    };
    sse::stream(res, event_stream);
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::with_path("ticks").get(handle_tick);

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}

Chat-Anwendungsbeispiel

main.rs
Cargo.toml
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs

use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};

use futures_util::StreamExt;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use salvo::prelude::*;
use salvo::sse::{SseEvent, SseKeepAlive};

type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;

static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().goal(index).push(
        Router::with_path("chat")
            .get(user_connected)
            .push(Router::with_path("{id}").post(chat_send)),
    );

    let acceptor = TcpListener::new("0.0.0.0:8698").bind().await;
    Server::new(acceptor).serve(router).await;
}

#[derive(Debug)]
enum Message {
    UserId(usize),
    Reply(String),
}

#[handler]
async fn chat_send(req: &mut Request, res: &mut Response) {
    let my_id = req.param::<usize>("id").unwrap();
    let msg = std::str::from_utf8(req.payload().await.unwrap()).unwrap();
    user_message(my_id, msg);
    res.status_code(StatusCode::OK);
}

#[handler]
async fn user_connected(res: &mut Response) {
    // Use a counter to assign a new unique ID for this user.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);

    tracing::info!("new chat user: {}", my_id);

    // Use an unbounded channel to handle buffering and flushing of messages
    // to the event source...
    let (tx, rx) = mpsc::unbounded_channel();
    let rx = UnboundedReceiverStream::new(rx);

    tx.send(Message::UserId(my_id))
        // rx is right above, so this cannot fail
        .unwrap();

    // Save the sender in our list of connected users.
    ONLINE_USERS.lock().insert(my_id, tx);

    // Convert messages into Server-Sent Events and returns resulting stream.
    let stream = rx.map(|msg| match msg {
        Message::UserId(my_id) => {
            Ok::<_, salvo::Error>(SseEvent::default().name("user").text(my_id.to_string()))
        }
        Message::Reply(reply) => Ok(SseEvent::default().text(reply)),
    });
    SseKeepAlive::new(stream).stream(res);
}

fn user_message(my_id: usize, msg: &str) {
    let new_msg = format!("<User#{my_id}>: {msg}");

    // New message from this user, send it to everyone else (except same uid)...
    //
    // We use `retain` instead of a for loop so that we can reap any user that
    // appears to have disconnected.
    ONLINE_USERS.lock().retain(|uid, tx| {
        if my_id == *uid {
            // don't send to same user, but do retain
            true
        } else {
            // If not `is_ok`, the SSE stream is gone, and so don't retain
            tx.send(Message::Reply(new_msg.clone())).is_ok()
        }
    });
}

#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}

static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>SSE Chat</title>
    </head>
    <body>
        <h1>SSE Chat</h1>
        <div id="chat">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="msg" />
        <button type="button" id="submit">Send</button>
        <script>
        const chat = document.getElementById('chat');
        const msg = document.getElementById('msg');
        const submit = document.getElementById('submit');
        let sse = new EventSource(`http://${location.host}/chat`);
        sse.onopen = function() {
            chat.innerHTML = "<p><em>Connected!</em></p>";
        }
        let userId;
        sse.addEventListener("user", function(msg) {
            userId = msg.data;
        });
        sse.onmessage = function(msg) {
            showMessage(msg.data);
        };
        document.getElementById('submit').onclick = function() {
            var txt = msg.value;
            var xhr = new XMLHttpRequest();
            xhr.open("POST", `http://${window.location.host}/chat/${userId}`, true);
            xhr.send(txt);
            msg.value = '';
            showMessage('<You>: ' + txt);
        };
        function showMessage(data) {
            const line = document.createElement('p');
            line.innerText = data;
            chat.appendChild(line);
        }
        </script>
    </body>
</html>
"#;

Integration mit großen Sprachmodellen

Bei der Integration von KI mithilfe des OpenAI SDK oder anderer SDKs für große Sprachmodelle sind Streaming-Antworten (Stream) eine häufige Anforderung. SSE bietet eine perfekte Lösung, die es ermöglicht, KI-generierte Inhalte in Echtzeit an das Frontend zu liefern und so eine bessere Benutzererfahrung zu bieten.

Wenn Sie beispielsweise die Chat Completion API von OpenAI mit Streaming-Antworten verwenden, kann die SSE-Funktionalität von Salvo genutzt werden, um KI-generierte Inhalte schrittweise an den Client zu senden und so die Verzögerung beim Warten auf eine vollständige Antwort zu vermeiden.