From cdea4a5b25cfa481a4efd30b3efb6bc977135f28 Mon Sep 17 00:00:00 2001 From: Nell Date: Sat, 21 Jun 2025 16:57:50 +0200 Subject: [PATCH] first commit --- src/app/context.rs | 17 ++- src/audio/capture.rs | 85 ++++++++--- src/audio/opus.rs | 110 ++++++++++++++ src/audio/playback.rs | 75 ++++++++++ src/net/message.rs | 37 +++++ src/net/udp_client.rs | 325 +++++++++++++++++++++++++++++++++++++++++ src/utils/event_bus.rs | 312 +++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 3 +- 8 files changed, 935 insertions(+), 29 deletions(-) create mode 100644 src/utils/event_bus.rs diff --git a/src/app/context.rs b/src/app/context.rs index cc41e8c..d6402ba 100644 --- a/src/app/context.rs +++ b/src/app/context.rs @@ -1,24 +1,33 @@ // Logique passive de l'application avec les resources globales use std::sync::Arc; +use crate::audio::capture::{AudioCapture, Microphone}; pub struct Context { + // Audio related host: Arc, + microphone: Microphone, + audio_capture: AudioCapture + // todo : mettre tous les contextes de l'app nécessaire au bon fonctionnement de celle ci, udpclient, catpure audio, lecture audio .... // L'idéal étant que tout soit chargé "statiquement" dans le contexte de l'application pour être "relié" dans le runtime } impl Context { - pub fn new() -> Self { + pub fn new() -> Result { let host = Arc::new(cpal::default_host()); + let microphone = Microphone::default(&host)?; + let audio_capture = AudioCapture::new(microphone.clone()); // Arc::new(UdpClient::new()) // Arc::new(AudioCapture::new()) // etc ... - Self { - host - } + Ok(Self { + host, + microphone, + audio_capture + }) } pub fn get_host(&self) -> Arc { diff --git a/src/audio/capture.rs b/src/audio/capture.rs index 3f9b0ec..e6cc9c7 100644 --- a/src/audio/capture.rs +++ b/src/audio/capture.rs @@ -3,10 +3,18 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::thread::JoinHandle; use cpal::{BufferSize, Device, Host, Stream, StreamConfig, SupportedStreamConfig, SampleRate, InputCallbackInfo}; -use cpal::traits::{DeviceTrait, HostTrait}; -use crate::utils::ringbuf; -use crate::utils::ringbuf::{ringbuf, RingBuffer}; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use crate::audio::opus::{AudioOpus, AudioOpusEncoder}; +use crate::utils::event_bus::{AudioInputBus}; +use crate::utils::ringbuf::{RingBufReader, RingBuffer}; +pub enum AudioCaptureError { + DeviceNotFound, + StreamError, + EncoderError, +} + +#[derive(Clone)] pub struct Microphone { device: Device, } @@ -55,7 +63,7 @@ impl Microphone { } } -struct AudioCapture { +pub struct AudioCapture { microphone: Microphone, // catpure related @@ -63,16 +71,20 @@ struct AudioCapture { stream: Option, worker: Option>, running: Arc, + + // exposed ringbuf + event_bus: AudioInputBus, } impl AudioCapture { pub fn new(microphone: Microphone) -> Self { Self { microphone, - ringbuf: RingBuffer::new(48000), + ringbuf: RingBuffer::::new(48000), stream: None, worker: None, running: Arc::new(AtomicBool::new(false)), + event_bus: AudioInputBus::new(), } } @@ -101,47 +113,72 @@ impl AudioCapture { writer.push_slice_overwrite(data); })?; - + + // Audio processing worker let worker_running = self.running.clone(); + let worker_event_bus = self.event_bus.clone(); + let input_config = self.microphone.get_input_config()?; + let opus = AudioOpus::new(input_config.sample_rate().0, input_config.channels(), "voip"); + let encoder = opus.create_encoder()?; let worker = thread::spawn(move || { - let mut frame = [0i16; 960]; // Taille de Frame souhaité - while worker_running.load(Ordering::Relaxed){ - let read = reader.pop_slice_blocking(&mut frame); - // ✅ Check simple après réveil - if !worker_running.load(Ordering::Relaxed) { - println!("🛑 Arrêt demandé"); - break; - } - } - println!("Fermeture du thread de capture.") + Self::audio_processing(reader, worker_running, encoder, worker_event_bus); }); - + stream.play().map_err(|e| format!("Erreur de lancement du stream: {e}"))?; self.stream = Some(stream); self.worker = Some(worker); Ok(()) } + + fn audio_processing(mut reader: RingBufReader, worker_running: Arc, mut encoder: AudioOpusEncoder, event_bus: AudioInputBus) { + let mut frame = [0i16; 960]; + + while worker_running.load(Ordering::Relaxed){ + let _ = reader.pop_slice_blocking(&mut frame); + + // ✅ Check simple après réveil + if !worker_running.load(Ordering::Relaxed) { + println!("🛑 Arrêt demandé"); + break; + } + + let raw_data = Arc::new(frame.to_vec()); + event_bus.broadcast_raw(raw_data.clone()); + + match encoder.encode(&frame) { + Ok(encoded_data) => { + event_bus.encoded.broadcast(Arc::new(encoded_data)); + } + Err(e) => { + println!("Erreur encoding: {e}"); + } + } + } + + println!("Fermeture de l'audio processing.") + } pub fn stop_capture(&mut self){ println!("🛑 Arrêt en cours..."); - // ✅ 1. Signal d'arrêt + // 1️⃣ Signal d'arrêt global self.running.store(false, Ordering::Relaxed); - // ✅ 2. RÉVEIL FORCÉ du worker ! + // 2️⃣ ARRÊT IMMÉDIAT du callback micro + self.stream = None; // ✅ Plus de nouvelles données ! + + // 3️⃣ RÉVEIL du worker qui attend self.ringbuf.force_wake_up(); - // ✅ 3. Attente de terminaison (maintenant ça marche !) + // 4️⃣ Attente propre du worker if let Some(handle) = self.worker.take() { handle.join().unwrap(); } - self.stream = None; - - // todo ajouter une méthode pour vider le ringbuf + // 5️⃣ Nettoyage self.ringbuf.clear(); - println!("🎤 Capture audio arrêtée."); + println!(" Capture audio arrêtée."); } } diff --git a/src/audio/opus.rs b/src/audio/opus.rs index e69de29..8212815 100644 --- a/src/audio/opus.rs +++ b/src/audio/opus.rs @@ -0,0 +1,110 @@ +use opus::{Application, Channels, Decoder, Encoder}; + +#[derive(Clone)] +pub struct AudioOpus{ + sample_rate: u32, + channels: u16, + application: Application +} + +impl AudioOpus { + pub fn new(sample_rate: u32, channels: u16, application: &str) -> Self { + let application = match application { + "voip" => Application::Voip, + "audio" => Application::Audio, + "lowdelay" => Application::LowDelay, + _ => Application::Voip, + }; + Self{sample_rate, channels, application} + } + + pub fn create_encoder(&self) -> Result { + AudioOpusEncoder::new(self.clone()) + } + + pub fn create_decoder(&self) -> Result { + AudioOpusDecoder::new(self.clone()) + } + +} + +pub struct AudioOpusEncoder{ + audio_opus: AudioOpus, + encoder: opus::Encoder, +} + +impl AudioOpusEncoder { + fn new(audio_opus: AudioOpus) -> Result { + let opus_channel = match audio_opus.channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => Channels::Mono, + }; + let mut encoder = Encoder::new(audio_opus.sample_rate, opus_channel, audio_opus.application) + .map_err(|e| format!("Échec de création de l'encodeur: {:?}", e))?; + + match audio_opus.application { + Application::Voip => { + // Paramètres optimaux pour VoIP: bonne qualité vocale, CPU modéré + let _ = encoder.set_bitrate(opus::Bitrate::Bits(24000)); // 24kbps est bon pour la voix + let _ = encoder.set_vbr(true); // Variable bitrate économise du CPU + let _ = encoder.set_vbr_constraint(false); // Sans contrainte stricte de débit + // Pas de set_complexity (non supporté par la crate) + }, + Application::Audio => { + // Musique: priorité à la qualité + let _ = encoder.set_bitrate(opus::Bitrate::Bits(64000)); + let _ = encoder.set_vbr(true); + }, + Application::LowDelay => { + // Priorité à la latence et l'efficacité CPU + let _ = encoder.set_bitrate(opus::Bitrate::Bits(18000)); + let _ = encoder.set_vbr(true); + }, + } + Ok(Self{audio_opus, encoder}) + } + + pub fn encode(&mut self, frames: &[i16]) -> Result, String> { + let mut output = vec![0u8; 1276]; // 1276 octets (la vraie worst-case recommandée par Opus). + let len = self.encoder.encode(frames, output.as_mut_slice()) + .map_err(|e| format!("Erreur encodage: {:?}", e))?; + output.truncate(len); + Ok(output) + } + + // 🔄 Approche avec buffer réutilisable (encore plus optimal) + fn encode_reuse(&mut self, frames: &[i16], output: &mut Vec) -> Result { + output.clear(); + output.resize(1276, 0); + let len = self.encoder.encode(frames, output.as_mut_slice()).unwrap(); + output.truncate(len); + Ok(len) + } +} + +pub struct AudioOpusDecoder{ + audio_opus: AudioOpus, + decoder: opus::Decoder, +} + +impl AudioOpusDecoder { + fn new(audio_opus: AudioOpus) -> Result { + let opus_channel = match audio_opus.channels { + 1 => Channels::Mono, + 2 => Channels::Stereo, + _ => Channels::Mono, + }; + + let decoder = Decoder::new(audio_opus.sample_rate, opus_channel) + .map_err(|e| format!("Échec de création du décodeur: {:?}", e))?;; + Ok(Self{audio_opus, decoder}) + } + + pub fn decode(&mut self, frames: &[u8]) -> Result, String> { + let mut output = vec![0i16; 5760]; + let len = self.decoder.decode(frames, output.as_mut_slice(), false).map_err(|e| format!("Erreur décodage: {:?}", e))?; + output.truncate(len); + Ok(output) + } +} \ No newline at end of file diff --git a/src/audio/playback.rs b/src/audio/playback.rs index e69de29..4b44e7d 100644 --- a/src/audio/playback.rs +++ b/src/audio/playback.rs @@ -0,0 +1,75 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::thread::JoinHandle; +use cpal::{Device, Host, OutputCallbackInfo, Stream, StreamConfig, SupportedStreamConfig}; +use cpal::traits::{DeviceTrait, HostTrait}; +use crate::utils::event_bus::{AudioOutputBus, EventBus}; +use crate::utils::ringbuf::RingBuffer; + +pub struct Speaker { + device: Device, +} + +impl Speaker { + pub fn new(device: Device) -> Self { + Self{ + device + } + } + + pub fn default(host: &Host) -> Result { + let device = host.default_output_device().expect("No output device available"); + Ok(Self::new(device)) + } + + pub fn get_output_config(&self) -> Result { + self.device.default_output_config().map_err(|e| format!("Erreur config : {e}")) + } + + pub fn get_stream_config(&self) -> Result { + let config = self.get_output_config()?; + let mut stream_config: StreamConfig = config.into(); + // todo : voir si on doit/peu changer des paramètres + Ok(stream_config) + } + + pub fn build_stream(&self, callback: F) -> Result + where + F: FnMut(&mut [i16], &OutputCallbackInfo) + Send + 'static, + { + let config = self.get_stream_config()?; + + let stream = self.device.build_output_stream( + &config, + callback, + |err| println!("Erreur stream: {err}"), + None + ).map_err(|e| format!("Erreur création stream : {e}"))?; + + Ok(stream) + } +} + +struct AudioPlayback { + speaker: Speaker, + + ringbuf: RingBuffer, + stream: Option, + worker: Option>, + running: Arc, + + event_bus: AudioOutputBus, +} + +impl AudioPlayback { + pub fn new(speaker: Speaker) -> Self { + Self{ + speaker, + ringbuf: RingBuffer::new(1024), + stream: None, + worker: None, + running: Arc::new(AtomicBool::new(false)), + event_bus: AudioOutputBus::new(), + } + } +} \ No newline at end of file diff --git a/src/net/message.rs b/src/net/message.rs index e69de29..ada45b4 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -0,0 +1,37 @@ +use num_enum::{IntoPrimitive, TryFromPrimitive}; + + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, IntoPrimitive, TryFromPrimitive)] +pub enum UDPMessageType { + Ping = 0, + Audio = 1, +} + +impl UDPMessageType { + pub fn peek_message_type(data: &[u8]) -> Option { + if data.is_empty() { + return None; + } + Self::try_from(data[0]).ok() + } +} + +/// Message envoyé au serveur depuis le client +#[derive(Debug, Clone, PartialEq)] +pub enum MessageCall { + // 0[u8]uuid4[u8;16/uuid4] + Ping {message_id: [u8; 16]}, // (0) + // 1[u8]4000[u16]data[u8; unlimited] + Audio {sequence: u16, data: Vec}, +} + +/// Message reçu depuis le serveur au client +#[derive(Debug, Clone, PartialEq)] +pub enum MessageEvent { + // 0[u8]uuid4[u8;16/uuid4] + Ping {message_id: [u8; 16]}, + // 1[u8]4000[u32]data[u8; unlimited] + Audio {user: [u8; 16],sequence: u32, data: Vec}, +} + diff --git a/src/net/udp_client.rs b/src/net/udp_client.rs index e69de29..220bb80 100644 --- a/src/net/udp_client.rs +++ b/src/net/udp_client.rs @@ -0,0 +1,325 @@ +use std::net::{UdpSocket, SocketAddr, ToSocketAddrs}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use kanal::{bounded, Receiver, Sender}; + +/// UdpClient = équivalent de "Microphone" (bas niveau, gère juste la socket) +pub struct UdpClient { + // Configuration de base + server_addr: Option, + socket: Option, + + // État simple + active: bool, +} + +impl UdpClient { + /// Crée un nouveau client UDP (non connecté) + pub fn new() -> Self { + Self { + server_addr: None, + socket: None, + active: false, + } + } + + /// Connecte au serveur + pub fn connect(&mut self, addr: A) -> Result<(), String> { + if self.active { + return Err("Client déjà actif".to_string()); + } + + // Résoudre l'adresse + let server_addr = addr + .to_socket_addrs() + .map_err(|e| format!("Résolution échouée: {}", e))? + .next() + .ok_or_else(|| "Aucune adresse trouvée".to_string())?; + + // Créer la socket + let socket = UdpSocket::bind("0.0.0.0:0") + .map_err(|e| format!("Erreur socket: {}", e))?; + + socket.set_read_timeout(Some(Duration::from_millis(100))) + .map_err(|e| format!("Erreur timeout: {}", e))?; + + self.server_addr = Some(server_addr); + self.socket = Some(socket); + + println!(" UdpClient connecté à {}", server_addr); + Ok(()) + } + + /// Démarre avec un callback (équivalent de microphone.start(callback)) + // pub fn start(&mut self, data_callback: F) -> Result<(), String> + // where + // F: FnMut(&[u8], SocketAddr) + Send + 'static + // { + // if self.active { + // return Err("Client déjà actif".to_string()); + // } + // + // if self.socket.is_none() { + // return Err("Client non connecté".to_string()); + // } + // + // // Ici on utiliserait le callback pour le receiver + // // (implémentation simplifiée pour l'exemple) + // + // self.active = true; + // println!(" UdpClient démarré avec callback"); + // Ok(()) + // } + + /// Arrête le client + pub fn stop(&mut self) { + self.active = false; + println!(" UdpClient arrêté"); + } + + /// Déconnecte (ferme socket) + pub fn disconnect(&mut self) { + self.stop(); + self.socket = None; + self.server_addr = None; + println!(" UdpClient déconnecté"); + } + + /// Change de serveur + pub fn change_server(&mut self, new_addr: A) -> Result<(), String> { + let was_active = self.active; + + // Arrêter et déconnecter + self.disconnect(); + + // Reconnecter au nouveau serveur + self.connect(new_addr)?; + + // Redémarrer si c'était actif (nécessiterait de stocker le callback) + // Note: comme pour AudioCapture, c'est une limitation de cette approche + + Ok(()) + } + + /// Envoie des données (méthode simple) + pub fn send_to(&self, data: &[u8]) -> Result<(), String> { + if !self.active { + return Err("Client non actif".to_string()); + } + + let socket = self.socket.as_ref().ok_or("Socket fermée")?; + let addr = self.server_addr.ok_or("Adresse non définie")?; + + socket.send_to(data, addr) + .map_err(|e| format!("Erreur envoi: {}", e))?; + + Ok(()) + } + + /// Getters + pub fn is_active(&self) -> bool { self.active } + pub fn server_address(&self) -> Option { self.server_addr } + pub fn local_address(&self) -> Option { + self.socket.as_ref()?.local_addr().ok() + } +} + + +pub struct NetworkSession { + udp_client: UdpClient, + + // Configuration + keepalive_interval: u64, // secondes + + // EventBus public (comme AudioCapture) + // pub event_bus: NetworkEventBus, + + // État des threads + running: Arc, + threads: Vec>, +} + +impl NetworkSession { + /// Crée une nouvelle session + pub fn new(udp_client: UdpClient) -> Result { + Ok(Self { + udp_client, + keepalive_interval: 10, // 10 secondes par défaut + // event_bus: NetworkEventBus::new(), + running: Arc::new(AtomicBool::new(false)), + threads: Vec::new(), + }) + } + + /// Démarre la session + pub fn start_session(&mut self) -> Result<(), String> { + if self.running.load(Ordering::Relaxed) { + return Err("Session déjà démarrée".to_string()); + } + + // Cloner ce qui est nécessaire pour les threads + let server_addr = self.udp_client.server_address() + .ok_or("Client non connecté")?; + + // Créer les channels + let (send_tx, send_rx) = bounded(2000); + // let (recv_tx, recv_rx) = bounded(2000); + + // Cloner la socket + let socket_send = self.udp_client.socket.as_ref() + .ok_or("Socket non disponible")? + .try_clone() + .map_err(|e| format!("Erreur clone socket: {}", e))?; + + let socket_recv = self.udp_client.socket.as_ref() + .unwrap() + .try_clone() + .map_err(|e| format!("Erreur clone socket: {}", e))?; + + // Démarrer les threads (comme dans AudioCapture) + let running = Arc::clone(&self.running); + running.store(true, Ordering::Relaxed); + + // Thread sender + let running_send = Arc::clone(&running); + let sender_handle = thread::spawn(move || { + Self::sender_worker(socket_send, server_addr, send_rx, running_send); + }); + + // Thread receiver + let running_recv = Arc::clone(&running); + let receiver_handle = thread::spawn(move || { + Self::receiver_worker(socket_recv, running_recv); + }); + + // Thread keepalive + let running_keepalive = Arc::clone(&running); + let keepalive_sender = send_tx.clone(); + let keepalive_interval = self.keepalive_interval; + let keepalive_handle = thread::spawn(move || { + Self::keepalive_worker(keepalive_sender, keepalive_interval, running_keepalive); + }); + + // Stocker les handles et le sender + self.threads.push(sender_handle); + self.threads.push(receiver_handle); + self.threads.push(keepalive_handle); + + println!(" Session réseau démarrée"); + Ok(()) + } + + /// Arrête la session + pub fn stop(&mut self) { + if !self.running.load(Ordering::Relaxed) { + return; + } + + // Arrêter les threads + self.running.store(false, Ordering::Relaxed); + + // Attendre la fin + for handle in self.threads.drain(..) { + let _ = handle.join(); + } + + println!(" Session réseau arrêtée"); + } + + /// Change de serveur (comme change_device) + pub fn change_server(&mut self, new_addr: A) -> Result<(), String> { + let was_running = self.running.load(Ordering::Relaxed); + + // Arrêter la session + self.stop(); + + // Changer de serveur sur le client UDP + self.udp_client.change_server(new_addr)?; + + // Redémarrer si c'était actif + if was_running { + self.start_session()?; + } + + Ok(()) + } + + // === Workers (comme dans AudioCapture) === + + fn sender_worker( + socket: UdpSocket, + server_addr: SocketAddr, + receiver: Receiver>, + running: Arc, + ) { + println!(" Network sender worker démarré"); + + // SANS timeout - plus simple et efficace + while let Ok(data) = receiver.recv() { + if !running.load(Ordering::Relaxed) { + break; + } + + if let Err(e) = socket.send_to(&data, server_addr) { + eprintln!("❌ Erreur envoi: {}", e); + } + } + + println!(" Network sender worker arrêté"); + + } + + fn receiver_worker( + socket: UdpSocket, + running: Arc, + ) { + let mut buffer = [0u8; 1500]; + println!(" Network receiver worker démarré"); + + while running.load(Ordering::Relaxed) { + match socket.recv_from(&mut buffer) { + Ok((size, addr)) => { + let data = Arc::new(buffer[..size].to_vec()); + } + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => continue, + Err(_) => break, + } + } + + println!(" Network receiver worker arrêté"); + } + + fn keepalive_worker( + sender: Sender>, + interval_secs: u64, + running: Arc, + ) { + println!(" Keepalive worker démarré"); + + while running.load(Ordering::Relaxed) { + thread::sleep(std::time::Duration::from_secs(interval_secs)); + + if !running.load(Ordering::Relaxed) { + break; + } + + // Envoyer keepalive (MessageType::Keepalive = 0) + let keepalive = 0u16.to_le_bytes().to_vec(); + if sender.try_send(keepalive).is_err() { + break; // Queue fermée + } + + println!(" Keepalive envoyé"); + } + + println!(" Keepalive worker arrêté"); + } +} + +impl Drop for NetworkSession { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/src/utils/event_bus.rs b/src/utils/event_bus.rs new file mode 100644 index 0000000..5bcd1f5 --- /dev/null +++ b/src/utils/event_bus.rs @@ -0,0 +1,312 @@ + +//! # EventBus - Système de publication/souscription générique +//! +//! Un EventBus performant utilisant parking_lot::RwLock pour la distribution +//! de données vers plusieurs consumers de manière zero-copy. + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use kanal::{unbounded, Receiver, Sender}; +use parking_lot::RwLock; + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🎯 Types aliases pour une meilleure lisibilité +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Un subscriber générique pour un type de données T +#[derive(Clone)] +pub struct Subscriber { + pub sender: Sender, + pub receiver: Receiver, + pub active: Arc, +} + +impl Subscriber { + /// Crée un nouveau subscriber + pub fn new() -> Self { + let (sender, receiver) = unbounded(); + Self { + sender, + receiver: receiver.clone(), + active: Arc::new(AtomicBool::new(true)), + } + } + + /// Vérifie si le subscriber est encore actif + pub fn is_active(&self) -> bool { + self.active.load(Ordering::Relaxed) && !self.sender.is_closed() + } + + /// Désactive le subscriber + pub fn deactivate(&self) { + self.active.store(false, Ordering::Relaxed); + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🏗️ EventBus générique +// ═══════════════════════════════════════════════════════════════════════════════ + +/// EventBus générique pour distribuer des données de type T vers plusieurs consumers +pub struct EventBus { + subscribers: Arc>>>, +} + +impl EventBus +where + T: Clone + Send + 'static +{ + /// Crée un nouveau EventBus vide + pub fn new() -> Self { + Self { + subscribers: Arc::new(RwLock::new(Vec::new())), + } + } + + /// Crée un EventBus avec une capacité pré-allouée + pub fn with_capacity(capacity: usize) -> Self { + Self { + subscribers: Arc::new(RwLock::new(Vec::with_capacity(capacity))), + } + } + + /// S'abonne au bus et retourne un Receiver pour recevoir les données + pub fn subscribe(&self) -> Receiver { + let subscriber = Subscriber::new(); + let receiver = subscriber.receiver.clone(); + + self.subscribers.write().push(subscriber); + receiver + } + + /// Diffuse des données à tous les subscribers actifs + pub fn broadcast(&self, data: T) { + let subscribers = self.subscribers.read(); + + for subscriber in subscribers.iter() { + if subscriber.is_active() { + // try_send pour éviter les blocages + let _ = subscriber.sender.try_send(data.clone()); + } + } + } + + // TODO: À implémenter si besoin futur d'unsubscribe dynamique + /// Nettoie les subscribers fermés ou inactifs + // pub fn cleanup_dead_subscribers(&self) { + // let mut subscribers = self.subscribers.write(); + // subscribers.retain(|subscriber| subscriber.is_active()); + // } + + /// Retourne le nombre de subscribers actifs + pub fn active_subscriber_count(&self) -> usize { + let subscribers = self.subscribers.read(); + subscribers.iter().filter(|s| s.is_active()).count() + } + + /// Retourne le nombre total de subscribers (actifs + inactifs) + pub fn total_subscriber_count(&self) -> usize { + self.subscribers.read().len() + } + + /// Désactive tous les subscribers + pub fn shutdown(&self) { + let subscribers = self.subscribers.read(); + for subscriber in subscribers.iter() { + subscriber.deactivate(); + } + } +} + +impl Default for EventBus +where + T: Clone + Send + 'static +{ + fn default() -> Self { + Self::new() + } +} + +impl Clone for EventBus +where + T: Clone + Send + 'static +{ + fn clone(&self) -> Self { + Self { + subscribers: Arc::clone(&self.subscribers), + } + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🎵 Types spécialisés pour l'audio +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Type pour les données audio brutes (samples i16) +pub type RawAudioData = Arc>; + +/// Type pour les données audio encodées (opus bytes) +pub type EncodedAudioData = Arc>; + +/// EventBus spécialisé pour les données audio brutes +pub type RawAudioEventBus = EventBus; + +/// EventBus spécialisé pour les données audio encodées +pub type EncodedAudioEventBus = EventBus; + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🎤 AudioInputBus - Pipeline d'entrée (Micro → Réseau) +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Bus pour la chaîne d'entrée audio (micro → encodage → réseau) +pub struct AudioInputBus { + /// Données brutes du microphone + pub raw: RawAudioEventBus, + + /// Données encodées (prêtes pour l'envoi réseau) + pub encoded: EncodedAudioEventBus, +} + +impl AudioInputBus { + /// Crée un nouveau AudioInputBus + pub fn new() -> Self { + Self { + raw: RawAudioEventBus::new(), + encoded: EncodedAudioEventBus::new(), + } + } + + /// Crée un AudioInputBus avec capacité pré-allouée + pub fn with_capacity(capacity: usize) -> Self { + Self { + raw: RawAudioEventBus::with_capacity(capacity), + encoded: EncodedAudioEventBus::with_capacity(capacity), + } + } + + /// S'abonne aux données raw (pour l'encoder) + pub fn subscribe_raw(&self) -> Receiver { + self.raw.subscribe() + } + + /// S'abonne aux données encodées (pour le réseau) + pub fn subscribe_encoded(&self) -> Receiver { + self.encoded.subscribe() + } + + /// Diffuse des données raw (depuis AudioCapture) + pub fn broadcast_raw(&self, data: RawAudioData) { + self.raw.broadcast(data); + } + + /// Diffuse des données encodées (depuis AudioOpusEncoder) + pub fn broadcast_encoded(&self, data: EncodedAudioData) { + self.encoded.broadcast(data); + } + + // TODO: À implémenter si besoin futur d'unsubscribe dynamique + /// Nettoie tous les subscribers morts + // pub fn cleanup(&self) { + // self.raw.cleanup_dead_subscribers(); + // self.encoded.cleanup_dead_subscribers(); + // } + + /// Arrête tous les subscribers + pub fn shutdown(&self) { + self.raw.shutdown(); + self.encoded.shutdown(); + } +} + +impl Default for AudioInputBus { + fn default() -> Self { + Self::new() + } +} + +impl Clone for AudioInputBus { + fn clone(&self) -> Self { + Self { + raw: self.raw.clone(), + encoded: self.encoded.clone(), + } + } +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// 🔊 AudioOutputBus - Pipeline de sortie (Réseau → Speakers) +// ═══════════════════════════════════════════════════════════════════════════════ + +/// Bus pour la chaîne de sortie audio (réseau → décodage → speakers) +pub struct AudioOutputBus { + /// Données encodées reçues du réseau + pub encoded: EncodedAudioEventBus, + + /// Données décodées (prêtes pour les speakers) + pub raw: RawAudioEventBus, +} + +impl AudioOutputBus { + /// Crée un nouveau AudioOutputBus + pub fn new() -> Self { + Self { + encoded: EncodedAudioEventBus::new(), + raw: RawAudioEventBus::new(), + } + } + + /// Crée un AudioOutputBus avec capacité pré-allouée + pub fn with_capacity(capacity: usize) -> Self { + Self { + encoded: EncodedAudioEventBus::with_capacity(capacity), + raw: RawAudioEventBus::with_capacity(capacity), + } + } + + /// S'abonne aux données encodées (pour le décodeur) + pub fn subscribe_encoded(&self) -> Receiver { + self.encoded.subscribe() + } + + /// S'abonne aux données raw (pour les speakers) + pub fn subscribe_raw(&self) -> Receiver { + self.raw.subscribe() + } + + /// Diffuse des données encodées (depuis UdpClient) + pub fn broadcast_encoded(&self, data: EncodedAudioData) { + self.encoded.broadcast(data); + } + + /// Diffuse des données raw (depuis AudioOpusDecoder) + pub fn broadcast_raw(&self, data: RawAudioData) { + self.raw.broadcast(data); + } + + /// Nettoie tous les subscribers morts + // pub fn cleanup(&self) { + // self.encoded.cleanup_dead_subscribers(); + // self.raw.cleanup_dead_subscribers(); + // } + + /// Arrête tous les subscribers + pub fn shutdown(&self) { + self.encoded.shutdown(); + self.raw.shutdown(); + } +} + +impl Default for AudioOutputBus { + fn default() -> Self { + Self::new() + } +} + +impl Clone for AudioOutputBus { + fn clone(&self) -> Self { + Self { + encoded: self.encoded.clone(), + raw: self.raw.clone(), + } + } +} \ No newline at end of file diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 24e6965..2aad96c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,2 +1,3 @@ pub mod ringbuf; -pub mod real_time_event; \ No newline at end of file +pub mod real_time_event; +pub mod event_bus; \ No newline at end of file