Files
ox_speak_server/src/network/udp_back.rs
2025-07-15 17:05:04 +02:00

193 lines
6.8 KiB
Rust

use tokio::net::UdpSocket;
use tokio::sync::RwLock;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::task::AbortHandle;
use tokio::time::{sleep, Instant};
use crate::domain::client::Client;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UdpClientMessage, UdpServerMessage};
#[derive(Clone)]
pub struct UdpServer {
event_bus: EventBus,
socket: Arc<UdpSocket>,
abort_handle: Option<AbortHandle>,
clients: Arc<DashMap<SocketAddr, Client>>,
}
impl UdpServer {
pub async fn new(event_bus: EventBus, addr: &str) -> Self {
let socket = UdpSocket::bind(addr).await.unwrap();
let addr = socket.local_addr().unwrap();
println!("Socket UDP lié avec succès on {}", addr);
Self {
event_bus,
socket: Arc::new(socket),
abort_handle: None,
clients: Arc::new(DashMap::new()),
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
println!("Démarrage du serveur UDP...");
let event_bus = self.event_bus.clone();
let socket = self.socket.clone();
let clients = self.clients.clone();
let recv_task = tokio::spawn(async move {
let mut buf = [0u8; 1500];
loop {
match socket.recv_from(&mut buf).await {
Ok((size, address)) => {
// Ajouter le client à la liste
// todo : solution vraiment pas idéal, il faudrait vraiment la repenser avec un système helo/bye
if !clients.contains_key(&address) {
let client = Client::new(address);
clients.insert(address, client);
println!("Nouveau client connecté: {}", address);
}else {
let mut client = clients.get_mut(&address).unwrap();
client.update_last_seen();
}
if let Ok(message) = UdpClientMessage::from_bytes(&buf[..size]) {
let event = Event::UdpIn { address, size, message };
event_bus.emit(event).await;
} else {
println!("Erreur lors du parsing du message de {}: {:?}", address, &buf[..size]);
}
}
Err(e) => {
match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted => {
// Silencieux pour les déconnexions normales
continue;
}
_ => {
println!("Erreur UDP: {}", e);
continue;
}
}
}
}
}
});
self.abort_handle = Some(recv_task.abort_handle());
Ok(())
}
pub async fn send(&self, address: SocketAddr, message: UdpServerMessage) -> bool {
let event_bus = self.event_bus.clone();
match self.socket.send_to(&message.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
self.remove_client(address).await;
false
}
}
}
pub async fn group_send(&self, addr_list: Vec<SocketAddr>, message: UdpServerMessage) -> bool {
if addr_list.is_empty() {
return true;
}
let socket = self.socket.clone();
let clients = self.clients.clone();
let send_tasks: Vec<_> = addr_list.into_iter().map(|address| {
let event_bus = self.event_bus.clone();
let message_clone = message.clone();
let socket_clone = socket.clone();
let clients_clone = clients.clone();
tokio::spawn(async move {
match socket_clone.send_to(&message_clone.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message: message_clone }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
if clients_clone.contains_key(&address) {
clients_clone.remove(&address);
println!("Client {} retiré de la liste", address);
}
false
}
}
})
}).collect();
let mut all_success = true;
for task in send_tasks {
match task.await {
Ok(success) => {
if !success {
all_success = false;
}
}
Err(_) => {
all_success = false;
}
}
}
all_success
}
pub async fn all_send(&self, message: UdpServerMessage) -> bool {
let client_addresses = self.get_clients().await;
self.group_send(client_addresses, message).await
}
pub async fn get_clients(&self) -> Vec<SocketAddr> {
self.clients.iter()
.map(|entry| *entry.key())
.collect()
}
// Nouvelle méthode pour nettoyer les clients déconnectés
async fn remove_client(&self, address: SocketAddr) {
if self.clients.contains_key(&address){
self.clients.remove(&address);
println!("Client {} retiré de la liste", address);
}else {
println!("Client {} n'est pas dans la liste", address);
}
}
// Méthode pour nettoyer les clients inactifs
pub async fn cleanup_inactive_clients(&self) {
let timeout = Duration::from_secs(10);
let now = Instant::now();
let mut to_remove = Vec::new();
for entry in self.clients.iter() {
let address = *entry.key();
let client = entry.value();
if now.duration_since(client.last_seen()) > timeout {
to_remove.push(address);
}
}
for address in &to_remove {
println!("Suppression du client {}", address);
self.clients.remove(address);
}
}
}