diff --git a/Cargo.lock b/Cargo.lock index 5bda297..65fd413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,56 @@ dependencies = [ "windows", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -224,10 +274,16 @@ dependencies = [ ] [[package]] -name = "futures-core" -version = "0.3.31" +name = "getrandom" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi", +] [[package]] name = "gimli" @@ -241,6 +297,12 @@ version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "indexmap" version = "2.9.0" @@ -283,16 +345,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kanal" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" -dependencies = [ - "futures-core", - "lock_api", -] - [[package]] name = "libc" version = "0.2.172" @@ -511,12 +563,13 @@ name = "ox_speak_rs" version = "0.1.0" dependencies = [ "cpal", + "crossbeam", "event-listener", - "kanal", - "num_enum", "opus", "parking_lot", + "strum", "tokio", + "uuid", ] [[package]] @@ -587,6 +640,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "redox_syscall" version = "0.5.13" @@ -623,6 +682,26 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "shlex" version = "1.3.0" @@ -635,6 +714,28 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "strum" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f64def088c51c9510a8579e3c5d67c65349dcf755e5479ad3d010aa6454e2c32" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77a8c5abcaf0f9ce05d62342b7d298c346515365c36b673df4ebe3ced01fde8" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "syn" version = "2.0.101" @@ -699,6 +800,18 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "uuid" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" +dependencies = [ + "getrandom", + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -709,6 +822,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -975,3 +1097,12 @@ checksum = "c06928c8748d81b05c9be96aad92e1b6ff01833332f281e8cfca3be4b35fc9ec" dependencies = [ "memchr", ] + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.9.1", +] diff --git a/Cargo.toml b/Cargo.toml index a97e7df..9defb93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,10 @@ path = "src/lib.rs" opus = "0.3" cpal = "0.16" #ringbuf = "0.4" -#crossbeam = "0.8" -kanal = "0.1" # attente du fix sur le recv_timeout qui fait burn le cpu -#kanal = { git = "https://github.com/fereidani/kanal.git" } +crossbeam = "0.8" event-listener = "5.4" -num_enum = "0.7" parking_lot = "0.12" +uuid = {version = "1.17", features = ["v4", "serde"]} tokio = "1.45" -#rtrb = "0.3" \ No newline at end of file +#rtrb = "0.3" +strum = {version = "0.27", features = ["derive"]} \ No newline at end of file diff --git a/src/app/context.rs b/src/app/context.rs index d6402ba..7f8cd3a 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -2,12 +2,22 @@ use std::sync::Arc; use crate::audio::capture::{AudioCapture, Microphone}; +use crate::net::udp_client::UdpSession; +use crate::services::audio::AudioCaptureService; +use crate::services::network::NetworkService; pub struct Context { // Audio related host: Arc, microphone: Microphone, - audio_capture: AudioCapture + audio_capture: AudioCapture, + + // Network related + udp_session: UdpSession, + + // Service related + audio_capture_service: AudioCaptureService, + network_service: NetworkService, // todo : mettre tous les contextes de l'app nécessaire au bon fonctionnement de celle ci, udpclient, catpure audio, lecture audio .... // L'idéal étant que tout soit chargé "statiquement" dans le contexte de l'application pour être "relié" dans le runtime @@ -19,18 +29,66 @@ impl Context { let microphone = Microphone::default(&host)?; let audio_capture = AudioCapture::new(microphone.clone()); - // Arc::new(UdpClient::new()) - // Arc::new(AudioCapture::new()) - // etc ... + let udp_session = UdpSession::new()?; + + let audio_capture_service = AudioCaptureService::new(audio_capture.sub_encoded(), udp_session.clone()); + let network_service = NetworkService::new(udp_session.clone()); Ok(Self { host, microphone, - audio_capture + audio_capture, + udp_session, + audio_capture_service, + network_service, }) } - pub fn get_host(&self) -> Arc { - self.host.clone() + // === composants === + pub fn start_audio_capture(&mut self) -> Result<(), String> { + self.audio_capture.start_capture() + } + + pub fn stop_audio_capture(&mut self) -> Result<(), String> { + self.audio_capture.stop_capture() + } + + // === Network === + pub fn connect_to(&mut self, server_addr: &str) -> Result<(), String> { + self.udp_session.connect(server_addr) + } + + + + // === Services === + pub fn start_audio_capture_service(&mut self) -> Result<(), String> { + self.audio_capture_service.audio_to_network(); + + Ok(()) + } + + // === getters === + pub fn host(&self) -> &Arc { + &self.host + } + + pub fn microphone(&self) -> &Microphone { + &self.microphone + } + + pub fn audio_capture(&mut self) -> &mut AudioCapture { + &mut self.audio_capture + } + + pub fn udp_session(&self) -> &UdpSession { + &self.udp_session + } + + pub fn audio_capture_service(&self) -> &AudioCaptureService { + &self.audio_capture_service + } + + pub fn network_service(&self) -> &NetworkService { + &self.network_service } } \ No newline at end of file diff --git a/src/app/runtime.rs b/src/app/runtime.rs index bbf21f2..3a2dd0f 100644 --- a/src/app/runtime.rs +++ b/src/app/runtime.rs @@ -1,20 +1,41 @@ // Logiques actives (connexion des différents composants, création des threads ...) -use std::sync::Arc; +use std::sync::{Arc}; +use std::thread; +use std::time::Duration; +use parking_lot::Mutex; use crate::app::context::Context; pub struct Runtime{ - context: Arc + context: Arc> } impl Runtime { - pub fn new(context: Arc) -> Runtime { + pub fn new(context: Arc>) -> Runtime { Self { context } } - pub fn run(&self) { - + pub fn run(&mut self) -> Result<(), String> { + let mut context = self.context.lock(); + // Démarrage des services + context.audio_capture_service().start(); + context.network_service().start(); + + // Démarrage des composants + // Network + let _udp_client = context.connect_to("127.0.0.1:5000")?; + // Audio in + let _audio_capture = context.start_audio_capture()?; + + + Ok(()) + } + + pub fn main_loop(&self) { + loop { + thread::sleep(Duration::from_millis(100)); + } } } diff --git a/src/audio/capture.rs b/src/audio/capture.rs index e6cc9c7..56b5f79 100644 --- a/src/audio/capture.rs +++ b/src/audio/capture.rs @@ -4,8 +4,9 @@ use std::thread; use std::thread::JoinHandle; use cpal::{BufferSize, Device, Host, Stream, StreamConfig, SupportedStreamConfig, SampleRate, InputCallbackInfo}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use crossbeam::channel::Receiver; use crate::audio::opus::{AudioOpus, AudioOpusEncoder}; -use crate::utils::event_bus::{AudioInputBus}; +use crate::utils::event_bus::{AudioInputBus, EncodedAudioData, RawAudioData}; use crate::utils::ringbuf::{RingBufReader, RingBuffer}; pub enum AudioCaptureError { @@ -72,7 +73,7 @@ pub struct AudioCapture { worker: Option>, running: Arc, - // exposed ringbuf + // exposed eventbus event_bus: AudioInputBus, } @@ -80,7 +81,7 @@ impl AudioCapture { pub fn new(microphone: Microphone) -> Self { Self { microphone, - ringbuf: RingBuffer::::new(48000), + ringbuf: RingBuffer::::new(4096), // 10 frames = 200 ms stream: None, worker: None, running: Arc::new(AtomicBool::new(false)), @@ -127,11 +128,11 @@ impl AudioCapture { stream.play().map_err(|e| format!("Erreur de lancement du stream: {e}"))?; self.stream = Some(stream); self.worker = Some(worker); - + println!("Audio capture started"); Ok(()) } - fn audio_processing(mut reader: RingBufReader, worker_running: Arc, mut encoder: AudioOpusEncoder, event_bus: AudioInputBus) { + fn audio_processing(reader: RingBufReader, worker_running: Arc, mut encoder: AudioOpusEncoder, event_bus: AudioInputBus) { let mut frame = [0i16; 960]; while worker_running.load(Ordering::Relaxed){ @@ -143,7 +144,7 @@ impl AudioCapture { break; } - let raw_data = Arc::new(frame.to_vec()); + let raw_data = frame.to_vec(); event_bus.broadcast_raw(raw_data.clone()); match encoder.encode(&frame) { @@ -159,7 +160,7 @@ impl AudioCapture { println!("Fermeture de l'audio processing.") } - pub fn stop_capture(&mut self){ + pub fn stop_capture(&mut self) -> Result<(), String> { println!("🛑 Arrêt en cours..."); // 1️⃣ Signal d'arrêt global @@ -180,5 +181,14 @@ impl AudioCapture { self.ringbuf.clear(); println!(" Capture audio arrêtée."); + Ok(()) + } + + pub fn sub_raw(&self) -> Receiver { + self.event_bus.subscribe_raw() + } + + pub fn sub_encoded(&self) -> Receiver { + self.event_bus.subscribe_encoded() } } diff --git a/src/audio/mixer.rs b/src/audio/mixer.rs new file mode 100644 index 0000000..6428375 --- /dev/null +++ b/src/audio/mixer.rs @@ -0,0 +1,59 @@ +// Mixage de toutes les sources audio +// il sera nécessaire d'aller chercher sur chaque source les buffer audio + +use std::thread; +use crossbeam::channel::{bounded, Receiver, Sender}; + +pub struct AudioMixer { + // Canal qui reçoit tout l'audio de l'app + audio_receiver: Receiver, + audio_sender: Sender, + + // Sortie vers playback + playback_sender: Sender, + + + // Buffer de mixage + mix_buffer: Vec, +} + +impl AudioMixer { + pub fn new(playback_sender: Sender) -> Self { + let (audio_sender, audio_receiver) = bounded(960*10); // 10 trames de retard max + Self { + audio_sender, + audio_receiver, + playback_sender, + mix_buffer: Vec::with_capacity(1920), // 20ms/stéréo/48khz + } + } + + pub fn start(&self) { + // Stratégie live (pas besoin d'être dans un thread) + // aller chercher les buffers audio de chaque user (UserManager.read_buffers()) + // aller chercher les buffers audio des notifications + // aller chercher les buffers audio de X... + // mixer le tout + // écrire dans mix_buffer + + // Stratégie pré-work (n-1) + // aller chercher les buffers audio de chaque user (UserManager.read_buffers()) + // aller chercher les buffers audio des notifications + // aller chercher les buffers audio de X... + // écrire dans mix_buffer + // real_time_event.sleep (on attend) + // Quand cpal.callback est déclencher, lecture de mix_buffer + // real_time_event.réveil (on relance au début) + // + + + let receiver = self.audio_receiver.clone(); + + thread::spawn(move || { + let mut pending_frames = Vec::::new(); + loop { + + } + }); + } +} \ No newline at end of file diff --git a/src/audio/mod.rs b/src/audio/mod.rs index f0b6b88..8fc22a3 100644 --- a/src/audio/mod.rs +++ b/src/audio/mod.rs @@ -1,4 +1,6 @@ pub mod capture; pub mod playback; pub mod opus; -pub mod stats; \ No newline at end of file +pub mod stats; +pub mod mixer; +pub mod user_manager; \ No newline at end of file diff --git a/src/audio/user_manager.rs b/src/audio/user_manager.rs new file mode 100644 index 0000000..75ce8ed --- /dev/null +++ b/src/audio/user_manager.rs @@ -0,0 +1,65 @@ +// Gestion de tous les utilisateur +// Mixer viendra chercher les frames (avec read_buffers) + +use std::collections::HashMap; +use std::sync::Arc; +use crate::utils::ringbuf::RingBuffer; + +pub type UserId = [u8; 16]; + +#[derive(Clone)] +pub struct UserManager { + users: Arc>, +} + +pub struct UserJitterBuffer { + ringbuf: RingBuffer +} + +impl UserManager { + pub fn new() -> Self { + Self { + users: Arc::new(HashMap::new()) + } + } + + pub fn read_buffers(&self) -> Vec<[i16; 960]> { + // todo : Pas optimisé du tout, juste une conception basique pour le moment. + // les frames seront "merge/mix" dans Mixer + let mut buffers = Vec::<[i16; 960]>::new(); + for user in self.users.values() { + let user_buffer = user.read_buffer(); + if user_buffer.len() == 960 { + buffers.push(user_buffer); + } + }; + return buffers; + } +} + +impl UserJitterBuffer { + pub fn new() -> Self { + Self { + ringbuf: RingBuffer::new(48000) + } + } + + pub fn read_buffer(&self) -> [i16; 960] { + // todo : Pas optimisé du tout, juste une conception basique pour le moment. + let reader = self.ringbuf.reader(); + let mut buffer = [0i16; 960]; + reader.pop_slice(&mut buffer); + return buffer; + } + + pub fn write_buffer(&self, buffer: Vec) { + // todo : Pas optimisé du tout, juste une conception basique pour le moment. + // Pour le moment le système est pensé en mode "tout va bien", + // mais il va falloir faire le système basé sur le numéro de la séquence (drop si paquet en retard par rapport à un autre etc ...) + + // décodage avec opus + // injecter dans self.ringbuf + } +} + + diff --git a/src/lib.rs b/src/lib.rs index 8c12cc3..8c507c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod app; pub mod audio; pub mod net; -pub mod utils; \ No newline at end of file +pub mod utils; +pub mod services; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7570c89..67ef30f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,55 +1,14 @@ -// mod modules; -// mod utils; - - use std::sync::Arc; -use ox_speak_rs::app::context; +use parking_lot::{Mutex}; use ox_speak_rs::app::context::Context; use ox_speak_rs::app::runtime::Runtime; -// use modules::audio_processor_in::{Microphone, AudioCapture}; -// use modules::client::UdpClient; - -fn main() { - println!("Hello, world!"); - let microphone = Microphone::new(None).unwrap(); - let mut capture = AudioCapture::new(microphone).unwrap(); - let mut udpclient = UdpClient::new("127.0.0.1:5000".to_string()).unwrap(); - - // ✅ Subscribe avant start - let raw_receiver = capture.event_bus.subscribe_raw().unwrap(); - let encoded_receiver = capture.event_bus.subscribe_encoded().unwrap(); - - capture.start_capture().unwrap(); - udpclient.start().unwrap(); - - // Créer le handle audio (zero-copy, thread-safe) - let audio_handle = udpclient.create_audio_handle().unwrap(); - - // Thread pour traiter raw - std::thread::spawn(move || { - while let Ok(data) = raw_receiver.recv() { - // println!("📊 Raw: {} échantillons", data.len()); - } - }); - - // Thread pour traiter encoded - std::thread::spawn(move || { - while let Ok(encoded_data) = encoded_receiver.recv() { - if let Err(e) = audio_handle.send_audio_frame((*encoded_data).clone()) { - eprintln!("⚠️ {}", e); - } - } - }); - - // todo : delete ce qui a au dessus, je garde le temps de migrer. - let context = Arc::new(Context::new()); - let runtime = Runtime::new(context); - runtime.run(); - - - loop { - std::thread::sleep(std::time::Duration::from_secs(1)); - } +fn main() -> Result<(), String> { + let context = Context::new()?; + let context = Arc::new(Mutex::new(context)); + let mut runtime = Runtime::new(context); + runtime.run()?; + runtime.main_loop(); + Ok(()) } \ No newline at end of file diff --git a/src/net/message.rs b/src/net/message.rs index 1c4336c..cc416bd 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -1,74 +1,213 @@ -use num_enum::{IntoPrimitive, TryFromPrimitive}; - +use std::sync::Arc; +use strum::{EnumIter, FromRepr}; #[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, IntoPrimitive, TryFromPrimitive)] +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, EnumIter, FromRepr)] pub enum UDPMessageType { Ping = 0, Audio = 1, } +// Usage pour la conversion impl UDPMessageType { - pub fn peek_message_type(data: &[u8]) -> Option { - if data.is_empty() { + pub fn from_u8(value: u8) -> Option { + Self::from_repr(value) + } + + pub fn to_u8(self) -> u8 { + self as u8 + } + + pub fn from_message(message: &[u8]) -> Option { + if message.is_empty() { return None; } - Self::try_from(data[0]).ok() + + Self::from_u8(message[0]) } } -/// Message envoyé au serveur depuis le client +/// Messages client → serveur (SERIALIZE ONLY) #[derive(Debug, Clone, PartialEq)] pub enum MessageCall { - // 0[u8]uuid4[u8;16/uuid4] - Ping {message_id: [u8; 16]}, // (0) - // 1[u8]4000[u16]data[u8; unlimited] - Audio {sequence: u16, data: Vec}, + Ping { message_id: [u8; 16] }, + Audio { sequence: u16, data: Arc>}, // Utilisation de Arc pour éviter les copy entre les threads } -impl MessageCall { - pub fn ping(message_id: [u8; 16]) -> Self { - Self::Ping {message_id} - } - pub fn audio(sequence: u16, data: Vec) -> Self { - Self::Audio {sequence, data} - } -} - -/// Message reçu depuis le serveur au client +/// Messages serveur → client (DESERIALIZE ONLY) #[derive(Debug, Clone, PartialEq)] pub enum MessageEvent { - // 0[u8]uuid4[u8;16/uuid4] - Ping {message_id: [u8; 16]}, - // 1[u8]4000[u32]data[u8; unlimited] - Audio {user: [u8; 16],sequence: u32, data: Vec}, + Ping { message_id: [u8; 16] }, + Audio { user: [u8; 16], sequence: u32, data: Vec }, } -impl MessageEvent { - pub fn ping(message_id: [u8; 16]) -> Self { - Self::Ping {message_id} - } - pub fn audio(user: [u8; 16], sequence: u32, data: Vec) -> Self { - Self::Audio {user, sequence, data} - } -} +// ═══════════════════════════════════════════════════════════════════════════════ +// SERIALIZATION (Client → Serveur) +// ═══════════════════════════════════════════════════════════════════════════════ impl MessageCall { + /// Sérialisation optimisée pub fn to_bytes(&self) -> Vec { - let mut buf = Vec::::with_capacity(15000); match self { - MessageCall::Ping {message_id} => { + Self::Ping { message_id } => { + let mut buf = Vec::with_capacity(17); buf.push(UDPMessageType::Ping as u8); - buf.extend_from_slice(&message_id.to_vec()); - }, - MessageCall::Audio {sequence, data} => { + buf.extend_from_slice(message_id); + buf + } + Self::Audio { sequence, data } => { + let mut buf = Vec::with_capacity(3 + data.len()); buf.push(UDPMessageType::Audio as u8); buf.extend_from_slice(&sequence.to_be_bytes()); - buf.extend_from_slice(&data); + buf.extend_from_slice(data); + buf } } - buf } - - -} \ No newline at end of file + + // Constructeurs + pub fn ping(message_id: [u8; 16]) -> Self { + Self::Ping { message_id } + } + + pub fn audio(sequence: u16, data: Arc>) -> Self { + Self::Audio { sequence, data } + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// DESERIALIZATION (Serveur → Client) +// ═══════════════════════════════════════════════════════════════════════════════ + +impl MessageEvent { + /// Désérialisation sécurisée + pub fn from_bytes(data: &[u8]) -> Result { + if data.is_empty() { + return Err(ParseError::EmptyData); + } + + let msg_type = UDPMessageType::from_u8(data[0]) + .ok_or(ParseError::InvalidMessageType(data[0]))?; + + match msg_type { + UDPMessageType::Ping => Self::parse_ping(data), + UDPMessageType::Audio => Self::parse_audio(data), + } + } + + fn parse_ping(data: &[u8]) -> Result { + if data.len() != 17 { + return Err(ParseError::InvalidLength { + expected: 17, + actual: data.len(), + message_type: "Ping", + }); + } + + let mut message_id = [0u8; 16]; + message_id.copy_from_slice(&data[1..17]); + Ok(Self::Ping { message_id }) + } + + fn parse_audio(data: &[u8]) -> Result { + if data.len() < 21 { + return Err(ParseError::InvalidLength { + expected: 21, + actual: data.len(), + message_type: "Audio (minimum)", + }); + } + + // user[16] + sequence[4] + data[n] + let mut user = [0u8; 16]; + user.copy_from_slice(&data[1..17]); + + let sequence = u32::from_be_bytes([data[17], data[18], data[19], data[20]]); + let audio_data = data[21..].to_vec(); + + Ok(Self::Audio { + user, + sequence, + data: audio_data, + }) + } + + // Constructeurs + pub fn ping(message_id: [u8; 16]) -> Self { + Self::Ping { message_id } + } + + pub fn audio(user: [u8; 16], sequence: u32, data: Vec) -> Self { + Self::Audio { user, sequence, data } + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// ERROR HANDLING +// ═══════════════════════════════════════════════════════════════════════════════ + +#[derive(Debug, Clone, PartialEq)] +pub enum ParseError { + EmptyData, + InvalidMessageType(u8), + InvalidLength { + expected: usize, + actual: usize, + message_type: &'static str, + }, +} + +impl std::fmt::Display for ParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ParseError::EmptyData => write!(f, "Empty data received"), + ParseError::InvalidMessageType(t) => write!(f, "Invalid message type: {}", t), + ParseError::InvalidLength { expected, actual, message_type } => { + write!(f, "{} message: expected {} bytes, got {}", message_type, expected, actual) + } + } + } +} + +impl std::error::Error for ParseError {} + +// ═══════════════════════════════════════════════════════════════════════════════ +// UTILITIES +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Fonction utilitaire pour inspecter rapidement un message +pub fn peek_message_type(data: &[u8]) -> Option { + if data.is_empty() { + return None; + } + UDPMessageType::from_u8(data[0]) +} + +/// Validation rapide sans parsing complet +pub fn validate_message_format(data: &[u8]) -> Result { + if data.is_empty() { + return Err(ParseError::EmptyData); + } + + let msg_type = UDPMessageType::from_u8(data[0]) + .ok_or(ParseError::InvalidMessageType(data[0]))?; + + // Validation basique des longueurs + match msg_type { + UDPMessageType::Ping if data.len() != 17 => { + Err(ParseError::InvalidLength { + expected: 17, + actual: data.len(), + message_type: "Ping", + }) + } + UDPMessageType::Audio if data.len() < 21 => { + Err(ParseError::InvalidLength { + expected: 21, + actual: data.len(), + message_type: "Audio (minimum)", + }) + } + _ => Ok(msg_type), + } +} diff --git a/src/net/udp_client.rs b/src/net/udp_client.rs index 220bb80..367ff9e 100644 --- a/src/net/udp_client.rs +++ b/src/net/udp_client.rs @@ -3,34 +3,74 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{self, JoinHandle}; use std::time::Duration; -use kanal::{bounded, Receiver, Sender}; +use crossbeam::channel::Receiver; +use parking_lot::RwLock; +use uuid::Uuid; +use crate::net::message::{MessageCall, MessageEvent, UDPMessageType}; +use crate::utils::event_bus::{UDPMessageBus, UDPMessageDispatcherBus}; -/// UdpClient = équivalent de "Microphone" (bas niveau, gère juste la socket) -pub struct UdpClient { - // Configuration de base - server_addr: Option, - socket: Option, - - // État simple - active: bool, +/// Session UDP partageable - handle principal +#[derive(Clone)] +pub struct UdpSession { + inner: Arc, + } -impl UdpClient { - /// Crée un nouveau client UDP (non connecté) - pub fn new() -> Self { - Self { - server_addr: None, - socket: None, - active: false, - } +/// Structure interne de la session +struct UdpSessionInner { + // Configuration + config: RwLock, + + // État + state: RwLock, + + // event_bus + event_bus: UDPMessageBus, + message_event_bus: UDPMessageDispatcherBus, + + // Contrôle des threads + running: Arc, + threads: RwLock>>, +} + +#[derive(Clone)] +struct SessionConfig { + server_addr: Option, + keepalive_interval: Duration, + socket_timeout: Duration, +} + +#[derive(Debug, Clone)] +enum SessionState { + Disconnected, + Connecting, + Connected, + Error(String), +} + +impl UdpSession { + /// Crée une nouvelle session UDP + pub fn new() -> Result { + let inner = Arc::new(UdpSessionInner { + config: RwLock::new(SessionConfig { + server_addr: None, + keepalive_interval: Duration::from_secs(10), + socket_timeout: Duration::from_millis(100), + }), + state: RwLock::new(SessionState::Disconnected), + event_bus: UDPMessageBus::with_capacity(4096), + message_event_bus: UDPMessageDispatcherBus::with_capacity(4096), + running: Arc::new(AtomicBool::new(false)), + threads: RwLock::new(Vec::new()), + }); + + Ok(Self { + inner, + }) } - /// Connecte au serveur - pub fn connect(&mut self, addr: A) -> Result<(), String> { - if self.active { - return Err("Client déjà actif".to_string()); - } - + /// Connecte à un serveur (SYNCHRONE) + pub fn connect(&self, addr: A) -> Result<(), String> { // Résoudre l'adresse let server_addr = addr .to_socket_addrs() @@ -38,288 +78,298 @@ impl UdpClient { .next() .ok_or_else(|| "Aucune adresse trouvée".to_string())?; + // Mettre à jour la configuration + { + let mut config = self.inner.config.write(); + config.server_addr = Some(server_addr); + } + + // Démarrer la session + self.start_internal()?; + + println!("🌐 UdpSession connectée à {}", server_addr); + Ok(()) + } + + /// Déconnecte la session (SYNCHRONE) + pub fn disconnect(&self) { + self.stop_internal(); + + let mut state = self.inner.state.write(); + *state = SessionState::Disconnected; + + println!("🌐 UdpSession déconnectée"); + } + + /// Change de serveur (SYNCHRONE) + pub fn change_server(&self, new_addr: A) -> Result<(), String> { + let was_connected = matches!(*self.inner.state.read(), SessionState::Connected); + + if was_connected { + self.disconnect(); + } + + self.connect(new_addr)?; + Ok(()) + } + + /// Envoie des données + pub fn send(&self, data: MessageCall) -> Result<(), String> { + if !matches!(*self.inner.state.read(), SessionState::Connected) { + return Err("Session non connectée".to_string()); + } + + self.inner.event_bus.broadcast_outgoing(data); + + // self.inner.outbound_tx + // .try_send(data) + // .map_err(|_| "Canal d'envoi saturé".to_string())?; + + Ok(()) + } + + // /// Récupère un message reçu (non-bloquant) + // pub fn try_recv(&self) -> Option<(Vec, SocketAddr)> { + // self.inner.inbound_rx.try_recv().ok() + // } + + // /// Récupère un message reçu (bloquant) + // pub fn recv(&self) -> Result<(Vec, SocketAddr), String> { + // self.inner.inbound_rx + // .recv() + // .map_err(|_| "Canal de réception fermé".to_string()) + // } + + // /// Obtenir un sender pour l'envoi (utile pour les services) + // pub fn get_sender(&self) -> Sender> { + // self.inner.outbound_tx.clone() + // } + + // /// Obtenir un receiver pour la réception (utile pour les services) + // pub fn get_receiver(&self) -> Receiver<(Vec, SocketAddr)> { + // self.inner.inbound_rx.clone() + // } + + /// État actuel de la session + pub fn state(&self) -> SessionState { + self.inner.state.read().clone() + } + + /// Vérifier si connecté + pub fn is_connected(&self) -> bool { + matches!(*self.inner.state.read(), SessionState::Connected) + } + + // === Méthodes internes (SYNCHRONES) === + + fn start_internal(&self) -> Result<(), String> { + if self.inner.running.load(Ordering::Relaxed) { + return Err("Session déjà démarrée".to_string()); + } + + let config = self.inner.config.read().clone(); + let server_addr = config.server_addr + .ok_or("Adresse serveur non définie")?; + // Créer la socket let socket = UdpSocket::bind("0.0.0.0:0") .map_err(|e| format!("Erreur socket: {}", e))?; - socket.set_read_timeout(Some(Duration::from_millis(100))) + socket.set_read_timeout(Some(config.socket_timeout)) .map_err(|e| format!("Erreur timeout: {}", e))?; - self.server_addr = Some(server_addr); - self.socket = Some(socket); + // Démarrer les threads + self.inner.running.store(true, Ordering::Relaxed); - println!(" UdpClient connecté à {}", server_addr); - Ok(()) - } - - /// Démarre avec un callback (équivalent de microphone.start(callback)) - // pub fn start(&mut self, data_callback: F) -> Result<(), String> - // where - // F: FnMut(&[u8], SocketAddr) + Send + 'static - // { - // if self.active { - // return Err("Client déjà actif".to_string()); - // } - // - // if self.socket.is_none() { - // return Err("Client non connecté".to_string()); - // } - // - // // Ici on utiliserait le callback pour le receiver - // // (implémentation simplifiée pour l'exemple) - // - // self.active = true; - // println!(" UdpClient démarré avec callback"); - // Ok(()) - // } - - /// Arrête le client - pub fn stop(&mut self) { - self.active = false; - println!(" UdpClient arrêté"); - } - - /// Déconnecte (ferme socket) - pub fn disconnect(&mut self) { - self.stop(); - self.socket = None; - self.server_addr = None; - println!(" UdpClient déconnecté"); - } - - /// Change de serveur - pub fn change_server(&mut self, new_addr: A) -> Result<(), String> { - let was_active = self.active; - - // Arrêter et déconnecter - self.disconnect(); - - // Reconnecter au nouveau serveur - self.connect(new_addr)?; - - // Redémarrer si c'était actif (nécessiterait de stocker le callback) - // Note: comme pour AudioCapture, c'est une limitation de cette approche - - Ok(()) - } - - /// Envoie des données (méthode simple) - pub fn send_to(&self, data: &[u8]) -> Result<(), String> { - if !self.active { - return Err("Client non actif".to_string()); - } - - let socket = self.socket.as_ref().ok_or("Socket fermée")?; - let addr = self.server_addr.ok_or("Adresse non définie")?; - - socket.send_to(data, addr) - .map_err(|e| format!("Erreur envoi: {}", e))?; - - Ok(()) - } - - /// Getters - pub fn is_active(&self) -> bool { self.active } - pub fn server_address(&self) -> Option { self.server_addr } - pub fn local_address(&self) -> Option { - self.socket.as_ref()?.local_addr().ok() - } -} - - -pub struct NetworkSession { - udp_client: UdpClient, - - // Configuration - keepalive_interval: u64, // secondes - - // EventBus public (comme AudioCapture) - // pub event_bus: NetworkEventBus, - - // État des threads - running: Arc, - threads: Vec>, -} - -impl NetworkSession { - /// Crée une nouvelle session - pub fn new(udp_client: UdpClient) -> Result { - Ok(Self { - udp_client, - keepalive_interval: 10, // 10 secondes par défaut - // event_bus: NetworkEventBus::new(), - running: Arc::new(AtomicBool::new(false)), - threads: Vec::new(), - }) - } - - /// Démarre la session - pub fn start_session(&mut self) -> Result<(), String> { - if self.running.load(Ordering::Relaxed) { - return Err("Session déjà démarrée".to_string()); - } - - // Cloner ce qui est nécessaire pour les threads - let server_addr = self.udp_client.server_address() - .ok_or("Client non connecté")?; - - // Créer les channels - let (send_tx, send_rx) = bounded(2000); - // let (recv_tx, recv_rx) = bounded(2000); - - // Cloner la socket - let socket_send = self.udp_client.socket.as_ref() - .ok_or("Socket non disponible")? - .try_clone() - .map_err(|e| format!("Erreur clone socket: {}", e))?; - - let socket_recv = self.udp_client.socket.as_ref() - .unwrap() - .try_clone() - .map_err(|e| format!("Erreur clone socket: {}", e))?; - - // Démarrer les threads (comme dans AudioCapture) - let running = Arc::clone(&self.running); - running.store(true, Ordering::Relaxed); + let mut threads = self.inner.threads.write(); // Thread sender - let running_send = Arc::clone(&running); - let sender_handle = thread::spawn(move || { - Self::sender_worker(socket_send, server_addr, send_rx, running_send); - }); + let sender_socket = socket.try_clone() + .map_err(|e| format!("Erreur clone socket: {}", e))?; + let sender_eventbus = self.inner.event_bus.clone(); + let sender_running = Arc::clone(&self.inner.running); + + threads.push(thread::spawn(move || { + Self::sender_worker(sender_socket, server_addr, sender_eventbus, sender_running); + })); // Thread receiver - let running_recv = Arc::clone(&running); - let receiver_handle = thread::spawn(move || { - Self::receiver_worker(socket_recv, running_recv); - }); + let receiver_socket = socket.try_clone() + .map_err(|e| format!("Erreur clone socket: {}", e))?; + let receiver_eventbus = self.inner.event_bus.clone(); + let receiver_message_eventbus = self.inner.message_event_bus.clone(); + let receiver_running = Arc::clone(&self.inner.running); + + threads.push(thread::spawn(move || { + Self::receiver_worker(receiver_socket, receiver_eventbus, receiver_message_eventbus, receiver_running); + })); // Thread keepalive - let running_keepalive = Arc::clone(&running); - let keepalive_sender = send_tx.clone(); - let keepalive_interval = self.keepalive_interval; - let keepalive_handle = thread::spawn(move || { - Self::keepalive_worker(keepalive_sender, keepalive_interval, running_keepalive); - }); + // let keepalive_tx = self.inner.outbound_tx.clone(); + let keepalive_eventbus = self.inner.event_bus.clone(); + let keepalive_interval = config.keepalive_interval; + let keepalive_running = Arc::clone(&self.inner.running); - // Stocker les handles et le sender - self.threads.push(sender_handle); - self.threads.push(receiver_handle); - self.threads.push(keepalive_handle); + threads.push(thread::spawn(move || { + Self::keepalive_worker(keepalive_eventbus, keepalive_interval, keepalive_running); + })); + + // Mettre à jour l'état + { + let mut state = self.inner.state.write(); + *state = SessionState::Connected; + } - println!(" Session réseau démarrée"); Ok(()) } - /// Arrête la session - pub fn stop(&mut self) { - if !self.running.load(Ordering::Relaxed) { - return; - } + fn stop_internal(&self) { + self.inner.running.store(false, Ordering::Relaxed); - // Arrêter les threads - self.running.store(false, Ordering::Relaxed); - - // Attendre la fin - for handle in self.threads.drain(..) { + // Attendre la fin des threads + let mut threads = self.inner.threads.write(); + for handle in threads.drain(..) { let _ = handle.join(); } - - println!(" Session réseau arrêtée"); } - /// Change de serveur (comme change_device) - pub fn change_server(&mut self, new_addr: A) -> Result<(), String> { - let was_running = self.running.load(Ordering::Relaxed); + // === Workers (identiques, déjà synchrones) === - // Arrêter la session - self.stop(); + // fn sender_worker( + // socket: UdpSocket, + // server_addr: SocketAddr, + // receiver: Receiver>, + // running: Arc, + // ) { + // println!("📤 Network sender worker démarré"); + // + // while running.load(Ordering::Relaxed) { + // match receiver.recv_timeout(Duration::from_millis(100)) { + // Ok(data) => { + // if let Err(e) = socket.send_to(&data, server_addr) { + // eprintln!("❌ Erreur envoi: {}", e); + // } + // } + // Err(_) => continue, // Timeout ou canal fermé + // } + // } + // + // println!("📤 Network sender worker arrêté"); + // } - // Changer de serveur sur le client UDP - self.udp_client.change_server(new_addr)?; + // fn receiver_worker( + // socket: UdpSocket, + // sender: Sender<(Vec, SocketAddr)>, + // running: Arc, + // ) { + // let mut buffer = [0u8; 1500]; + // println!("📥 Network receiver worker démarré"); + // + // while running.load(Ordering::Relaxed) { + // match socket.recv_from(&mut buffer) { + // Ok((size, addr)) => { + // let data = buffer[..size].to_vec(); + // if sender.try_send((data, addr)).is_err() { + // break; // Canal fermé + // } + // } + // Err(e) if e.kind() == std::io::ErrorKind::TimedOut => continue, + // Err(_) => break, + // } + // } + // + // println!("📥 Network receiver worker arrêté"); + // } + // + + + fn sender_worker(socket: UdpSocket, server_addr: SocketAddr, event_bus: UDPMessageBus, running: Arc) { + let receiver = event_bus.subscribe_outgoing(); + println!("📤 Network sender worker démarré"); - // Redémarrer si c'était actif - if was_running { - self.start_session()?; - } - - Ok(()) - } - - // === Workers (comme dans AudioCapture) === - - fn sender_worker( - socket: UdpSocket, - server_addr: SocketAddr, - receiver: Receiver>, - running: Arc, - ) { - println!(" Network sender worker démarré"); - - // SANS timeout - plus simple et efficace - while let Ok(data) = receiver.recv() { - if !running.load(Ordering::Relaxed) { - break; - } - - if let Err(e) = socket.send_to(&data, server_addr) { - eprintln!("❌ Erreur envoi: {}", e); + while running.load(Ordering::Relaxed) { + match receiver.recv_timeout(Duration::from_millis(100)) { + Ok(message_call) => { + let bytes = message_call.to_bytes(); + if let Err(e) = socket.send_to(&bytes, server_addr) { + eprintln!("❌ Erreur envoi: {}", e); + } + } + Err(_) => continue, // Timeout ou canal fermé } } - println!(" Network sender worker arrêté"); - + println!("📤 Network sender worker arrêté"); } - - fn receiver_worker( - socket: UdpSocket, - running: Arc, - ) { + + fn receiver_worker(socket: UdpSocket, event_bus: UDPMessageBus, message_event_bus: UDPMessageDispatcherBus, running: Arc){ let mut buffer = [0u8; 1500]; - println!(" Network receiver worker démarré"); + println!("📥 Network receiver worker démarré"); while running.load(Ordering::Relaxed) { match socket.recv_from(&mut buffer) { Ok((size, addr)) => { - let data = Arc::new(buffer[..size].to_vec()); + match MessageEvent::from_bytes(&buffer[..size]){ + Ok(message) => { + event_bus.broadcast_incoming(message.clone()); + message_event_bus.broadcast(UDPMessageType::from_message(&buffer).unwrap(), message); + }, + Err(e) => println!("❌ Parse error: {}", e), + } } Err(e) if e.kind() == std::io::ErrorKind::TimedOut => continue, - Err(_) => break, + Err(e) => { + println!("❌ Erreur réseau dans receiver: {}", e); + break; + } } } - - println!(" Network receiver worker arrêté"); + println!("📥 Network receiver worker arrêté"); } fn keepalive_worker( - sender: Sender>, - interval_secs: u64, + sender: UDPMessageBus, + interval: Duration, running: Arc, ) { - println!(" Keepalive worker démarré"); + println!("💗 Keepalive worker démarré"); while running.load(Ordering::Relaxed) { - thread::sleep(std::time::Duration::from_secs(interval_secs)); + thread::sleep(interval); if !running.load(Ordering::Relaxed) { break; } - // Envoyer keepalive (MessageType::Keepalive = 0) - let keepalive = 0u16.to_le_bytes().to_vec(); - if sender.try_send(keepalive).is_err() { - break; // Queue fermée - } + // Envoyer keepalive (ping) + let uuid = Uuid::new_v4(); + let message_id = *uuid.as_bytes(); + let ping_message = MessageCall::ping(message_id); + // let keepalive = vec![0u8]; + // if sender.try_send(keepalive).is_err() { + // break; + // } - println!(" Keepalive envoyé"); + println!("💗 Keepalive envoyé"); } - println!(" Keepalive worker arrêté"); + println!("💗 Keepalive worker arrêté"); + } + + // event_bus + pub fn subscribe_outgoing(&self) -> Receiver{ + self.inner.event_bus.subscribe_outgoing() + } + pub fn subscribe_incoming(&self) -> Receiver{ + self.inner.event_bus.subscribe_incoming() } } -impl Drop for NetworkSession { +impl Drop for UdpSession { fn drop(&mut self) { - self.stop(); + if Arc::strong_count(&self.inner) == 1 { + // Dernier clone, arrêter la session + self.inner.running.store(false, Ordering::Relaxed); + } } -} +} \ No newline at end of file diff --git a/src/services/audio.rs b/src/services/audio.rs new file mode 100644 index 0000000..b9321b4 --- /dev/null +++ b/src/services/audio.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::thread; +use crossbeam::channel::{Receiver}; +use crate::net::message::MessageCall; +use crate::net::udp_client::UdpSession; +use crate::utils::event_bus::EncodedAudioData; + +pub struct AudioCaptureService { + audio_in_receiver: Receiver, + udp_session: UdpSession, + sequence_counter: Arc, + running: Arc +} + +impl AudioCaptureService { + pub fn new(audio_in_receiver: Receiver, udp_session: UdpSession) -> Self { + Self { + audio_in_receiver, + udp_session, + sequence_counter: Arc::new(AtomicU16::new(0)), + running: Arc::new(AtomicBool::new(false)) + } + } + + pub fn start(&self) { + println!("Audio service : 'start' called"); + self.audio_to_network(); + } + + pub fn audio_to_network(&self) { + let receiver = self.audio_in_receiver.clone(); + let running = self.running.clone(); + let sequence = self.sequence_counter.clone(); + let client = self.udp_session.clone(); + + self.running.store(true, Ordering::Relaxed); + + thread::spawn(move || { + println!("Audio service : 'audio_to_network' thread started"); + while running.load(Ordering::Relaxed) { + match receiver.recv_timeout(std::time::Duration::from_millis(100)) { + Ok(frame_encoded) => { + println!("Audio service : 'audio_to_network' thread received frame"); + let message = MessageCall::audio(sequence.load(Ordering::Relaxed), frame_encoded); + let _ = client.send(message); + } + Err(e) => { + println!("Error: {}", e); + } + } + sequence.fetch_add(1, Ordering::Relaxed); + } + }); + + println!("Audio service : 'audio_to_network' started"); + } +} \ No newline at end of file diff --git a/src/services/mod.rs b/src/services/mod.rs new file mode 100644 index 0000000..6fa5b67 --- /dev/null +++ b/src/services/mod.rs @@ -0,0 +1,2 @@ +pub mod audio; +pub mod network; \ No newline at end of file diff --git a/src/services/network.rs b/src/services/network.rs new file mode 100644 index 0000000..442424d --- /dev/null +++ b/src/services/network.rs @@ -0,0 +1,26 @@ +use crate::net::message::MessageCall; +use crate::net::udp_client::UdpSession; + +pub struct NetworkService { + session: UdpSession, +} + +impl NetworkService { + pub fn new(network_session: UdpSession) -> Self { + Self { + session: network_session, + } + } + + pub fn start(&self) { + self.udp_dispatcher(); + } + + pub fn send_message(&self, message: MessageCall) -> Result<(), String> { + self.session.send(message) + } + + fn udp_dispatcher(&self) { + + } +} \ No newline at end of file diff --git a/src/utils/event_bus.rs b/src/utils/event_bus.rs index 5bcd1f5..45915ef 100644 --- a/src/utils/event_bus.rs +++ b/src/utils/event_bus.rs @@ -4,11 +4,13 @@ //! Un EventBus performant utilisant parking_lot::RwLock pour la distribution //! de données vers plusieurs consumers de manière zero-copy. +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use kanal::{unbounded, Receiver, Sender}; +use crossbeam::channel::{unbounded, Receiver, Sender}; use parking_lot::RwLock; - +use strum::IntoEnumIterator; +use crate::net::message::{MessageCall, MessageEvent, UDPMessageType}; // ═══════════════════════════════════════════════════════════════════════════════ // 🎯 Types aliases pour une meilleure lisibilité // ═══════════════════════════════════════════════════════════════════════════════ @@ -34,7 +36,7 @@ impl Subscriber { /// Vérifie si le subscriber est encore actif pub fn is_active(&self) -> bool { - self.active.load(Ordering::Relaxed) && !self.sender.is_closed() + self.active.load(Ordering::Relaxed) } /// Désactive le subscriber @@ -86,7 +88,15 @@ where for subscriber in subscribers.iter() { if subscriber.is_active() { // try_send pour éviter les blocages - let _ = subscriber.sender.try_send(data.clone()); + // let _ = subscriber.sender.try_send(data.clone()); + match subscriber.sender.try_send(data.clone()) { + Ok(_) => (), + Err(crossbeam::channel::TrySendError::Full(_)) => { + println!("⚠️ Subscriber queue pleine, message ignoré"); + }Err(crossbeam::channel::TrySendError::Disconnected(_)) => { + subscriber.deactivate(); + } + } } } } @@ -138,12 +148,138 @@ where } } +// ═══════════════════════════════════════════════════════════════════════════════ +// 🖧 Types spécialisés pour le network +// ═══════════════════════════════════════════════════════════════════════════════ + +/// EventBus spécialisé pour les paquets UDP +pub type UDPCallBus = EventBus; +pub type UDPEventBus = EventBus; + +pub struct UDPMessageBus { + pub outgoing: UDPCallBus, + pub incoming: UDPEventBus, +} + +impl UDPMessageBus { + /// Crée un nouveau UDPPaquetBus + pub fn new() -> Self { + Self { + outgoing: UDPCallBus::new(), + incoming: UDPEventBus::new(), + } + } + + /// Crée un nouveau UDPPaquetBus avec capacity pré-alloué + pub fn with_capacity(capacity: usize) -> Self { + Self { + outgoing: UDPCallBus::with_capacity(capacity), + incoming: UDPEventBus::with_capacity(capacity), + } + } + + /// S'abonne aux paquets sortants + pub fn subscribe_outgoing(&self) -> Receiver { + self.outgoing.subscribe() + } + + /// S'abonne aux paquets entrants + pub fn subscribe_incoming(&self) -> Receiver { + self.incoming.subscribe() + } + + /// Diffuse un paquet sortant + pub fn broadcast_outgoing(&self, data: MessageCall) { + self.outgoing.broadcast(data); + } + + /// Diffuse un paquet entrant + pub fn broadcast_incoming(&self, data: MessageEvent) { + self.incoming.broadcast(data); + } + + pub fn shutdown(&self) { + self.outgoing.shutdown(); + self.incoming.shutdown(); + } +} + +impl Default for UDPMessageBus { + fn default() -> Self { + Self::new() + } +} + +impl Clone for UDPMessageBus { + fn clone(&self) -> Self { + Self { + outgoing: self.outgoing.clone(), + incoming: self.incoming.clone(), + } + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🖧 Types spécialisés pour le type de message udp +// ═══════════════════════════════════════════════════════════════════════════════ +pub struct UDPMessageDispatcherBus{ + message_tree: HashMap +} + +impl UDPMessageDispatcherBus { + pub fn new() -> Self { + let mut map = HashMap::::new(); + for message_type in UDPMessageType::iter() { + map.insert(message_type, UDPEventBus::new()); + } + + Self { + message_tree: map + } + } + + pub fn with_capacity(capacity: usize) -> Self { + let mut map = HashMap::::new(); + for message_type in UDPMessageType::iter() { + map.insert(message_type, UDPEventBus::with_capacity(capacity)); + } + + Self { + message_tree: map + } + } + + pub fn subscribe(&self, message_type: UDPMessageType) -> Receiver { + self.message_tree.get(&message_type).unwrap().subscribe() + } + + pub fn broadcast(&self, message_type: UDPMessageType, data: MessageEvent) { + if let Some(bus) = self.message_tree.get(&message_type) { + bus.broadcast(data); + } + } +} + +impl Default for UDPMessageDispatcherBus { + fn default() -> Self { + Self::new() + } +} + +impl Clone for UDPMessageDispatcherBus { + fn clone(&self) -> Self { + Self { + message_tree: self.message_tree.clone(), + } + } +} + // ═══════════════════════════════════════════════════════════════════════════════ // 🎵 Types spécialisés pour l'audio // ═══════════════════════════════════════════════════════════════════════════════ /// Type pour les données audio brutes (samples i16) -pub type RawAudioData = Arc>; +pub type RawAudioData = Vec; /// Type pour les données audio encodées (opus bytes) pub type EncodedAudioData = Arc>;