llm-code-example

---concurrency-limiter---

---concurrency-limiter/Cargo.mdx---

concurrency-limiter/Cargo.toml
[package]
name = "example-concurrency-limiter"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["concurrency-limiter"]}
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---concurrency-limiter/src---

---concurrency-limiter/src/main.mdx---

concurrency-limiter/src/main.rs
use std::fs::create_dir_all;
use std::path::Path;

use salvo::prelude::*;

// Handler for serving the index page with upload forms
#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}

// Handler for processing file uploads with a simulated delay
#[handler]
async fn upload(req: &mut Request, res: &mut Response) {
    // Extract file from the multipart form data
    let file = req.file("file").await;
    // Simulate a long-running operation (10 seconds)
    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

    if let Some(file) = file {
        // Generate destination path for the uploaded file
        let dest = format!("temp/{}", file.name().unwrap_or("file"));
        tracing::debug!(dest = %dest, "upload file");

        // Copy file to destination
        if let Err(e) = std::fs::copy(file.path(), Path::new(&dest)) {
            res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
            res.render(Text::Plain(format!("file not found in request: {e}")));
        } else {
            res.render(Text::Plain(format!("File uploaded to {dest}")));
        }
    } else {
        res.status_code(StatusCode::BAD_REQUEST);
        res.render(Text::Plain("file not found in request"));
    }
}

#[tokio::main]
async fn main() {
    // Initialize logging system
    tracing_subscriber::fmt().init();

    // Create temporary directory for file uploads
    create_dir_all("temp").unwrap();

    // Configure router with two upload endpoints:
    // - /limited: Only allows one concurrent upload (with concurrency limiter)
    // - /unlimit: Allows unlimited concurrent uploads
    let router = Router::new()
        .get(index)
        .push(
            Router::new()
                .hoop(max_concurrency(1)) // Limit concurrent requests to 1
                .path("limited")
                .post(upload),
        )
        .push(Router::with_path("unlimit").post(upload));

    // Bind server to port 5800 and start serving
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

// HTML template for the upload forms page
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
    <head>
        <title>Upload file</title>
    </head>
    <body>
        <h1>Upload file</h1>
        <form action="/unlimit" method="post" enctype="multipart/form-data">
            <h3>Unlimit</h3>
            <input type="file" name="file" />
            <input type="submit" value="upload" />
        </form>
        <form action="/limited" method="post" enctype="multipart/form-data">
            <h3>Limited</h3>
            <input type="file" name="file" />
            <input type="submit" value="upload" />
        </form>
    </body>
</html>
"#;

---csrf-session-store---

---csrf-session-store/Cargo.mdx---

csrf-session-store/Cargo.toml
[package]
name = "example-csrf-session-store"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["csrf", "session"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
serde = { version = "1", features = ["derive"] }

---csrf-session-store/src---

---csrf-session-store/src/main.mdx---

csrf-session-store/src/main.rs
use salvo::csrf::*;
use salvo::prelude::*;
use serde::{Deserialize, Serialize};

// Handler for serving the home page with links to different CSRF protection methods
#[handler]
pub async fn home(res: &mut Response) {
    let html = r#"
    <!DOCTYPE html>
    <html>
    <head><meta charset="UTF-8"><title>Csrf SessionStore</title></head>
    <body>
    <h2>Csrf Exampe: SessionStore</h2>
    <ul>
        <li><a href="/bcrypt/">Bcrypt</a></li>
        <li><a href="/hmac/">Hmac</a></li>
        <li><a href="/aes_gcm/">Aes Gcm</a></li>
        <li><a href="/ccp/">chacha20poly1305</a></li>
    </ul>
    </body>"#;
    res.render(Text::Html(html));
}

// Handler for GET requests that displays a form with CSRF token
#[handler]
pub async fn get_page(depot: &mut Depot, res: &mut Response) {
    let new_token = depot.csrf_token().unwrap_or_default();
    res.render(Text::Html(get_page_html(new_token, "")));
}

// Handler for POST requests that processes form submission with CSRF validation
#[handler]
pub async fn post_page(req: &mut Request, depot: &mut Depot, res: &mut Response) {
    // Define data structure for form submission
    #[derive(Deserialize, Serialize, Debug)]
    struct Data {
        csrf_token: String,
        message: String,
    }
    // Parse the submitted form data into the Data struct
    let data = req.parse_form::<Data>().await.unwrap();
    // Log the received form data for debugging
    tracing::info!("posted data: {:?}", data);
    // Generate a new CSRF token for the next request
    let new_token = depot.csrf_token().unwrap_or_default();
    // Generate HTML response with the new token and display the submitted data
    let html = get_page_html(new_token, &format!("{data:#?}"));
    // Send the HTML response back to the client
    res.render(Text::Html(html));
}

#[tokio::main]
async fn main() {
    // Initialize logging system
    tracing_subscriber::fmt().init();

    // Configure CSRF token finder in form data
    let form_finder = FormFinder::new("csrf_token");

    // Initialize different CSRF protection methods using session store
    let bcrypt_csrf = bcrypt_session_csrf(form_finder.clone());
    let hmac_csrf = hmac_session_csrf(*b"01234567012345670123456701234567", form_finder.clone());
    let aes_gcm_session_csrf =
        aes_gcm_session_csrf(*b"01234567012345670123456701234567", form_finder.clone());
    let ccp_session_csrf =
        ccp_session_csrf(*b"01234567012345670123456701234567", form_finder.clone());

    // Configure session handler with memory store and secret key
    let session_handler = salvo::session::SessionHandler::builder(
        salvo::session::MemoryStore::new(),
        b"secretabsecretabsecretabsecretabsecretabsecretabsecretabsecretab",
    )
    .build()
    .unwrap();

    // Configure router with session handler and different CSRF protection endpoints
    let router = Router::new()
        .get(home)
        .hoop(session_handler)
        // Bcrypt-based CSRF protection
        .push(
            Router::with_hoop(bcrypt_csrf)
                .path("bcrypt")
                .get(get_page)
                .post(post_page),
        )
        // HMAC-based CSRF protection
        .push(
            Router::with_hoop(hmac_csrf)
                .path("hmac")
                .get(get_page)
                .post(post_page),
        )
        // AES-GCM-based CSRF protection
        .push(
            Router::with_hoop(aes_gcm_session_csrf)
                .path("aes_gcm")
                .get(get_page)
                .post(post_page),
        )
        // ChaCha20Poly1305-based CSRF protection
        .push(
            Router::with_hoop(ccp_session_csrf)
                .path("ccp")
                .get(get_page)
                .post(post_page),
        );

    // Start server on port 5800
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

// Helper function to generate HTML page with CSRF token and message
fn get_page_html(csrf_token: &str, msg: &str) -> String {
    format!(
        r#"
    <!DOCTYPE html>
    <html>
    <head><meta charset="UTF-8"><title>Csrf SessionStore</title></head>
    <body>
    <h2>Csrf Exampe: SessionStore</h2>
    <ul>
        <li><a href="/bcrypt/">Bcrypt</a></li>
        <li><a href="/hmac/">Hmac</a></li>
        <li><a href="/aes_gcm/">Aes Gcm</a></li>
        <li><a href="/ccp/">chacha20poly1305</a></li>
    </ul>
    <form action="./" method="post">
        <input type="hidden" name="csrf_token" value="{csrf_token}" />
        <div>
            <label>Message:<input type="text" name="message" /></label>
        </div>
        <button type="submit">Send</button>
    </form>
    <pre>{msg}</pre>
    </body>
    </html>
    "#
    )
}

---webtransport---

---webtransport/certs---

---webtransport/static---

---webtransport/static/client.mdx---

webtransport/static/client.html
<!doctype html>
<html lang="en">
  <title>WebTransport over HTTP/3 client</title>
  <meta charset="utf-8">
  <!-- WebTransport origin trial token. See https://developer.chrome.com/origintrials/#/view_trial/793759434324049921 -->
  <meta http-equiv="origin-trial" content="AkSQvBVsfMTgBtlakApX94hWGyBPQJXerRc2Aq8g/sKTMF+yG62+bFUB2yIxaK1furrNH3KNNeJV00UZSZHicw4AAABceyJvcmlnaW4iOiJodHRwczovL2dvb2dsZWNocm9tZS5naXRodWIuaW86NDQzIiwiZmVhdHVyZSI6IldlYlRyYW5zcG9ydCIsImV4cGlyeSI6MTY0Mzc1OTk5OX0=">
  <script src="client.js"></script>
  <link rel="stylesheet" href="client.css">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <body>
  <div id="top">
    <div id="explanation">
      This tool can be used to connect to an arbitrary WebTransport server.
      It has several limitations:
      <ul>
        <li>It can only send an entirety of a stream at once.  Once the stream
          is opened, all of the data is immediately sent, and the write side of
          the steam is closed.</li>
        <li>This tool does not listen to server-initiated bidirectional
          streams.</li>
        <li>Stream IDs are different from the one used by QUIC on the wire, as
          the on-the-wire IDs are not exposed via the Web API.</li>
        <li>The <code>WebTransport</code> object can be accessed using the developer console via <code>currentTransport</code>.</li>
      </ul>
    </div>
    <div id="tool">
    <h1>WebTransport over HTTP/3 client</h1>
    <div>
      <h2>Establish WebTransport connection</h2>
      <div class="input-line">
      <label for="url">URL:</label>
      <input type="text" name="url" id="url"
             value="https://0.0.0.0:5800/counter">
      <input type="button" id="connect" value="Connect" onclick="connect()">
      </div>
    </div>
    <div>
      <h2>Send data over WebTransport</h2>
      <form name="sending">
      <textarea name="data" id="data"></textarea>
      <div>
        <input type="radio" name="sendtype" value="datagram"
               id="datagram" checked>
        <label for="datagram">Send a datagram</label>
      </div>
      <div>
        <input type="radio" name="sendtype" value="unidi" id="unidi-stream">
        <label for="unidi-stream">Open a unidirectional stream</label>
      </div>
      <div>
        <input type="radio" name="sendtype" value="bidi" id="bidi-stream">
        <label for="bidi-stream">Open a bidirectional stream</label>
      </div>
      <input type="button" id="send" name="send" value="Send data"
             disabled onclick="sendData()">
      </form>
    </div>
    <div>
      <h2>Event log</h2>
      <ul id="event-log">
      </ul>
    </div>
    </div>
  </div>
  </body>
</html>

---webtransport/Cargo.mdx---

webtransport/Cargo.toml
[package]
name = "example-webtransport"
version = "0.1.0"
edition = "2024"


[dependencies]
anyhow = "1"
futures-util = "0.3"
salvo = { version = "0.77.1", features = ["quinn", "anyhow", "serve-static"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
serde = "1"
serde_json = "1"
bytes = "1"

---webtransport/src---

---webtransport/src/main.mdx---

webtransport/src/main.rs
use std::time::Duration;

use anyhow::{Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use salvo::conn::rustls::{Keycert, RustlsConfig};
use salvo::prelude::*;
use salvo::proto::webtransport;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::pin;

macro_rules! log_result {
    ($expr:expr) => {
        if let Err(err) = $expr {
            tracing::error!("{err:?}");
        }
    };
}
async fn echo_stream<T, R>(send: T, recv: R) -> anyhow::Result<()>
where
    T: AsyncWrite,
    R: AsyncRead,
{
    pin!(send);
    pin!(recv);

    tracing::info!("Got stream");
    let mut buf = Vec::new();
    recv.read_to_end(&mut buf).await?;

    let message = Bytes::from(buf);
    send_chunked(send, message).await?;

    Ok(())
}
// Used to test that all chunks arrive properly as it is easy to write an impl which only reads and
// writes the first chunk.
async fn send_chunked(mut send: impl AsyncWrite + Unpin, data: Bytes) -> anyhow::Result<()> {
    for chunk in data.chunks(4) {
        tokio::time::sleep(Duration::from_millis(100)).await;
        tracing::info!("Sending {chunk:?}");
        send.write_all(chunk).await?;
    }

    Ok(())
}

#[handler]
async fn connect(req: &mut Request) -> Result<(), salvo::Error> {
    let session = req.web_transport_mut().await.unwrap();
    let session_id = session.session_id();

    // This will open a bidirectional stream and send a message to the client right after connecting!
    let stream = session.open_bi(session_id).await?;

    tokio::spawn(async move {
        log_result!(open_bidi_test(stream).await);
    });
    loop {
        tokio::select! {
            datagram = session.accept_datagram() => {
                let datagram = datagram?;
                if let Some((_, datagram)) = datagram {
                    tracing::info!("Responding with {datagram:?}");
                    // Put something before to make sure encoding and decoding works and don't just
                    // pass through
                    let mut resp = BytesMut::from(&b"Response: "[..]);
                    resp.put(datagram);

                    session.send_datagram(resp.freeze())?;
                    tracing::info!("Finished sending datagram");
                }
            }
            uni_stream = session.accept_uni() => {
                let (id, stream) = uni_stream?.unwrap();

                let send = session.open_uni(id).await?;
                tokio::spawn( async move { log_result!(echo_stream(send, stream).await); });
            }
            stream = session.accept_bi() => {
                if let Some(webtransport::server::AcceptedBi::BidiStream(_, stream)) = stream? {
                    let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
                    tokio::spawn( async move { log_result!(echo_stream(send, recv).await); });
                }
            }
            else => {
                break
            }
        }
    }

    tracing::info!("Finished handling session");

    Ok(())
}

async fn open_bidi_test<S>(mut stream: S) -> anyhow::Result<()>
where
    S: Unpin + AsyncRead + AsyncWrite,
{
    tracing::info!("Opening bidirectional stream");

    stream
        .write_all(b"Hello from a server initiated bidi stream")
        .await
        .context("Failed to respond")?;

    let mut resp = Vec::new();
    stream.shutdown().await?;
    stream.read_to_end(&mut resp).await?;

    tracing::info!("Got response from client: {resp:?}");

    Ok(())
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let cert = include_bytes!("../certs/cert.pem").to_vec();
    let key = include_bytes!("../certs/key.pem").to_vec();

    let router = Router::new()
        .push(Router::with_path("counter").goal(connect))
        .push(
            Router::with_path("{*path}")
                .get(StaticDir::new(["webtransport/static", "./static"]).defaults("client.html")),
        );

    let config = RustlsConfig::new(Keycert::new().cert(cert.as_slice()).key(key.as_slice()));
    let listener = TcpListener::new(("0.0.0.0", 5800)).rustls(config.clone());

    let acceptor = QuinnListener::new(config, ("0.0.0.0", 5800))
        .join(listener)
        .bind()
        .await;

    Server::new(acceptor).serve(router).await;
}

---catch-error---

---catch-error/Cargo.mdx---

catch-error/Cargo.toml
[package]
name = "example-catch-error"
version = "0.1.0"
edition = "2024"


[dependencies]
anyhow = "1"
eyre = "0.6"
salvo = { version = "0.77.1", features = ["anyhow", "eyre"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---catch-error/src---

---catch-error/src/main.mdx---

catch-error/src/main.rs
use salvo::prelude::*;

// Custom error type for demonstration
struct CustomError;

// Implement Writer trait for CustomError to customize error response
#[async_trait]
impl Writer for CustomError {
    async fn write(self, _req: &mut Request, _depot: &mut Depot, res: &mut Response) {
        // Set response status code to 500 and custom error message
        res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
        res.render("custom error");
    }
}

// Handler that returns an anyhow error for testing error handling
#[handler]
async fn handle_anyhow() -> Result<(), anyhow::Error> {
    Err(anyhow::anyhow!("handled anyhow error"))
}

// Handler that returns an eyre error for testing error handling
#[handler]
async fn handle_eyre() -> eyre::Result<()> {
    Err(eyre::Report::msg("handled eyre error"))
}

// Handler that returns our custom error type
#[handler]
async fn handle_custom() -> Result<(), CustomError> {
    Err(CustomError)
}

#[tokio::main]
async fn main() {
    // Initialize logging system
    tracing_subscriber::fmt().init();

    // Set up router with three error handling endpoints:
    // - /anyhow : demonstrates anyhow error handling
    // - /eyre : demonstrates eyre error handling
    // - /custom : demonstrates custom error handling
    let router = Router::new()
        .push(Router::with_path("anyhow").get(handle_anyhow))
        .push(Router::with_path("eyre").get(handle_eyre))
        .push(Router::with_path("custom").get(handle_custom));

    // Bind server to port 5800 and start serving
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---join-listeners---

---join-listeners/Cargo.mdx---

join-listeners/Cargo.toml
[package]
name = "example-join-listeners"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1" }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---join-listeners/src---

---join-listeners/src/main.mdx---

join-listeners/src/main.rs
use salvo::prelude::*;

#[handler]
async fn hello() -> &'static str {
    "Hello World"
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().get(hello);
    let acceptor = TcpListener::new("0.0.0.0:5800")
        .join(TcpListener::new("0.0.0.0:5801"))
        .bind()
        .await;

    Server::new(acceptor).serve(router).await;
}

---oapi-todos---

---oapi-todos/Cargo.mdx---

oapi-todos/Cargo.toml
[package]
name = "example-oapi-todos"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["oapi"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["macros"] }
compact_str = { version = "0.7", features = ["serde"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---oapi-todos/src---

---oapi-todos/src/main.mdx---

oapi-todos/src/main.rs
use std::sync::LazyLock;

use salvo::oapi::{ToSchema, extract::*};
use salvo::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

static STORE: LazyLock<Db> = LazyLock::new(new_store);
pub type Db = Mutex<Vec<Todo>>;

pub fn new_store() -> Db {
    Mutex::new(Vec::new())
}

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
pub struct Todo {
    #[salvo(schema(example = 1))]
    pub id: u64,
    #[salvo(schema(example = "Buy coffee"))]
    pub text: String,
    pub completed: bool,
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().get(index).push(
        Router::with_path("api").push(
            Router::with_path("todos")
                .get(list_todos)
                .post(create_todo)
                .push(
                    Router::with_path("{id}")
                        .patch(update_todo)
                        .delete(delete_todo),
                ),
        ),
    );

    let doc = OpenApi::new("todos api", "0.0.1").merge_router(&router);

    let router = router
        .unshift(doc.into_router("/api-doc/openapi.json"))
        .unshift(
            SwaggerUi::new("/api-doc/openapi.json")
                .title("Todos - SwaggerUI")
                .into_router("/swagger-ui"),
        )
        .unshift(
            Scalar::new("/api-doc/openapi.json")
                .title("Todos - Scalar")
                .into_router("/scalar"),
        )
        .unshift(
            RapiDoc::new("/api-doc/openapi.json")
                .title("Todos - RapiDoc")
                .into_router("/rapidoc"),
        )
        .unshift(
            ReDoc::new("/api-doc/openapi.json")
                .title("Todos - ReDoc")
                .into_router("/redoc"),
        );

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

#[handler]
pub async fn index() -> Text<&'static str> {
    Text::Html(INDEX_HTML)
}

/// List todos.
#[endpoint(
    tags("todos"),
    parameters(
        ("offset", description = "Offset is an optional query paramter."),
    )
)]
pub async fn list_todos(
    offset: QueryParam<usize, false>,
    limit: QueryParam<usize, false>,
) -> Json<Vec<Todo>> {
    let todos = STORE.lock().await;
    let todos: Vec<Todo> = todos
        .clone()
        .into_iter()
        .skip(offset.into_inner().unwrap_or(0))
        .take(limit.into_inner().unwrap_or(usize::MAX))
        .collect();
    Json(todos)
}

/// Create new todo.
#[endpoint(tags("todos"), status_codes(201, 409))]
pub async fn create_todo(req: JsonBody<Todo>) -> Result<StatusCode, StatusError> {
    tracing::debug!(todo = ?req, "create todo");

    let mut vec = STORE.lock().await;

    for todo in vec.iter() {
        if todo.id == req.id {
            tracing::debug!(id = ?req.id, "todo already exists");
            return Err(StatusError::bad_request().brief("todo already exists"));
        }
    }

    vec.push(req.into_inner());
    Ok(StatusCode::CREATED)
}

/// Update existing todo.
#[endpoint(tags("todos"), status_codes(200, 404))]
pub async fn update_todo(
    id: PathParam<u64>,
    updated: JsonBody<Todo>,
) -> Result<StatusCode, StatusError> {
    tracing::debug!(todo = ?updated, id = ?id, "update todo");
    let mut vec = STORE.lock().await;

    for todo in vec.iter_mut() {
        if todo.id == *id {
            *todo = (*updated).clone();
            return Ok(StatusCode::OK);
        }
    }

    tracing::debug!(?id, "todo is not found");
    Err(StatusError::not_found())
}

/// Delete todo.
#[endpoint(tags("todos"), status_codes(200, 401, 404))]
pub async fn delete_todo(id: PathParam<u64>) -> Result<StatusCode, StatusError> {
    tracing::debug!(?id, "delete todo");

    let mut vec = STORE.lock().await;

    let len = vec.len();
    vec.retain(|todo| todo.id != *id);

    let deleted = vec.len() != len;
    if deleted {
        Ok(StatusCode::NO_CONTENT)
    } else {
        tracing::debug!(?id, "todo is not found");
        Err(StatusError::not_found())
    }
}

static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
    <head>
        <title>Oapi todos</title>
    </head>
    <body>
        <ul>
        <li><a href="swagger-ui" target="_blank">swagger-ui</a></li>
        <li><a href="scalar" target="_blank">scalar</a></li>
        <li><a href="rapidoc" target="_blank">rapidoc</a></li>
        <li><a href="redoc" target="_blank">redoc</a></li>
        </ul>
    </body>
</html>
"#;

---db-mongodb---

---db-mongodb/Cargo.mdx---

db-mongodb/Cargo.toml
[package]
name = "example-db-mongodb"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1" }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
futures.workspace = true
serde = { version = "1", features = ["derive"] }
serde_json = "1"
mongodb = "2"
thiserror = "1"

---db-mongodb/src---

---db-mongodb/src/main.mdx---

db-mongodb/src/main.rs
use std::sync::OnceLock;

use futures::stream::TryStreamExt;
use mongodb::{
    Client, Collection, IndexModel, bson::Document, bson::doc, bson::oid::ObjectId,
    options::IndexOptions,
};
use salvo::prelude::*;
use serde::{Deserialize, Serialize};

// Database and collection names
const DB_NAME: &str = "myApp";
const COLL_NAME: &str = "users";

use thiserror::Error;

// Custom error type for MongoDB operations
#[derive(Error, Debug)]
pub enum Error {
    #[error("MongoDB Error")]
    ErrorMongo(#[from] mongodb::error::Error),
}

pub type AppResult<T> = Result<T, Error>;

// Implement error response writer for our custom error type
#[async_trait]
impl Writer for Error {
    async fn write(self, _req: &mut Request, _depot: &mut Depot, _res: &mut Response) {}
}

// User model representing the document structure in MongoDB
#[derive(Debug, Deserialize, Serialize)]
struct User {
    _id: Option<ObjectId>,
    first_name: String,
    last_name: String,
    username: String,
    email: String,
}

// Global MongoDB client instance
static MONGODB_CLIENT: OnceLock<Client> = OnceLock::new();

// Helper function to get the MongoDB client instance
#[inline]
pub fn get_mongodb_client() -> &'static Client {
    MONGODB_CLIENT.get().unwrap()
}

// Handler for adding a new user to the database
#[handler]
async fn add_user(req: &mut Request, res: &mut Response) {
    let client = get_mongodb_client();
    let coll_users = client.database(DB_NAME).collection::<Document>(COLL_NAME);
    let new_user = req.parse_json::<User>().await.unwrap();

    // Create BSON document from user data
    let user = doc! {
        "first_name": new_user.first_name,
        "last_name": new_user.last_name,
        "username": new_user.username,
        "email": new_user.email,
    };

    // Insert user document into MongoDB
    let result = coll_users.insert_one(user, None).await;
    match result {
        Ok(id) => res.render(format!("user added with ID {:?}", id.inserted_id)),
        Err(e) => res.render(format!("error {e:?}")),
    }
}

// Handler for retrieving all users from the database
#[handler]
async fn get_users(res: &mut Response) -> AppResult<()> {
    let client = get_mongodb_client();
    let coll_users = client.database(DB_NAME).collection::<User>(COLL_NAME);
    // Find all users and convert cursor to vector
    let mut cursor = coll_users.find(None, None).await?;
    let mut vec_users: Vec<User> = Vec::new();
    while let Some(user) = cursor.try_next().await? {
        vec_users.push(user);
    }
    res.render(Json(vec_users));
    Ok(())
}

// Handler for retrieving a single user by username
#[handler]
async fn get_user(req: &mut Request, res: &mut Response) {
    let client = get_mongodb_client();
    let coll_users: Collection<User> = client.database(DB_NAME).collection(COLL_NAME);

    let username = req.param::<String>("username").unwrap();
    // Find user by username
    match coll_users
        .find_one(doc! { "username": &username }, None)
        .await
    {
        Ok(Some(user)) => res.render(Json(user)),
        Ok(None) => res.render(format!("No user found with username {username}")),
        Err(e) => res.render(format!("error {e:?}")),
    }
}

// Create a unique index on the username field
async fn create_username_index(client: &Client) {
    let options = IndexOptions::builder().unique(true).build();
    let model = IndexModel::builder()
        .keys(doc! { "username": 1 })
        .options(options)
        .build();
    client
        .database(DB_NAME)
        .collection::<User>(COLL_NAME)
        .create_index(model, None)
        .await
        .expect("creating an index should succeed");
}

#[tokio::main]
async fn main() {
    // Initialize logging system
    tracing_subscriber::fmt().init();

    // Get MongoDB connection URI from environment or use default
    let mongodb_uri =
        std::env::var("MONGODB_URI").unwrap_or_else(|_| "mongodb://10.1.1.80:27017".into());

    // Connect to MongoDB and create unique index
    let client = Client::with_uri_str(mongodb_uri)
        .await
        .expect("failed to connect");
    create_username_index(&client).await;

    // Store MongoDB client in global state
    MONGODB_CLIENT.set(client).unwrap();

    // Configure router with user management endpoints:
    // - GET /users : List all users
    // - POST /users : Create new user
    // - GET /users/{username} : Get user by username
    let router = Router::with_path("users")
        .get(get_users)
        .post(add_user)
        .push(Router::with_path("{username}").get(get_user));

    // Start server on port 5800
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---websocket---

---websocket/Cargo.mdx---

websocket/Cargo.toml
[package]
name = "example-websocket"
version = "0.1.0"
edition = "2024"


[dependencies]
futures-util = "0.3"
salvo = { version = "0.77.1", features = ["websocket"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
serde = "1"
serde_json = "1"

---websocket/src---

---websocket/src/main.mdx---

websocket/src/main.rs
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
struct User {
    id: usize,
    name: String,
}
#[handler]
async fn connect(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    let user = req.parse_queries::<User>();
    WebSocketUpgrade::new()
        .upgrade(req, res, |mut ws| async move {
            println!("{user:#?} ");
            while let Some(msg) = ws.recv().await {
                let msg = if let Ok(msg) = msg {
                    msg
                } else {
                    // client disconnected
                    return;
                };

                if ws.send(msg).await.is_err() {
                    // client disconnected
                    return;
                }
            }
        })
        .await
}

#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    let router = Router::new()
        .get(index)
        .push(Router::with_path("ws").goal(connect));

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
    <head>
        <title>WS</title>
    </head>
    <body>
        <h1>WS</h1>
        <div id="status">
            <p><em>Connecting...</em></p>
        </div>
        <script>
            const status = document.getElementById('status');
            const msg = document.getElementById('msg');
            const submit = document.getElementById('submit');
            const ws = new WebSocket(`ws://${location.host}/ws?id=123&name=chris`);

            ws.onopen = function() {
                status.innerHTML = '<p><em>Connected!</em></p>';
            };
        </script>
    </body>
</html>
"#;

---sse---

---sse/Cargo.mdx---

sse/Cargo.toml
[package]
name = "example-sse"
version = "0.1.0"
edition = "2024"


[dependencies]
futures-util = "0.3"
salvo = { version = "0.77.1", features = ["sse"] }
tokio = { version = "1", features = ["macros"] }
tokio-stream = { version = "0.1", features = ["net"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---sse/src---

---sse/src/main.mdx---

sse/src/main.rs
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//port from https://github.com/seanmonstar/warp/blob/master/examples/sse.rs

use std::convert::Infallible;
use std::time::Duration;

use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

// create server-sent event
fn sse_counter(counter: u64) -> Result<SseEvent, Infallible> {
    Ok(SseEvent::default().text(counter.to_string()))
}

#[handler]
async fn handle_tick(res: &mut Response) {
    let event_stream = {
        let mut counter: u64 = 0;
        let interval = interval(Duration::from_secs(1));
        let stream = IntervalStream::new(interval);
        stream.map(move |_| {
            counter += 1;
            sse_counter(counter)
        })
    };
    sse::stream(res, event_stream);
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::with_path("ticks").get(handle_tick);

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---oapi-hello---

---oapi-hello/Cargo.mdx---

oapi-hello/Cargo.toml
[package]
name = "example-oapi-hello"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["oapi"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---oapi-hello/src---

---oapi-hello/src/main.mdx---

oapi-hello/src/main.rs
use salvo::oapi::extract::*;
use salvo::prelude::*;

#[endpoint]
async fn hello(name: QueryParam<String, false>) -> String {
    format!("Hello, {}!", name.as_deref().unwrap_or("World"))
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().push(Router::with_path("hello").get(hello));

    let doc = OpenApi::new("test api", "0.0.1").merge_router(&router);

    let router = router
        .unshift(doc.into_router("/api-doc/openapi.json"))
        .unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui"));

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---static-embed-file---

---static-embed-file/static---

---static-embed-file/static/test2.mdx---

static-embed-file/static/test2.txt
copy2

---static-embed-file/static/test1.mdx---

static-embed-file/static/test1.txt
copy1

---static-embed-file/Cargo.mdx---

static-embed-file/Cargo.toml
[package]
name = "example-static-embed-file"
version = "0.1.0"
edition = "2024"


[dependencies]
rust-embed = { version = ">= 6, <= 9" }
salvo = { version = "0.77.1", features = ["serve-static"]}
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---static-embed-file/src---

---static-embed-file/src/main.mdx---

static-embed-file/src/main.rs
use rust_embed::RustEmbed;
use salvo::prelude::*;
use salvo::serve_static::EmbeddedFileExt;

#[derive(RustEmbed)]
#[folder = "static"]
struct Assets;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::with_path("{**rest}").get(serve_file);

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

#[handler]
async fn serve_file(req: &mut Request, res: &mut Response) {
    let path = req.param::<String>("rest").unwrap();
    if let Some(file) = Assets::get(&path) {
        file.render(req, res);
    } else {
        res.status_code(StatusCode::NOT_FOUND);
    }
}

---tls-rustls-reload---

---tls-rustls-reload/certs---

---tls-rustls-reload/Cargo.mdx---

tls-rustls-reload/Cargo.toml
[package]
name = "example-tls-rustls-reload"
version = "0.1.0"
edition = "2024"


[dependencies]
async-stream.workspace = true
salvo = { version = "0.77.1", features = ["rustls"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---tls-rustls-reload/src---

---tls-rustls-reload/src/main.mdx---

tls-rustls-reload/src/main.rs
use salvo::conn::rustls::{Keycert, RustlsConfig};
use salvo::prelude::*;
use tokio::time::Duration;

#[handler]
async fn hello(res: &mut Response) {
    res.render(Text::Plain("Hello World"));
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().get(hello);
    let acceptor = TcpListener::new("0.0.0.0:5800")
        .rustls(async_stream::stream! {
            loop {
                yield load_config();
                tokio::time::sleep(Duration::from_secs(60)).await;
            }
        })
        .bind()
        .await;
    Server::new(acceptor).serve(router).await;
}

fn load_config() -> RustlsConfig {
    RustlsConfig::new(
        Keycert::new()
            .cert(include_bytes!("../certs/cert.pem").as_ref())
            .key(include_bytes!("../certs/key.pem").as_ref()),
    )
}

---craft---

---craft/Cargo.mdx---

craft/Cargo.toml
[package]
name = "example-craft"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["craft", "oapi"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---craft/src---

---craft/src/main.mdx---

craft/src/main.rs
use salvo::oapi::extract::*;
use salvo::prelude::*;
use std::sync::Arc;

// Options struct holding a state value for calculations
#[derive(Clone)]
pub struct Opts {
    state: i64,
}

// Implement methods for Opts using the craft macro for API generation
#[craft]
impl Opts {
    // Constructor for Opts
    fn new(state: i64) -> Self {
        Self { state }
    }

    // Handler method that adds state value to two query parameters
    #[craft(handler)]
    fn add1(&self, left: QueryParam<i64>, right: QueryParam<i64>) -> String {
        (self.state + *left + *right).to_string()
    }

    // Endpoint method using Arc for shared state
    #[craft(endpoint)]
    pub(crate) fn add2(
        self: ::std::sync::Arc<Self>,
        left: QueryParam<i64>,
        right: QueryParam<i64>,
    ) -> String {
        (self.state + *left + *right).to_string()
    }

    // Static endpoint method with custom error response
    #[craft(endpoint(responses((status_code = 400, description = "Wrong request parameters."))))]
    pub fn add3(left: QueryParam<i64>, right: QueryParam<i64>) -> String {
        (*left + *right).to_string()
    }
}

#[tokio::main]
async fn main() {
    // Create shared state with initial value 1
    let opts = Arc::new(Opts::new(1));

    // Configure router with three endpoints:
    // - /add1: Uses instance method with state
    // - /add2: Uses Arc-wrapped instance method
    // - /add3: Uses static method without state
    let router = Router::new()
        .push(Router::with_path("add1").get(opts.add1()))
        .push(Router::with_path("add2").get(opts.add2()))
        .push(Router::with_path("add3").get(Opts::add3()));

    // Generate OpenAPI documentation
    let doc = OpenApi::new("Example API", "0.0.1").merge_router(&router);

    // Add OpenAPI documentation and Swagger UI routes
    let router = router
        .push(doc.into_router("/api-doc/openapi.json"))
        .push(SwaggerUi::new("/api-doc/openapi.json").into_router("swagger-ui"));

    // Start server on localhost:5800
    let acceptor = TcpListener::new("127.0.0.1:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---affix-state---

---affix-state/Cargo.mdx---

affix-state/Cargo.toml
[package]
name = "example-affix-state"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["affix-state"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---affix-state/src---

---affix-state/src/main.mdx---

affix-state/src/main.rs
use std::sync::Arc;
use std::sync::Mutex;

use salvo::prelude::*;

// Configuration structure with username and password
#[allow(dead_code)]
#[derive(Default, Clone, Debug)]
struct Config {
    username: String,
    password: String,
}

// State structure to hold a list of fail messages
#[derive(Default, Debug)]
struct State {
    fails: Mutex<Vec<String>>,
}

#[handler]
async fn hello(depot: &mut Depot) -> String {
    // Obtain the Config instance from the depot
    let config = depot.obtain::<Config>().unwrap();
    // Get custom data from the depot
    let custom_data = depot.get::<&str>("custom_data").unwrap();
    // Obtain the shared State instance from the depot
    let state = depot.obtain::<Arc<State>>().unwrap();
    // Lock the fails vector and add a new fail message
    let mut fails_ref = state.fails.lock().unwrap();
    fails_ref.push("fail message".into());
    // Format and return the response string
    format!("Hello World\nConfig: {config:#?}\nFails: {fails_ref:#?}\nCustom Data: {custom_data}")
}

#[tokio::main]
async fn main() {
    // Initialize the tracing subscriber for logging
    tracing_subscriber::fmt().init();

    // Create a Config instance with default username and password
    let config = Config {
        username: "root".to_string(),
        password: "pwd".to_string(),
    };

    // Set up the router with state injection and custom data
    let router = Router::new()
        // Use hoop to inject middleware and data into the request context
        .hoop(
            affix_state::inject(config)
                // Inject a shared State instance into the request context
                .inject(Arc::new(State {
                    fails: Mutex::new(Vec::new()),
                }))
                // Insert custom data into the request context
                .insert("custom_data", "I love this world!"),
        )
        // Register the hello handler for the root path
        .get(hello)
        // Add an additional route for the path "/hello" with the same handler
        .push(Router::with_path("hello").get(hello));

    // Bind the server to port 5800 and start serving
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---size-limiter---

---size-limiter/Cargo.mdx---

size-limiter/Cargo.toml
[package]
name = "example-size-limiter"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["size-limiter"]}
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---size-limiter/src---

---size-limiter/src/main.mdx---

size-limiter/src/main.rs
use std::fs::create_dir_all;
use std::path::Path;

use salvo::prelude::*;

#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}
#[handler]
async fn upload(req: &mut Request, res: &mut Response) {
    let file = req.file("file").await;
    if let Some(file) = file {
        let dest = format!("temp/{}", file.name().unwrap_or("file"));
        tracing::debug!(dest, "upload file");
        if let Err(e) = std::fs::copy(file.path(), Path::new(&dest)) {
            res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
            res.render(Text::Plain(format!("file not found in request: {e}")));
        } else {
            res.render(Text::Plain(format!("File uploaded to {dest}")));
        }
    } else {
        res.status_code(StatusCode::BAD_REQUEST);
        res.render(Text::Plain("file not found in request"));
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    create_dir_all("temp").unwrap();
    let router = Router::new()
        .get(index)
        .push(
            Router::new()
                .hoop(max_size(1024 * 1024 * 10))
                .path("limited")
                .post(upload),
        )
        .push(Router::with_path("unlimit").post(upload));

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
    <head>
        <title>Upload file</title>
    </head>
    <body>
        <h1>Upload file</h1>
        <form action="/unlimit" method="post" enctype="multipart/form-data">
            <h3>Unlimit</h3>
            <input type="file" name="file" />
            <input type="submit" value="upload" />
        </form>
        <form action="/limited" method="post" enctype="multipart/form-data">
            <h3>Limited 10MiB</h3>
            <input type="file" name="file" />
            <input type="submit" value="upload" />
        </form>
    </body>
</html>
"#;

---tls-openssl---

---tls-openssl/certs---

---tls-openssl/Cargo.mdx---

tls-openssl/Cargo.toml
[package]
name = "example-tls-openssl"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["openssl"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---tls-openssl/src---

---tls-openssl/src/main.mdx---

tls-openssl/src/main.rs
use salvo::conn::openssl::{Keycert, OpensslConfig};
use salvo::prelude::*;

#[handler]
async fn hello(res: &mut Response) {
    res.render(Text::Plain("Hello World"));
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let router = Router::new().get(hello);
    let config = OpensslConfig::new(
        Keycert::new()
            .with_cert(include_bytes!("../certs/cert.pem").as_ref())
            .with_key(include_bytes!("../certs/key.pem").as_ref()),
    );
    let acceptor = TcpListener::new("0.0.0.0:5800")
        .openssl(config)
        .bind()
        .await;
    Server::new(acceptor).serve(router).await;
}

---webtransport-acme-http01---

---webtransport-acme-http01/static---

---webtransport-acme-http01/static/client.mdx---

webtransport-acme-http01/static/client.html
<!doctype html>
<html lang="en">
  <title>WebTransport over HTTP/3 client</title>
  <meta charset="utf-8">
  <!-- WebTransport origin trial token. See https://developer.chrome.com/origintrials/#/view_trial/793759434324049921 -->
  <meta http-equiv="origin-trial" content="AkSQvBVsfMTgBtlakApX94hWGyBPQJXerRc2Aq8g/sKTMF+yG62+bFUB2yIxaK1furrNH3KNNeJV00UZSZHicw4AAABceyJvcmlnaW4iOiJodHRwczovL2dvb2dsZWNocm9tZS5naXRodWIuaW86NDQzIiwiZmVhdHVyZSI6IldlYlRyYW5zcG9ydCIsImV4cGlyeSI6MTY0Mzc1OTk5OX0=">
  <script src="client.js"></script>
  <link rel="stylesheet" href="client.css">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <body>
  <div id="top">
    <div id="explanation">
      This tool can be used to connect to an arbitrary WebTransport server.
      It has several limitations:
      <ul>
        <li>It can only send an entirety of a stream at once.  Once the stream
          is opened, all of the data is immediately sent, and the write side of
          the steam is closed.</li>
        <li>This tool does not listen to server-initiated bidirectional
          streams.</li>
        <li>Stream IDs are different from the one used by QUIC on the wire, as
          the on-the-wire IDs are not exposed via the Web API.</li>
        <li>The <code>WebTransport</code> object can be accessed using the developer console via <code>currentTransport</code>.</li>
      </ul>
    </div>
    <div id="tool">
    <h1>WebTransport over HTTP/3 client</h1>
    <div>
      <h2>Establish WebTransport connection</h2>
      <div class="input-line">
      <label for="url">URL:</label>
      <input type="text" name="url" id="url"
             value="https://0.0.0.0:5800/counter">
      <input type="button" id="connect" value="Connect" onclick="connect()">
      </div>
    </div>
    <div>
      <h2>Send data over WebTransport</h2>
      <form name="sending">
      <textarea name="data" id="data"></textarea>
      <div>
        <input type="radio" name="sendtype" value="datagram"
               id="datagram" checked>
        <label for="datagram">Send a datagram</label>
      </div>
      <div>
        <input type="radio" name="sendtype" value="unidi" id="unidi-stream">
        <label for="unidi-stream">Open a unidirectional stream</label>
      </div>
      <div>
        <input type="radio" name="sendtype" value="bidi" id="bidi-stream">
        <label for="bidi-stream">Open a bidirectional stream</label>
      </div>
      <input type="button" id="send" name="send" value="Send data"
             disabled onclick="sendData()">
      </form>
    </div>
    <div>
      <h2>Event log</h2>
      <ul id="event-log">
      </ul>
    </div>
    </div>
  </div>
  </body>
</html>

---webtransport-acme-http01/Cargo.mdx---

webtransport-acme-http01/Cargo.toml
[package]
name = "example-webtransport-acme-http01"
version = "0.1.0"
edition = "2024"


[dependencies]
anyhow = "1"
futures-util = "0.3"
salvo = { version = "0.77.1", features = ["acme", "quinn", "anyhow", "serve-static"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
serde = "1"
serde_json = "1"
bytes = "1"

---webtransport-acme-http01/src---

---webtransport-acme-http01/src/main.mdx---

webtransport-acme-http01/src/main.rs
use std::time::Duration;

use anyhow::{Context, Result};
use bytes::{BufMut, Bytes, BytesMut};
use salvo::prelude::*;
use salvo::proto::webtransport;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::pin;

macro_rules! log_result {
    ($expr:expr) => {
        if let Err(err) = $expr {
            tracing::error!("{err:?}");
        }
    };
}
async fn echo_stream<T, R>(send: T, recv: R) -> anyhow::Result<()>
where
    T: AsyncWrite,
    R: AsyncRead,
{
    pin!(send);
    pin!(recv);

    tracing::info!("Got stream");
    let mut buf = Vec::new();
    recv.read_to_end(&mut buf).await?;

    let message = Bytes::from(buf);
    send_chunked(send, message).await?;

    Ok(())
}
// Used to test that all chunks arrive properly as it is easy to write an impl which only reads and
// writes the first chunk.
async fn send_chunked(mut send: impl AsyncWrite + Unpin, data: Bytes) -> anyhow::Result<()> {
    for chunk in data.chunks(4) {
        tokio::time::sleep(Duration::from_millis(100)).await;
        tracing::info!("Sending {chunk:?}");
        send.write_all(chunk).await?;
    }

    Ok(())
}

#[handler]
async fn connect(req: &mut Request) -> Result<(), salvo::Error> {
    let session = req.web_transport_mut().await.unwrap();
    let session_id = session.session_id();

    // This will open a bidirectional stream and send a message to the client right after connecting!
    let stream = session.open_bi(session_id).await?;

    tokio::spawn(async move {
        log_result!(open_bidi_test(stream).await);
    });
    loop {
        tokio::select! {
            datagram = session.accept_datagram() => {
                let datagram = datagram?;
                if let Some((_, datagram)) = datagram {
                    tracing::info!("Responding with {datagram:?}");
                    // Put something before to make sure encoding and decoding works and don't just
                    // pass through
                    let mut resp = BytesMut::from(&b"Response: "[..]);
                    resp.put(datagram);

                    session.send_datagram(resp.freeze())?;
                    tracing::info!("Finished sending datagram");
                }
            }
            uni_stream = session.accept_uni() => {
                let (id, stream) = uni_stream?.unwrap();

                let send = session.open_uni(id).await?;
                tokio::spawn( async move { log_result!(echo_stream(send, stream).await); });
            }
            stream = session.accept_bi() => {
                if let Some(webtransport::server::AcceptedBi::BidiStream(_, stream)) = stream? {
                    let (send, recv) = salvo::proto::quic::BidiStream::split(stream);
                    tokio::spawn( async move { log_result!(echo_stream(send, recv).await); });
                }
            }
            else => {
                break
            }
        }
    }

    tracing::info!("Finished handling session");

    Ok(())
}

async fn open_bidi_test<S>(mut stream: S) -> anyhow::Result<()>
where
    S: Unpin + AsyncRead + AsyncWrite,
{
    tracing::info!("Opening bidirectional stream");

    stream
        .write_all(b"Hello from a server initiated bidi stream")
        .await
        .context("Failed to respond")?;

    let mut resp = Vec::new();
    stream.shutdown().await?;
    stream.read_to_end(&mut resp).await?;

    tracing::info!("Got response from client: {resp:?}");

    Ok(())
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let mut router = Router::new()
        .push(Router::with_path("counter").goal(connect))
        .push(
            Router::with_path("{*path}")
                .get(StaticDir::new(["webtransport/static", "./static"]).defaults("client.html")),
        );

    let listener = TcpListener::new("0.0.0.0:443")
        .acme()
        .cache_path("temp/letsencrypt")
        .add_domain("test.salvo.rs")
        .http01_challenge(&mut router)
        .quinn("0.0.0.0:443");
    let acceptor = listener.join(TcpListener::new("0.0.0.0:80")).bind().await;
    Server::new(acceptor).serve(router).await;
}

---acme-http01-quinn---

---acme-http01-quinn/Cargo.mdx---

acme-http01-quinn/Cargo.toml
[package]
name = "example-acme-http01-quinn"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["acme", "quinn"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---acme-http01-quinn/src---

---acme-http01-quinn/src/main.mdx---

acme-http01-quinn/src/main.rs
use salvo::prelude::*;

// Handler function that returns "Hello World" for any request
#[handler]
async fn hello() -> &'static str {
    "Hello World"
}

#[tokio::main]
async fn main() {
    // Initialize logging
    tracing_subscriber::fmt().init();

    // Create a router and register the hello handler
    let mut router = Router::new().get(hello);

    // Set up a TCP listener on port 443 for HTTPS
    let listener = TcpListener::new("0.0.0.0:443")
        .acme() // Enable ACME for automatic SSL certificate management
        .cache_path("temp/letsencrypt") // Path to store the certificate cache
        .add_domain("test.salvo.rs") // replace with your domain
        .http01_challenge(&mut router) // Add routes to handle ACME challenge requests
        .quinn("0.0.0.0:443"); // Enable QUIC/HTTP3 support on the same port

    // Create an acceptor that listens on both port 80 (HTTP) and port 443 (HTTPS)
    let acceptor = listener.join(TcpListener::new("0.0.0.0:80")).bind().await;

    // Start the server with the configured acceptor and router
    Server::new(acceptor).serve(router).await;
}

---caching-headers---

---caching-headers/Cargo.mdx---

caching-headers/Cargo.toml
[package]
name = "example-caching-headers"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["caching-headers", "compression"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---caching-headers/src---

---caching-headers/src/main.mdx---

caching-headers/src/main.rs
use salvo::prelude::*;

// Handler that returns a simple "Hello World" response
#[handler]
async fn hello() -> &'static str {
    "Hello World"
}

#[tokio::main]
async fn main() {
    // Initialize logging system
    tracing_subscriber::fmt().init();

    // Set up router with caching headers and compression middleware
    // CachingHeader must be before Compression to properly set cache control headers
    let router = Router::with_hoop(CachingHeaders::new())
        .hoop(Compression::new().min_length(0)) // Enable compression for all responses
        .get(hello);

    // Bind server to port 5800 and start serving
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---basic-auth---

---basic-auth/Cargo.mdx---

basic-auth/Cargo.toml
[package]
name = "example-basic-auth"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["basic-auth", "test"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---basic-auth/src---

---basic-auth/src/main.mdx---

basic-auth/src/main.rs
use salvo::basic_auth::{BasicAuth, BasicAuthValidator};
use salvo::prelude::*;

// Custom validator implementing BasicAuthValidator trait
struct Validator;
impl BasicAuthValidator for Validator {
    // Validate username and password combination
    async fn validate(&self, username: &str, password: &str, _depot: &mut Depot) -> bool {
        username == "root" && password == "pwd"
    }
}

// Simple handler that returns "Hello" for authenticated requests
#[handler]
async fn hello() -> &'static str {
    "Hello"
}

// Create router with basic authentication middleware
fn route() -> Router {
    // Initialize basic authentication handler with our validator
    let auth_handler = BasicAuth::new(Validator);
    // Apply authentication middleware to the router
    Router::with_hoop(auth_handler).goal(hello)
}

#[tokio::main]
async fn main() {
    // Initialize logging
    tracing_subscriber::fmt().init();

    // Bind server to port 5800 and start serving
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(route()).await;
}

#[cfg(test)]
mod tests {
    use salvo::prelude::*;
    use salvo::test::{ResponseExt, TestClient};

    #[tokio::test]
    async fn test_basic_auth() {
        // Create a service instance from our router for testing purposes
        let service = Service::new(super::route());

        // Test case 1: Verify successful authentication with valid credentials
        let content = TestClient::get("http://0.0.0.0:5800/")
            .basic_auth("root", Some("pwd")) // Use correct username/password
            .send(&service) // Send the request to the service
            .await
            .take_string() // Extract response body as string
            .await
            .unwrap();
        // Verify response contains expected "Hello" message
        assert!(content.contains("Hello"));

        // Test case 2: Verify authentication failure with invalid password
        let content = TestClient::get("http://0.0.0.0:5800/")
            .basic_auth("root", Some("pwd2")) // Use incorrect password
            .send(&service) // Send the request to the service
            .await
            .take_string() // Extract response body as string
            .await
            .unwrap();
        // Verify response contains "Unauthorized" error
        assert!(content.contains("Unauthorized"));
    }
}

---rate-limiter-dynamic---

---rate-limiter-dynamic/Cargo.mdx---

rate-limiter-dynamic/Cargo.toml
[package]
name = "example-rate-limiter-dynamic"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["rate-limiter"] }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---rate-limiter-dynamic/src---

---rate-limiter-dynamic/src/main.mdx---

rate-limiter-dynamic/src/main.rs
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::LazyLock;

use salvo::Error;
use salvo::prelude::*;
use salvo::rate_limiter::{
    CelledQuota, MokaStore, QuotaGetter, RateIssuer, RateLimiter, SlidingGuard,
};

static USER_QUOTAS: LazyLock<HashMap<String, CelledQuota>> = LazyLock::new(|| {
    let mut map = HashMap::new();
    map.insert("user1".into(), CelledQuota::per_second(1, 1));
    map.insert("user2".into(), CelledQuota::set_seconds(1, 1, 5));
    map.insert("user3".into(), CelledQuota::set_seconds(1, 1, 10));
    map
});

struct UserIssuer;
impl RateIssuer for UserIssuer {
    type Key = String;
    async fn issue(&self, req: &mut Request, _depot: &Depot) -> Option<Self::Key> {
        req.query::<Self::Key>("user")
    }
}

struct CustomQuotaGetter;
impl QuotaGetter<String> for CustomQuotaGetter {
    type Quota = CelledQuota;
    type Error = Error;

    async fn get<Q>(&self, key: &Q) -> Result<Self::Quota, Self::Error>
    where
        String: Borrow<Q>,
        Q: Hash + Eq + Sync,
    {
        USER_QUOTAS
            .get(key)
            .cloned()
            .ok_or_else(|| Error::other("user not found"))
    }
}

#[handler]
async fn limited() -> &'static str {
    "Limited page"
}
#[handler]
async fn home() -> Text<&'static str> {
    Text::Html(HOME_HTML)
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let limiter = RateLimiter::new(
        SlidingGuard::new(),
        MokaStore::new(),
        UserIssuer,
        CustomQuotaGetter,
    );
    let router = Router::new()
        .get(home)
        .push(Router::with_path("limited").hoop(limiter).get(limited));
    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

static HOME_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>Rate Limiter Dynmaic</title>
    </head>
    <body>
        <h2>Rate Limiter Dynamic</h2>
        <p>
            This example shows how to set limit for different users.
        </p>
        <p>
            <a href="/limited?user=user1" target="_blank">Limited page for user1: 1/second</a>
        </p>
        <p>
            <a href="/limited?user=user2" target="_blank">Limited page for user2: 1/5seconds</a>
        </p>
        <p>
            <a href="/limited?user=user3" target="_blank">Limited page for user3: 1/10seconds</a>
        </p>
    </body>
</html>
"#;

---oapi-generics---

---oapi-generics/Cargo.mdx---

oapi-generics/Cargo.toml
[package]
name = "example-oapi-generics"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1", features = ["oapi"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---oapi-generics/src---

---oapi-generics/src/main.mdx---

oapi-generics/src/main.rs
use salvo::oapi::extract::*;
use salvo::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, ToSchema, Debug)]
#[salvo(schema(aliases(MyI32 = MyObject<i32>, MyStr = MyObject<String>)))]
struct MyObject<T: ToSchema + std::fmt::Debug + 'static> {
    value: T,
}

/// Use string type, this will add to openapi doc.
#[endpoint]
async fn use_string(body: JsonBody<MyObject<String>>) -> String {
    format!("{:?}", body)
}

/// Use i32 type, this will add to openapi doc.
#[endpoint]
async fn use_i32(body: JsonBody<MyObject<i32>>) -> String {
    format!("{:?}", body)
}

/// Use u64 type, this will add to openapi doc.
#[endpoint]
async fn use_u64(body: JsonBody<MyObject<u64>>) -> String {
    format!("{:?}", body)
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // Custom your OpenApi naming style. You should set it before using OpenApi.
    salvo::oapi::naming::set_namer(
        salvo::oapi::naming::FlexNamer::new()
            .short_mode(true)
            .generic_delimiter('_', '_'),
    );

    let router = Router::new()
        .push(Router::with_path("i32").post(use_i32))
        .push(Router::with_path("u64").post(use_u64))
        .push(Router::with_path("string").post(use_string));

    let doc = OpenApi::new("test api", "0.0.1").merge_router(&router);

    let router = router
        .unshift(doc.into_router("/api-doc/openapi.json"))
        .unshift(SwaggerUi::new("/api-doc/openapi.json").into_router("/swagger-ui"));

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

---upload-files---

---upload-files/Cargo.mdx---

upload-files/Cargo.toml
[package]
name = "example-upload-files"
version = "0.1.0"
edition = "2024"


[dependencies]
salvo = { version = "0.77.1" }
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"

---upload-files/src---

---upload-files/src/main.mdx---

upload-files/src/main.rs
use std::fs::create_dir_all;
use std::path::Path;

use salvo::prelude::*;

#[handler]
async fn index(res: &mut Response) {
    res.render(Text::Html(INDEX_HTML));
}

#[handler]
async fn upload(req: &mut Request, res: &mut Response) {
    let files = req.files("files").await;
    if let Some(files) = files {
        let mut msgs = Vec::with_capacity(files.len());
        for file in files {
            let dest = format!("temp/{}", file.name().unwrap_or("file"));
            if let Err(e) = std::fs::copy(file.path(), Path::new(&dest)) {
                res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
                res.render(Text::Plain(format!("file not found in request: {e}")));
            } else {
                msgs.push(dest);
            }
        }
        res.render(Text::Plain(format!(
            "Files uploaded:\n\n{}",
            msgs.join("\n")
        )));
    } else {
        res.status_code(StatusCode::BAD_REQUEST);
        res.render(Text::Plain("file not found in request"));
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    create_dir_all("temp").unwrap();
    let router = Router::new().get(index).post(upload);

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(router).await;
}

static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
    <head>
        <title>Upload files</title>
    </head>
    <body>
        <h1>Upload files</h1>
        <form action="/" method="post" enctype="multipart/form-data">
            <input type="file" name="files" multiple/>
            <input type="submit" value="upload" />
        </form>
    </body>
</html>
"#;

---todos-utoipa---

---todos-utoipa/Cargo.mdx---

todos-utoipa/Cargo.toml
[package]
name = "example-todos-utoipa"
version = "0.1.0"
edition = "2024"

[dependencies]
salvo = { version = "0.77.1", features = ["affix-state", "size-limiter"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["macros"] }
tracing = "0.1"
tracing-subscriber = "0.3"
utoipa = "4"
utoipa-swagger-ui = "*"

---todos-utoipa/src---

---todos-utoipa/src/main.mdx---

todos-utoipa/src/main.rs
use std::sync::{Arc, LazyLock};

use salvo::affix_state;
use salvo::http::header::{self, HeaderValue};
use salvo::http::response::ResBody;
use salvo::prelude::*;
use salvo::size_limiter;

use self::models::*;

use utoipa::openapi::security::{ApiKey, ApiKeyValue, SecurityScheme};
use utoipa::{Modify, OpenApi};
use utoipa_swagger_ui::Config;

static STORE: LazyLock<Db> = LazyLock::new(new_store);

#[handler]
async fn hello(res: &mut Response) {
    res.render("Hello");
}

#[derive(OpenApi)]
#[openapi(
    paths(
        list_todos,
        create_todo,
        delete_todo,
        update_todo,
    ),
    components(
        schemas(models::Todo, models::TodoError)
    ),
    modifiers(&SecurityAddon),
    tags(
        (name = "todo", description = "Todo items management endpoints.")
    )
)]
struct ApiDoc;

struct SecurityAddon;

impl Modify for SecurityAddon {
    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
        let components = openapi.components.as_mut().unwrap(); // we can unwrap safely since there already is components registered.
        components.add_security_scheme(
            "api_key",
            SecurityScheme::ApiKey(ApiKey::Header(ApiKeyValue::new("todo_apikey"))),
        )
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
    Server::new(acceptor).serve(route()).await;
}

pub(crate) fn route() -> Router {
    let config = Arc::new(Config::from("/api-doc/openapi.json"));
    Router::new()
        .get(hello)
        .push(
            Router::with_path("api").push(
                Router::with_path("todos")
                    .hoop(size_limiter::max_size(1024 * 16))
                    .get(list_todos)
                    .post(create_todo)
                    .push(
                        Router::with_path("{id}")
                            .put(update_todo)
                            .delete(delete_todo),
                    ),
            ),
        )
        .push(Router::with_path("/api-doc/openapi.json").get(openapi_json))
        .push(
            Router::with_path("/swagger-ui/{**}")
                .hoop(affix_state::inject(config))
                .get(serve_swagger),
        )
}

#[handler]
pub async fn openapi_json(res: &mut Response) {
    res.render(Json(ApiDoc::openapi()))
}

#[handler]
pub async fn serve_swagger(req: &mut Request, depot: &mut Depot, res: &mut Response) {
    let config = depot.obtain::<Arc<Config>>().unwrap();
    let path = req.uri().path();
    let tail = path.strip_prefix("/swagger-ui/").unwrap();

    match utoipa_swagger_ui::serve(tail, config.clone()) {
        Ok(swagger_file) => swagger_file
            .map(|file| {
                res.headers_mut().insert(
                    header::CONTENT_TYPE,
                    HeaderValue::from_str(&file.content_type).unwrap(),
                );
                res.body(ResBody::Once(file.bytes.to_vec().into()));
            })
            .unwrap_or_else(|| {
                res.status_code(StatusCode::NOT_FOUND);
            }),
        Err(_error) => {
            res.status_code(StatusCode::INTERNAL_SERVER_ERROR);
        }
    }
}

#[utoipa::path(
    get,
    path = "/api/todos",
    responses(
        (status = 200, description = "List all todos successfully", body = [Todo])
    )
)]
#[handler]
pub async fn list_todos(req: &mut Request, res: &mut Response) {
    let opts = req.parse_body::<ListOptions>().await.unwrap_or_default();
    let todos = STORE.lock().await;
    let todos: Vec<Todo> = todos
        .clone()
        .into_iter()
        .skip(opts.offset.unwrap_or(0))
        .take(opts.limit.unwrap_or(usize::MAX))
        .collect();
    res.render(Json(todos));
}

#[utoipa::path(
        post,
        path = "/api/todos",
        request_body = Todo,
        responses(
            (status = 201, description = "Todo created successfully", body = Todo),
            (status = 409, description = "Todo already exists", body = TodoError, example = json!(TodoError::Config(String::from("id = 1"))))
        )
    )]
#[handler]
pub async fn create_todo(req: &mut Request, res: &mut Response) {
    let new_todo = req.parse_body::<Todo>().await.unwrap();
    tracing::debug!(todo = ?new_todo, "create todo");

    let mut vec = STORE.lock().await;

    for todo in vec.iter() {
        if todo.id == new_todo.id {
            tracing::debug!(id = ?new_todo.id, "todo already exists");
            res.status_code(StatusCode::BAD_REQUEST);
            return;
        }
    }

    vec.push(new_todo);
    res.status_code(StatusCode::CREATED);
}

#[utoipa::path(
        put,
        path = "/api/todos/{id}",
        responses(
            (status = 200, description = "Todo modified successfully"),
            (status = 404, description = "Todo not found", body = TodoError, example = json!(TodoError::NotFound(String::from("id = 1"))))
        ),
        params(
            ("id" = i32, Path, description = "Id of todo item to modify")
        )
    )]
#[handler]
pub async fn update_todo(req: &mut Request, res: &mut Response) {
    let id = req.param::<u64>("id").unwrap();
    let updated_todo = req.parse_body::<Todo>().await.unwrap();
    tracing::debug!(todo = ?updated_todo, id = ?id, "update todo");
    let mut vec = STORE.lock().await;

    for todo in vec.iter_mut() {
        if todo.id == id {
            *todo = updated_todo;
            res.status_code(StatusCode::OK);
            return;
        }
    }

    tracing::debug!(id = ?id, "todo is not found");
    res.status_code(StatusCode::NOT_FOUND);
}

#[utoipa::path(
    delete,
    path = "/api/todos/{id}",
    responses(
        (status = 200, description = "Todo deleted successfully"),
        (status = 401, description = "Unauthorized to delete Todo"),
        (status = 404, description = "Todo not found", body = TodoError, example = json!(TodoError::NotFound(String::from("id = 1"))))
    ),
    params(
        ("id" = i32, Path, description = "Id of todo item to delete")
    ),
    security(
        ("api_key" = [])
    )
)]
#[handler]
pub async fn delete_todo(req: &mut Request, res: &mut Response) {
    let id = req.param::<u64>("id").unwrap();
    tracing::debug!(id = ?id, "delete todo");

    let mut vec = STORE.lock().await;

    let len = vec.len();
    vec.