This commit is contained in:
2025-07-15 17:05:04 +02:00
commit d72254c7d0
25 changed files with 2074 additions and 0 deletions

2
src/network/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod protocol;
pub mod udp;

246
src/network/protocol.rs Normal file
View File

@@ -0,0 +1,246 @@
use std::collections::HashSet;
use bytes::{Bytes, BytesMut, Buf, BufMut};
use std::net::SocketAddr;
use uuid::Uuid;
use strum::{EnumIter, FromRepr};
#[repr(u8)]
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, EnumIter, FromRepr)]
pub enum UDPMessageType {
Ping = 0,
Audio = 1,
// Futurs types ici...
}
#[derive(Debug, Clone, PartialEq)]
pub enum UDPMessageData {
// Client messages - Zero-copy avec Bytes
ClientPing { message_id: Uuid },
ClientAudio { sequence: u16, data: Bytes },
// Server messages
ServerPing { message_id: Uuid },
ServerAudio { user: Uuid, sequence: u16, data: Bytes },
}
#[derive(Debug, Clone, PartialEq)]
pub struct UDPMessage {
pub data: UDPMessageData,
pub address: SocketAddr,
pub size: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub struct UdpBroadcastMessage {
pub data: UDPMessageData,
pub addresses: HashSet<SocketAddr>, // ou Vec<SocketAddr> selon besoins
pub size: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ParseError {
EmptyData,
InvalidData,
InvalidMessageType,
InvalidUuid,
}
impl From<uuid::Error> for ParseError {
fn from(_: uuid::Error) -> Self {
ParseError::InvalidUuid
}
}
impl UDPMessageData {
// Parsing zero-copy depuis Bytes
pub fn from_client_bytes(mut data: Bytes) -> Result<Self, ParseError> {
if data.is_empty() {
return Err(ParseError::EmptyData);
}
let msg_type = data.get_u8(); // Consomme 1 byte
match msg_type {
0 => { // Ping
if data.remaining() < 16 {
return Err(ParseError::InvalidData);
}
let uuid_bytes = data.split_to(16); // Zero-copy split
let message_id = Uuid::from_slice(&uuid_bytes)?;
Ok(Self::ClientPing { message_id })
}
1 => { // Audio
if data.remaining() < 2 {
return Err(ParseError::InvalidData);
}
let sequence = data.get_u16(); // Big-endian par défaut
let audio_data = data; // Le reste pour l'audio
Ok(Self::ClientAudio { sequence, data: audio_data })
}
_ => Err(ParseError::InvalidMessageType),
}
}
// Constructeurs server
pub fn server_ping(message_id: Uuid) -> Self {
Self::ServerPing { message_id }
}
pub fn server_audio(user: Uuid, sequence: u16, data: Bytes) -> Self {
Self::ServerAudio { user, sequence, data }
}
// Sérialisation optimisée avec BytesMut
pub fn to_bytes(&self) -> Bytes {
match self {
Self::ServerPing { message_id } => {
let mut buf = BytesMut::with_capacity(17);
buf.put_u8(0); // Message type
buf.put_slice(message_id.as_bytes());
buf.freeze()
}
Self::ServerAudio { user, sequence, data } => {
let mut buf = BytesMut::with_capacity(19 + data.len());
buf.put_u8(1); // Message type
buf.put_slice(user.as_bytes());
buf.put_u16(*sequence);
buf.put_slice(data);
buf.freeze()
}
_ => panic!("Client messages cannot be serialized"),
}
}
pub fn to_vec(&self) -> Vec<u8> {
// pas très optimisé
self.to_bytes().to_vec()
}
pub fn message_type(&self) -> UDPMessageType {
match self {
Self::ClientPing { .. } | Self::ServerPing { .. } => UDPMessageType::Ping,
Self::ClientAudio { .. } | Self::ServerAudio { .. } => UDPMessageType::Audio,
}
}
// Calcule la taille du message sérialisé
pub fn size(&self) -> usize {
match self {
Self::ClientPing { .. } | Self::ServerPing { .. } => 17, // 1 + 16 (UUID)
Self::ClientAudio { data, .. } => 3 + data.len(), // 1 + 2 + audio_data
Self::ServerAudio { data, .. } => 19 + data.len(), // 1 + 16 + 2 + audio_data
}
}
}
impl UDPMessage {
// Parsing depuis slice -> Bytes (zero-copy si possible)
pub fn from_client_bytes(address: SocketAddr, data: &[u8]) -> Result<Self, ParseError> {
let original_size = data.len();
let bytes = Bytes::copy_from_slice(data); // Seule allocation
let data = UDPMessageData::from_client_bytes(bytes)?;
Ok(Self {
data,
address,
size: original_size
})
}
// Constructeurs server
pub fn server_ping(address: SocketAddr, message_id: Uuid) -> Self {
let data = UDPMessageData::server_ping(message_id);
let size = data.size();
Self { data, address, size }
}
pub fn server_audio(address: SocketAddr, user: Uuid, sequence: u16, data: Bytes) -> Self {
let msg_data = UDPMessageData::server_audio(user, sequence, data);
let size = msg_data.size();
Self { data: msg_data, address, size }
}
// Helpers
pub fn to_bytes(&self) -> Bytes {
self.data.to_bytes()
}
pub fn to_vec(&self) -> Vec<u8> {
self.data.to_vec()
}
pub fn message_type(&self) -> UDPMessageType {
self.data.message_type()
}
// Getters
pub fn size(&self) -> usize {
self.size
}
pub fn address(&self) -> SocketAddr {
self.address
}
}
impl UDPMessage {
// Helpers pour récupérer certain éléments des messages
pub fn get_message_id(&self) -> Option<Uuid> {
match &self.data {
UDPMessageData::ClientPing { message_id } => Some(*message_id),
UDPMessageData::ServerPing { message_id } => Some(*message_id),
_ => None,
}
}
}
// Helper pour compatibilité avec UDPMessageType
impl UDPMessageType {
pub fn from_message(data: &[u8]) -> Option<Self> {
if data.is_empty() {
return None;
}
Self::from_repr(data[0])
}
}
impl UdpBroadcastMessage {
// Constructeurs server
pub fn server_ping(addresses: HashSet<SocketAddr>, message_id: Uuid) -> Self {
let data = UDPMessageData::server_ping(message_id);
let size = data.size();
Self { data, addresses, size }
}
pub fn server_audio(addresses: HashSet<SocketAddr>, user: Uuid, sequence: u16, data: Bytes) -> Self {
let msg_data = UDPMessageData::server_audio(user, sequence, data);
let size = msg_data.size();
Self { data: msg_data, addresses, size }
}
// Conversion vers messages individuels (pour compatibilité)
pub fn to_individual_messages(&self) -> Vec<UDPMessage> {
self.addresses.iter().map(|&addr| {
UDPMessage {
data: self.data.clone(),
address: addr,
size: self.size,
}
}).collect()
}
// Helpers
pub fn to_bytes(&self) -> Bytes {
self.data.to_bytes()
}
pub fn addresses(&self) -> &HashSet<SocketAddr> {
&self.addresses
}
pub fn size(&self) -> usize {
self.size
}
}

104
src/network/udp.rs Normal file
View File

@@ -0,0 +1,104 @@
use tokio::net::UdpSocket;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::task::AbortHandle;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UDPMessage, UdpBroadcastMessage};
#[derive(Clone)]
pub struct UdpServer {
event_bus: EventBus,
socket: Arc<UdpSocket>,
abort_handle: Option<AbortHandle>,
}
impl UdpServer {
pub async fn new(event_bus: EventBus, addr: &str) -> Self {
let socket = UdpSocket::bind(addr).await.unwrap();
let addr = socket.local_addr().unwrap();
println!("Socket UDP lié avec succès on {}", addr);
Self {
event_bus,
socket: Arc::new(socket),
abort_handle: None
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
println!("Démarrage du serveur UDP...");
let event_bus = self.event_bus.clone();
let socket = self.socket.clone();
let recv_task = tokio::spawn(async move {
// Buffer réutilisable pour éviter les allocations
let mut buf = vec![0u8; 1500];
loop {
match socket.recv_from(&mut buf).await {
Ok((size, address)) => {
// Slice du buffer pour éviter de copier des données inutiles
if let Ok(message) = UDPMessage::from_client_bytes(address, &buf[..size]) {
event_bus.emit(Event::UdpIn(message)).await;
}
// Sinon, on ignore silencieusement le message malformé
}
Err(e) => {
match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted => {
// Silencieux pour les déconnexions normales
continue;
}
_ => {
println!("Erreur UDP: {}", e);
continue;
}
}
}
}
}
});
self.abort_handle = Some(recv_task.abort_handle());
Ok(())
}
pub async fn send_udp_message(&self, message: &UDPMessage) -> bool {
match self.socket.send_to(&message.to_bytes(), message.address()).await {
Ok(size) => {
self.event_bus.emit(Event::UdpOut(message.clone())).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", message.address(), e);
false
}
}
}
pub async fn broadcast_udp_message(&self, message: &UdpBroadcastMessage) -> bool {
let bytes = message.to_bytes();
for &address in message.addresses() {
match self.socket.send_to(&bytes, address).await {
Ok(_) => {
// Emit individual event pour tracking
let individual_msg = UDPMessage {
data: message.data.clone(),
address,
size: message.size,
};
self.event_bus.emit(Event::UdpOut(individual_msg)).await;
}
Err(e) => {
println!("Erreur broadcast vers {}: {}", address, e);
}
}
}
true
}
}

193
src/network/udp_back.rs Normal file
View File

@@ -0,0 +1,193 @@
use tokio::net::UdpSocket;
use tokio::sync::RwLock;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::task::AbortHandle;
use tokio::time::{sleep, Instant};
use crate::domain::client::Client;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UdpClientMessage, UdpServerMessage};
#[derive(Clone)]
pub struct UdpServer {
event_bus: EventBus,
socket: Arc<UdpSocket>,
abort_handle: Option<AbortHandle>,
clients: Arc<DashMap<SocketAddr, Client>>,
}
impl UdpServer {
pub async fn new(event_bus: EventBus, addr: &str) -> Self {
let socket = UdpSocket::bind(addr).await.unwrap();
let addr = socket.local_addr().unwrap();
println!("Socket UDP lié avec succès on {}", addr);
Self {
event_bus,
socket: Arc::new(socket),
abort_handle: None,
clients: Arc::new(DashMap::new()),
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
println!("Démarrage du serveur UDP...");
let event_bus = self.event_bus.clone();
let socket = self.socket.clone();
let clients = self.clients.clone();
let recv_task = tokio::spawn(async move {
let mut buf = [0u8; 1500];
loop {
match socket.recv_from(&mut buf).await {
Ok((size, address)) => {
// Ajouter le client à la liste
// todo : solution vraiment pas idéal, il faudrait vraiment la repenser avec un système helo/bye
if !clients.contains_key(&address) {
let client = Client::new(address);
clients.insert(address, client);
println!("Nouveau client connecté: {}", address);
}else {
let mut client = clients.get_mut(&address).unwrap();
client.update_last_seen();
}
if let Ok(message) = UdpClientMessage::from_bytes(&buf[..size]) {
let event = Event::UdpIn { address, size, message };
event_bus.emit(event).await;
} else {
println!("Erreur lors du parsing du message de {}: {:?}", address, &buf[..size]);
}
}
Err(e) => {
match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted => {
// Silencieux pour les déconnexions normales
continue;
}
_ => {
println!("Erreur UDP: {}", e);
continue;
}
}
}
}
}
});
self.abort_handle = Some(recv_task.abort_handle());
Ok(())
}
pub async fn send(&self, address: SocketAddr, message: UdpServerMessage) -> bool {
let event_bus = self.event_bus.clone();
match self.socket.send_to(&message.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
self.remove_client(address).await;
false
}
}
}
pub async fn group_send(&self, addr_list: Vec<SocketAddr>, message: UdpServerMessage) -> bool {
if addr_list.is_empty() {
return true;
}
let socket = self.socket.clone();
let clients = self.clients.clone();
let send_tasks: Vec<_> = addr_list.into_iter().map(|address| {
let event_bus = self.event_bus.clone();
let message_clone = message.clone();
let socket_clone = socket.clone();
let clients_clone = clients.clone();
tokio::spawn(async move {
match socket_clone.send_to(&message_clone.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message: message_clone }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
if clients_clone.contains_key(&address) {
clients_clone.remove(&address);
println!("Client {} retiré de la liste", address);
}
false
}
}
})
}).collect();
let mut all_success = true;
for task in send_tasks {
match task.await {
Ok(success) => {
if !success {
all_success = false;
}
}
Err(_) => {
all_success = false;
}
}
}
all_success
}
pub async fn all_send(&self, message: UdpServerMessage) -> bool {
let client_addresses = self.get_clients().await;
self.group_send(client_addresses, message).await
}
pub async fn get_clients(&self) -> Vec<SocketAddr> {
self.clients.iter()
.map(|entry| *entry.key())
.collect()
}
// Nouvelle méthode pour nettoyer les clients déconnectés
async fn remove_client(&self, address: SocketAddr) {
if self.clients.contains_key(&address){
self.clients.remove(&address);
println!("Client {} retiré de la liste", address);
}else {
println!("Client {} n'est pas dans la liste", address);
}
}
// Méthode pour nettoyer les clients inactifs
pub async fn cleanup_inactive_clients(&self) {
let timeout = Duration::from_secs(10);
let now = Instant::now();
let mut to_remove = Vec::new();
for entry in self.clients.iter() {
let address = *entry.key();
let client = entry.value();
if now.duration_since(client.last_seen()) > timeout {
to_remove.push(address);
}
}
for address in &to_remove {
println!("Suppression du client {}", address);
self.clients.remove(address);
}
}
}