Files
oxspeak_server/src/udp/server.rs
T
2026-05-16 17:57:54 +02:00

179 lines
6.6 KiB
Rust

use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use super::metrics::UdpMetrics;
use super::router::RoutingTable;
use crate::config::NetworkConfig;
/// Taille du buffer de lecture pour chaque datagramme UDP entrant.
///
/// La RFC 768 limite les datagrammes UDP à 65 507 octets (payload max avec
/// en-têtes IP+UDP). Pour de la voix/vidéo compressée, les paquets réels
/// seront bien plus petits, mais on alloue le maximum une seule fois pour
/// éviter toute troncature silencieuse.
const UDP_READ_BUFFER_SIZE: usize = 65_507;
/// Erreurs pouvant survenir pendant l'opération du serveur UDP.
#[derive(Debug, thiserror::Error)]
pub enum UdpServerError {
#[error("failed to bind UDP socket to {addr}: {source}")]
Bind {
addr: SocketAddr,
#[source]
source: std::io::Error,
},
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
}
/// Serveur UDP asynchrone agissant comme routeur de paquets.
///
/// Reçoit des datagrammes entrants et les route vers les clients inscrits
/// dans les canaux correspondants via une [`RoutingTable`].
///
/// La configuration réseau est fournie par [`NetworkConfig`] (issue de
/// [`AppConfig`][crate::config::AppConfig]), qui centralise la lecture du
/// fichier TOML.
///
/// Les métriques sont collectées dans un [`UdpMetrics`] partageable via
/// [`Arc`] — passez-le à [`metrics::spawn_reporter`][super::metrics::spawn_reporter]
/// pour un reporting périodique automatique.
///
/// # Exemple
/// ```no_run
/// use std::time::Duration;
/// use oxspeak_server_lib::config::{AppConfig, NetworkConfig};
/// use oxspeak_server_lib::udp::server::UdpServer;
/// use oxspeak_server_lib::udp::metrics::{UdpMetrics, spawn_reporter};
///
/// #[tokio::main]
/// async fn main() {
/// let config = AppConfig::load().unwrap();
/// let metrics = UdpMetrics::new();
/// spawn_reporter(metrics.clone(), Duration::from_secs(10));
/// let (server, _shutdown_tx) = UdpServer::new(config.network, metrics);
/// server.run().await.unwrap();
/// }
/// ```
pub struct UdpServer {
bind_addr: SocketAddr,
routing_table: RoutingTable,
metrics: Arc<UdpMetrics>,
shutdown_rx: broadcast::Receiver<()>,
}
impl UdpServer {
/// Crée un nouveau [`UdpServer`] depuis la configuration réseau globale.
///
/// Retourne le serveur et un [`broadcast::Sender`] pour déclencher le
/// shutdown gracieux.
pub fn new(network: &NetworkConfig, metrics: Arc<UdpMetrics>) -> (Self, broadcast::Sender<()>) {
let bind_addr = SocketAddr::new(network.host.into(), network.udp_port);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
(
Self {
bind_addr,
routing_table: RoutingTable::new(),
metrics,
shutdown_rx,
},
shutdown_tx,
)
}
/// Expose la table de routage de façon mutable pour y inscrire des
/// clients avant ou pendant l'exécution (via partage d'état ou messages).
pub fn routing_table_mut(&mut self) -> &mut RoutingTable {
&mut self.routing_table
}
/// Retourne une référence aux métriques du serveur.
pub fn metrics(&self) -> &Arc<UdpMetrics> {
&self.metrics
}
/// Bind le socket et démarre la boucle de routage.
///
/// Pour chaque datagramme reçu, le paquet est retransmis inline (sans
/// spawn de tâche) vers tous les clients abonnés au canal identifié.
/// La future se résout lorsqu'un signal de shutdown est reçu ou qu'une
/// erreur I/O fatale survient.
pub async fn run(mut self) -> Result<(), UdpServerError> {
let socket =
UdpSocket::bind(self.bind_addr)
.await
.map_err(|source| UdpServerError::Bind {
addr: self.bind_addr,
source,
})?;
tracing::info!(addr = %self.bind_addr, "UDP server listening");
let mut buf = vec![0u8; UDP_READ_BUFFER_SIZE];
loop {
tokio::select! {
result = socket.recv_from(&mut buf) => {
match result {
Ok((len, peer)) => {
self.metrics.inc_received(len as u64);
self.route_packet(&socket, &buf[..len], peer).await;
}
Err(err) => {
self.metrics.inc_recv_error();
tracing::error!(%err, "recv_from failed");
return Err(UdpServerError::Io(err));
}
}
}
_ = self.shutdown_rx.recv() => {
tracing::info!("UDP server shutting down");
break;
}
}
}
Ok(())
}
/// Route un paquet entrant vers les abonnés du canal correspondant.
///
/// # Logique actuelle (placeholder)
/// En l'absence de protocole applicatif défini, tous les paquets reçus
/// sont logués. Branche ici la logique d'identification du canal
/// (ex: lire un header de paquet pour extraire le `channel_id`).
async fn route_packet(&self, socket: &UdpSocket, data: &[u8], sender: SocketAddr) {
tracing::debug!(%sender, bytes = data.len(), "datagram received");
// TODO: extraire le channel_id depuis le header du paquet applicatif.
// Pour l'instant on utilise un canal de démonstration statique.
let channel_id = "default";
match self.routing_table.subscribers(channel_id) {
Some(clients) => {
for &client in clients {
// Ne pas renvoyer au sender lui-même.
if client == sender {
continue;
}
match socket.send_to(data, client).await {
Ok(_) => {
self.metrics.inc_sent(data.len() as u64);
}
Err(err) => {
self.metrics.inc_send_error();
tracing::warn!(%client, %err, "failed to forward packet");
}
}
}
}
None => {
self.metrics.inc_dropped();
tracing::debug!(%sender, channel = channel_id, "no subscribers, packet dropped");
}
}
}
}