first commit
This commit is contained in:
@@ -1,24 +1,33 @@
|
||||
// Logique passive de l'application avec les resources globales
|
||||
|
||||
use std::sync::Arc;
|
||||
use crate::audio::capture::{AudioCapture, Microphone};
|
||||
|
||||
pub struct Context {
|
||||
// Audio related
|
||||
host: Arc<cpal::Host>,
|
||||
microphone: Microphone,
|
||||
audio_capture: AudioCapture
|
||||
|
||||
// todo : mettre tous les contextes de l'app nécessaire au bon fonctionnement de celle ci, udpclient, catpure audio, lecture audio ....
|
||||
// L'idéal étant que tout soit chargé "statiquement" dans le contexte de l'application pour être "relié" dans le runtime
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub fn new() -> Self {
|
||||
pub fn new() -> Result<Self, String> {
|
||||
let host = Arc::new(cpal::default_host());
|
||||
let microphone = Microphone::default(&host)?;
|
||||
let audio_capture = AudioCapture::new(microphone.clone());
|
||||
|
||||
// Arc::new(UdpClient::new())
|
||||
// Arc::new(AudioCapture::new())
|
||||
// etc ...
|
||||
|
||||
Self {
|
||||
host
|
||||
}
|
||||
Ok(Self {
|
||||
host,
|
||||
microphone,
|
||||
audio_capture
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_host(&self) -> Arc<cpal::Host> {
|
||||
|
||||
@@ -3,10 +3,18 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use cpal::{BufferSize, Device, Host, Stream, StreamConfig, SupportedStreamConfig, SampleRate, InputCallbackInfo};
|
||||
use cpal::traits::{DeviceTrait, HostTrait};
|
||||
use crate::utils::ringbuf;
|
||||
use crate::utils::ringbuf::{ringbuf, RingBuffer};
|
||||
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
|
||||
use crate::audio::opus::{AudioOpus, AudioOpusEncoder};
|
||||
use crate::utils::event_bus::{AudioInputBus};
|
||||
use crate::utils::ringbuf::{RingBufReader, RingBuffer};
|
||||
|
||||
pub enum AudioCaptureError {
|
||||
DeviceNotFound,
|
||||
StreamError,
|
||||
EncoderError,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Microphone {
|
||||
device: Device,
|
||||
}
|
||||
@@ -55,7 +63,7 @@ impl Microphone {
|
||||
}
|
||||
}
|
||||
|
||||
struct AudioCapture {
|
||||
pub struct AudioCapture {
|
||||
microphone: Microphone,
|
||||
|
||||
// catpure related
|
||||
@@ -63,16 +71,20 @@ struct AudioCapture {
|
||||
stream: Option<Stream>,
|
||||
worker: Option<JoinHandle<()>>,
|
||||
running: Arc<AtomicBool>,
|
||||
|
||||
// exposed ringbuf
|
||||
event_bus: AudioInputBus,
|
||||
}
|
||||
|
||||
impl AudioCapture {
|
||||
pub fn new(microphone: Microphone) -> Self {
|
||||
Self {
|
||||
microphone,
|
||||
ringbuf: RingBuffer::new(48000),
|
||||
ringbuf: RingBuffer::<i16>::new(48000),
|
||||
stream: None,
|
||||
worker: None,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
event_bus: AudioInputBus::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,47 +113,72 @@ impl AudioCapture {
|
||||
|
||||
writer.push_slice_overwrite(data);
|
||||
})?;
|
||||
|
||||
|
||||
// Audio processing worker
|
||||
let worker_running = self.running.clone();
|
||||
let worker_event_bus = self.event_bus.clone();
|
||||
let input_config = self.microphone.get_input_config()?;
|
||||
let opus = AudioOpus::new(input_config.sample_rate().0, input_config.channels(), "voip");
|
||||
let encoder = opus.create_encoder()?;
|
||||
let worker = thread::spawn(move || {
|
||||
let mut frame = [0i16; 960]; // Taille de Frame souhaité
|
||||
while worker_running.load(Ordering::Relaxed){
|
||||
let read = reader.pop_slice_blocking(&mut frame);
|
||||
// ✅ Check simple après réveil
|
||||
if !worker_running.load(Ordering::Relaxed) {
|
||||
println!("🛑 Arrêt demandé");
|
||||
break;
|
||||
}
|
||||
}
|
||||
println!("Fermeture du thread de capture.")
|
||||
Self::audio_processing(reader, worker_running, encoder, worker_event_bus);
|
||||
});
|
||||
|
||||
|
||||
stream.play().map_err(|e| format!("Erreur de lancement du stream: {e}"))?;
|
||||
self.stream = Some(stream);
|
||||
self.worker = Some(worker);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn audio_processing(mut reader: RingBufReader<i16>, worker_running: Arc<AtomicBool>, mut encoder: AudioOpusEncoder, event_bus: AudioInputBus) {
|
||||
let mut frame = [0i16; 960];
|
||||
|
||||
while worker_running.load(Ordering::Relaxed){
|
||||
let _ = reader.pop_slice_blocking(&mut frame);
|
||||
|
||||
// ✅ Check simple après réveil
|
||||
if !worker_running.load(Ordering::Relaxed) {
|
||||
println!("🛑 Arrêt demandé");
|
||||
break;
|
||||
}
|
||||
|
||||
let raw_data = Arc::new(frame.to_vec());
|
||||
event_bus.broadcast_raw(raw_data.clone());
|
||||
|
||||
match encoder.encode(&frame) {
|
||||
Ok(encoded_data) => {
|
||||
event_bus.encoded.broadcast(Arc::new(encoded_data));
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Erreur encoding: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Fermeture de l'audio processing.")
|
||||
}
|
||||
|
||||
pub fn stop_capture(&mut self){
|
||||
println!("🛑 Arrêt en cours...");
|
||||
|
||||
// ✅ 1. Signal d'arrêt
|
||||
// 1️⃣ Signal d'arrêt global
|
||||
self.running.store(false, Ordering::Relaxed);
|
||||
|
||||
// ✅ 2. RÉVEIL FORCÉ du worker !
|
||||
// 2️⃣ ARRÊT IMMÉDIAT du callback micro
|
||||
self.stream = None; // ✅ Plus de nouvelles données !
|
||||
|
||||
// 3️⃣ RÉVEIL du worker qui attend
|
||||
self.ringbuf.force_wake_up();
|
||||
|
||||
// ✅ 3. Attente de terminaison (maintenant ça marche !)
|
||||
// 4️⃣ Attente propre du worker
|
||||
if let Some(handle) = self.worker.take() {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
self.stream = None;
|
||||
|
||||
// todo ajouter une méthode pour vider le ringbuf
|
||||
// 5️⃣ Nettoyage
|
||||
self.ringbuf.clear();
|
||||
|
||||
println!("🎤 Capture audio arrêtée.");
|
||||
println!(" Capture audio arrêtée.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
use opus::{Application, Channels, Decoder, Encoder};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AudioOpus{
|
||||
sample_rate: u32,
|
||||
channels: u16,
|
||||
application: Application
|
||||
}
|
||||
|
||||
impl AudioOpus {
|
||||
pub fn new(sample_rate: u32, channels: u16, application: &str) -> Self {
|
||||
let application = match application {
|
||||
"voip" => Application::Voip,
|
||||
"audio" => Application::Audio,
|
||||
"lowdelay" => Application::LowDelay,
|
||||
_ => Application::Voip,
|
||||
};
|
||||
Self{sample_rate, channels, application}
|
||||
}
|
||||
|
||||
pub fn create_encoder(&self) -> Result<AudioOpusEncoder, String> {
|
||||
AudioOpusEncoder::new(self.clone())
|
||||
}
|
||||
|
||||
pub fn create_decoder(&self) -> Result<AudioOpusDecoder, String> {
|
||||
AudioOpusDecoder::new(self.clone())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct AudioOpusEncoder{
|
||||
audio_opus: AudioOpus,
|
||||
encoder: opus::Encoder,
|
||||
}
|
||||
|
||||
impl AudioOpusEncoder {
|
||||
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
|
||||
let opus_channel = match audio_opus.channels {
|
||||
1 => Channels::Mono,
|
||||
2 => Channels::Stereo,
|
||||
_ => Channels::Mono,
|
||||
};
|
||||
let mut encoder = Encoder::new(audio_opus.sample_rate, opus_channel, audio_opus.application)
|
||||
.map_err(|e| format!("Échec de création de l'encodeur: {:?}", e))?;
|
||||
|
||||
match audio_opus.application {
|
||||
Application::Voip => {
|
||||
// Paramètres optimaux pour VoIP: bonne qualité vocale, CPU modéré
|
||||
let _ = encoder.set_bitrate(opus::Bitrate::Bits(24000)); // 24kbps est bon pour la voix
|
||||
let _ = encoder.set_vbr(true); // Variable bitrate économise du CPU
|
||||
let _ = encoder.set_vbr_constraint(false); // Sans contrainte stricte de débit
|
||||
// Pas de set_complexity (non supporté par la crate)
|
||||
},
|
||||
Application::Audio => {
|
||||
// Musique: priorité à la qualité
|
||||
let _ = encoder.set_bitrate(opus::Bitrate::Bits(64000));
|
||||
let _ = encoder.set_vbr(true);
|
||||
},
|
||||
Application::LowDelay => {
|
||||
// Priorité à la latence et l'efficacité CPU
|
||||
let _ = encoder.set_bitrate(opus::Bitrate::Bits(18000));
|
||||
let _ = encoder.set_vbr(true);
|
||||
},
|
||||
}
|
||||
Ok(Self{audio_opus, encoder})
|
||||
}
|
||||
|
||||
pub fn encode(&mut self, frames: &[i16]) -> Result<Vec<u8>, String> {
|
||||
let mut output = vec![0u8; 1276]; // 1276 octets (la vraie worst-case recommandée par Opus).
|
||||
let len = self.encoder.encode(frames, output.as_mut_slice())
|
||||
.map_err(|e| format!("Erreur encodage: {:?}", e))?;
|
||||
output.truncate(len);
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
// 🔄 Approche avec buffer réutilisable (encore plus optimal)
|
||||
fn encode_reuse(&mut self, frames: &[i16], output: &mut Vec<u8>) -> Result<usize, String> {
|
||||
output.clear();
|
||||
output.resize(1276, 0);
|
||||
let len = self.encoder.encode(frames, output.as_mut_slice()).unwrap();
|
||||
output.truncate(len);
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AudioOpusDecoder{
|
||||
audio_opus: AudioOpus,
|
||||
decoder: opus::Decoder,
|
||||
}
|
||||
|
||||
impl AudioOpusDecoder {
|
||||
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
|
||||
let opus_channel = match audio_opus.channels {
|
||||
1 => Channels::Mono,
|
||||
2 => Channels::Stereo,
|
||||
_ => Channels::Mono,
|
||||
};
|
||||
|
||||
let decoder = Decoder::new(audio_opus.sample_rate, opus_channel)
|
||||
.map_err(|e| format!("Échec de création du décodeur: {:?}", e))?;;
|
||||
Ok(Self{audio_opus, decoder})
|
||||
}
|
||||
|
||||
pub fn decode(&mut self, frames: &[u8]) -> Result<Vec<i16>, String> {
|
||||
let mut output = vec![0i16; 5760];
|
||||
let len = self.decoder.decode(frames, output.as_mut_slice(), false).map_err(|e| format!("Erreur décodage: {:?}", e))?;
|
||||
output.truncate(len);
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::thread::JoinHandle;
|
||||
use cpal::{Device, Host, OutputCallbackInfo, Stream, StreamConfig, SupportedStreamConfig};
|
||||
use cpal::traits::{DeviceTrait, HostTrait};
|
||||
use crate::utils::event_bus::{AudioOutputBus, EventBus};
|
||||
use crate::utils::ringbuf::RingBuffer;
|
||||
|
||||
pub struct Speaker {
|
||||
device: Device,
|
||||
}
|
||||
|
||||
impl Speaker {
|
||||
pub fn new(device: Device) -> Self {
|
||||
Self{
|
||||
device
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default(host: &Host) -> Result<Self, String> {
|
||||
let device = host.default_output_device().expect("No output device available");
|
||||
Ok(Self::new(device))
|
||||
}
|
||||
|
||||
pub fn get_output_config(&self) -> Result<SupportedStreamConfig, String> {
|
||||
self.device.default_output_config().map_err(|e| format!("Erreur config : {e}"))
|
||||
}
|
||||
|
||||
pub fn get_stream_config(&self) -> Result<StreamConfig, String> {
|
||||
let config = self.get_output_config()?;
|
||||
let mut stream_config: StreamConfig = config.into();
|
||||
// todo : voir si on doit/peu changer des paramètres
|
||||
Ok(stream_config)
|
||||
}
|
||||
|
||||
pub fn build_stream<F>(&self, callback: F) -> Result<Stream, String>
|
||||
where
|
||||
F: FnMut(&mut [i16], &OutputCallbackInfo) + Send + 'static,
|
||||
{
|
||||
let config = self.get_stream_config()?;
|
||||
|
||||
let stream = self.device.build_output_stream(
|
||||
&config,
|
||||
callback,
|
||||
|err| println!("Erreur stream: {err}"),
|
||||
None
|
||||
).map_err(|e| format!("Erreur création stream : {e}"))?;
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
}
|
||||
|
||||
struct AudioPlayback {
|
||||
speaker: Speaker,
|
||||
|
||||
ringbuf: RingBuffer<i16>,
|
||||
stream: Option<Stream>,
|
||||
worker: Option<JoinHandle<()>>,
|
||||
running: Arc<AtomicBool>,
|
||||
|
||||
event_bus: AudioOutputBus,
|
||||
}
|
||||
|
||||
impl AudioPlayback {
|
||||
pub fn new(speaker: Speaker) -> Self {
|
||||
Self{
|
||||
speaker,
|
||||
ringbuf: RingBuffer::new(1024),
|
||||
stream: None,
|
||||
worker: None,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
event_bus: AudioOutputBus::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, IntoPrimitive, TryFromPrimitive)]
|
||||
pub enum UDPMessageType {
|
||||
Ping = 0,
|
||||
Audio = 1,
|
||||
}
|
||||
|
||||
impl UDPMessageType {
|
||||
pub fn peek_message_type(data: &[u8]) -> Option<Self> {
|
||||
if data.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Self::try_from(data[0]).ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// Message envoyé au serveur depuis le client
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum MessageCall {
|
||||
// 0[u8]uuid4[u8;16/uuid4]
|
||||
Ping {message_id: [u8; 16]}, // (0)
|
||||
// 1[u8]4000[u16]data[u8; unlimited]
|
||||
Audio {sequence: u16, data: Vec<u8>},
|
||||
}
|
||||
|
||||
/// Message reçu depuis le serveur au client
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum MessageEvent {
|
||||
// 0[u8]uuid4[u8;16/uuid4]
|
||||
Ping {message_id: [u8; 16]},
|
||||
// 1[u8]4000[u32]data[u8; unlimited]
|
||||
Audio {user: [u8; 16],sequence: u32, data: Vec<u8>},
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,325 @@
|
||||
use std::net::{UdpSocket, SocketAddr, ToSocketAddrs};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use kanal::{bounded, Receiver, Sender};
|
||||
|
||||
/// UdpClient = équivalent de "Microphone" (bas niveau, gère juste la socket)
|
||||
pub struct UdpClient {
|
||||
// Configuration de base
|
||||
server_addr: Option<SocketAddr>,
|
||||
socket: Option<UdpSocket>,
|
||||
|
||||
// État simple
|
||||
active: bool,
|
||||
}
|
||||
|
||||
impl UdpClient {
|
||||
/// Crée un nouveau client UDP (non connecté)
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
server_addr: None,
|
||||
socket: None,
|
||||
active: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Connecte au serveur
|
||||
pub fn connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<(), String> {
|
||||
if self.active {
|
||||
return Err("Client déjà actif".to_string());
|
||||
}
|
||||
|
||||
// Résoudre l'adresse
|
||||
let server_addr = addr
|
||||
.to_socket_addrs()
|
||||
.map_err(|e| format!("Résolution échouée: {}", e))?
|
||||
.next()
|
||||
.ok_or_else(|| "Aucune adresse trouvée".to_string())?;
|
||||
|
||||
// Créer la socket
|
||||
let socket = UdpSocket::bind("0.0.0.0:0")
|
||||
.map_err(|e| format!("Erreur socket: {}", e))?;
|
||||
|
||||
socket.set_read_timeout(Some(Duration::from_millis(100)))
|
||||
.map_err(|e| format!("Erreur timeout: {}", e))?;
|
||||
|
||||
self.server_addr = Some(server_addr);
|
||||
self.socket = Some(socket);
|
||||
|
||||
println!(" UdpClient connecté à {}", server_addr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Démarre avec un callback (équivalent de microphone.start(callback))
|
||||
// pub fn start<F>(&mut self, data_callback: F) -> Result<(), String>
|
||||
// where
|
||||
// F: FnMut(&[u8], SocketAddr) + Send + 'static
|
||||
// {
|
||||
// if self.active {
|
||||
// return Err("Client déjà actif".to_string());
|
||||
// }
|
||||
//
|
||||
// if self.socket.is_none() {
|
||||
// return Err("Client non connecté".to_string());
|
||||
// }
|
||||
//
|
||||
// // Ici on utiliserait le callback pour le receiver
|
||||
// // (implémentation simplifiée pour l'exemple)
|
||||
//
|
||||
// self.active = true;
|
||||
// println!(" UdpClient démarré avec callback");
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
/// Arrête le client
|
||||
pub fn stop(&mut self) {
|
||||
self.active = false;
|
||||
println!(" UdpClient arrêté");
|
||||
}
|
||||
|
||||
/// Déconnecte (ferme socket)
|
||||
pub fn disconnect(&mut self) {
|
||||
self.stop();
|
||||
self.socket = None;
|
||||
self.server_addr = None;
|
||||
println!(" UdpClient déconnecté");
|
||||
}
|
||||
|
||||
/// Change de serveur
|
||||
pub fn change_server<A: ToSocketAddrs>(&mut self, new_addr: A) -> Result<(), String> {
|
||||
let was_active = self.active;
|
||||
|
||||
// Arrêter et déconnecter
|
||||
self.disconnect();
|
||||
|
||||
// Reconnecter au nouveau serveur
|
||||
self.connect(new_addr)?;
|
||||
|
||||
// Redémarrer si c'était actif (nécessiterait de stocker le callback)
|
||||
// Note: comme pour AudioCapture, c'est une limitation de cette approche
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Envoie des données (méthode simple)
|
||||
pub fn send_to(&self, data: &[u8]) -> Result<(), String> {
|
||||
if !self.active {
|
||||
return Err("Client non actif".to_string());
|
||||
}
|
||||
|
||||
let socket = self.socket.as_ref().ok_or("Socket fermée")?;
|
||||
let addr = self.server_addr.ok_or("Adresse non définie")?;
|
||||
|
||||
socket.send_to(data, addr)
|
||||
.map_err(|e| format!("Erreur envoi: {}", e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Getters
|
||||
pub fn is_active(&self) -> bool { self.active }
|
||||
pub fn server_address(&self) -> Option<SocketAddr> { self.server_addr }
|
||||
pub fn local_address(&self) -> Option<SocketAddr> {
|
||||
self.socket.as_ref()?.local_addr().ok()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct NetworkSession {
|
||||
udp_client: UdpClient,
|
||||
|
||||
// Configuration
|
||||
keepalive_interval: u64, // secondes
|
||||
|
||||
// EventBus public (comme AudioCapture)
|
||||
// pub event_bus: NetworkEventBus,
|
||||
|
||||
// État des threads
|
||||
running: Arc<AtomicBool>,
|
||||
threads: Vec<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl NetworkSession {
|
||||
/// Crée une nouvelle session
|
||||
pub fn new(udp_client: UdpClient) -> Result<Self, String> {
|
||||
Ok(Self {
|
||||
udp_client,
|
||||
keepalive_interval: 10, // 10 secondes par défaut
|
||||
// event_bus: NetworkEventBus::new(),
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
threads: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Démarre la session
|
||||
pub fn start_session(&mut self) -> Result<(), String> {
|
||||
if self.running.load(Ordering::Relaxed) {
|
||||
return Err("Session déjà démarrée".to_string());
|
||||
}
|
||||
|
||||
// Cloner ce qui est nécessaire pour les threads
|
||||
let server_addr = self.udp_client.server_address()
|
||||
.ok_or("Client non connecté")?;
|
||||
|
||||
// Créer les channels
|
||||
let (send_tx, send_rx) = bounded(2000);
|
||||
// let (recv_tx, recv_rx) = bounded(2000);
|
||||
|
||||
// Cloner la socket
|
||||
let socket_send = self.udp_client.socket.as_ref()
|
||||
.ok_or("Socket non disponible")?
|
||||
.try_clone()
|
||||
.map_err(|e| format!("Erreur clone socket: {}", e))?;
|
||||
|
||||
let socket_recv = self.udp_client.socket.as_ref()
|
||||
.unwrap()
|
||||
.try_clone()
|
||||
.map_err(|e| format!("Erreur clone socket: {}", e))?;
|
||||
|
||||
// Démarrer les threads (comme dans AudioCapture)
|
||||
let running = Arc::clone(&self.running);
|
||||
running.store(true, Ordering::Relaxed);
|
||||
|
||||
// Thread sender
|
||||
let running_send = Arc::clone(&running);
|
||||
let sender_handle = thread::spawn(move || {
|
||||
Self::sender_worker(socket_send, server_addr, send_rx, running_send);
|
||||
});
|
||||
|
||||
// Thread receiver
|
||||
let running_recv = Arc::clone(&running);
|
||||
let receiver_handle = thread::spawn(move || {
|
||||
Self::receiver_worker(socket_recv, running_recv);
|
||||
});
|
||||
|
||||
// Thread keepalive
|
||||
let running_keepalive = Arc::clone(&running);
|
||||
let keepalive_sender = send_tx.clone();
|
||||
let keepalive_interval = self.keepalive_interval;
|
||||
let keepalive_handle = thread::spawn(move || {
|
||||
Self::keepalive_worker(keepalive_sender, keepalive_interval, running_keepalive);
|
||||
});
|
||||
|
||||
// Stocker les handles et le sender
|
||||
self.threads.push(sender_handle);
|
||||
self.threads.push(receiver_handle);
|
||||
self.threads.push(keepalive_handle);
|
||||
|
||||
println!(" Session réseau démarrée");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Arrête la session
|
||||
pub fn stop(&mut self) {
|
||||
if !self.running.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Arrêter les threads
|
||||
self.running.store(false, Ordering::Relaxed);
|
||||
|
||||
// Attendre la fin
|
||||
for handle in self.threads.drain(..) {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
println!(" Session réseau arrêtée");
|
||||
}
|
||||
|
||||
/// Change de serveur (comme change_device)
|
||||
pub fn change_server<A: std::net::ToSocketAddrs>(&mut self, new_addr: A) -> Result<(), String> {
|
||||
let was_running = self.running.load(Ordering::Relaxed);
|
||||
|
||||
// Arrêter la session
|
||||
self.stop();
|
||||
|
||||
// Changer de serveur sur le client UDP
|
||||
self.udp_client.change_server(new_addr)?;
|
||||
|
||||
// Redémarrer si c'était actif
|
||||
if was_running {
|
||||
self.start_session()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// === Workers (comme dans AudioCapture) ===
|
||||
|
||||
fn sender_worker(
|
||||
socket: UdpSocket,
|
||||
server_addr: SocketAddr,
|
||||
receiver: Receiver<Vec<u8>>,
|
||||
running: Arc<AtomicBool>,
|
||||
) {
|
||||
println!(" Network sender worker démarré");
|
||||
|
||||
// SANS timeout - plus simple et efficace
|
||||
while let Ok(data) = receiver.recv() {
|
||||
if !running.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(e) = socket.send_to(&data, server_addr) {
|
||||
eprintln!("❌ Erreur envoi: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
println!(" Network sender worker arrêté");
|
||||
|
||||
}
|
||||
|
||||
fn receiver_worker(
|
||||
socket: UdpSocket,
|
||||
running: Arc<AtomicBool>,
|
||||
) {
|
||||
let mut buffer = [0u8; 1500];
|
||||
println!(" Network receiver worker démarré");
|
||||
|
||||
while running.load(Ordering::Relaxed) {
|
||||
match socket.recv_from(&mut buffer) {
|
||||
Ok((size, addr)) => {
|
||||
let data = Arc::new(buffer[..size].to_vec());
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::TimedOut => continue,
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
println!(" Network receiver worker arrêté");
|
||||
}
|
||||
|
||||
fn keepalive_worker(
|
||||
sender: Sender<Vec<u8>>,
|
||||
interval_secs: u64,
|
||||
running: Arc<AtomicBool>,
|
||||
) {
|
||||
println!(" Keepalive worker démarré");
|
||||
|
||||
while running.load(Ordering::Relaxed) {
|
||||
thread::sleep(std::time::Duration::from_secs(interval_secs));
|
||||
|
||||
if !running.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Envoyer keepalive (MessageType::Keepalive = 0)
|
||||
let keepalive = 0u16.to_le_bytes().to_vec();
|
||||
if sender.try_send(keepalive).is_err() {
|
||||
break; // Queue fermée
|
||||
}
|
||||
|
||||
println!(" Keepalive envoyé");
|
||||
}
|
||||
|
||||
println!(" Keepalive worker arrêté");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for NetworkSession {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
312
src/utils/event_bus.rs
Normal file
312
src/utils/event_bus.rs
Normal file
@@ -0,0 +1,312 @@
|
||||
|
||||
//! # EventBus - Système de publication/souscription générique
|
||||
//!
|
||||
//! Un EventBus performant utilisant parking_lot::RwLock pour la distribution
|
||||
//! de données vers plusieurs consumers de manière zero-copy.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use kanal::{unbounded, Receiver, Sender};
|
||||
use parking_lot::RwLock;
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
// 🎯 Types aliases pour une meilleure lisibilité
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
/// Un subscriber générique pour un type de données T
|
||||
#[derive(Clone)]
|
||||
pub struct Subscriber<T> {
|
||||
pub sender: Sender<T>,
|
||||
pub receiver: Receiver<T>,
|
||||
pub active: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T> Subscriber<T> {
|
||||
/// Crée un nouveau subscriber
|
||||
pub fn new() -> Self {
|
||||
let (sender, receiver) = unbounded();
|
||||
Self {
|
||||
sender,
|
||||
receiver: receiver.clone(),
|
||||
active: Arc::new(AtomicBool::new(true)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Vérifie si le subscriber est encore actif
|
||||
pub fn is_active(&self) -> bool {
|
||||
self.active.load(Ordering::Relaxed) && !self.sender.is_closed()
|
||||
}
|
||||
|
||||
/// Désactive le subscriber
|
||||
pub fn deactivate(&self) {
|
||||
self.active.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
// 🏗️ EventBus générique
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
/// EventBus générique pour distribuer des données de type T vers plusieurs consumers
|
||||
pub struct EventBus<T> {
|
||||
subscribers: Arc<RwLock<Vec<Subscriber<T>>>>,
|
||||
}
|
||||
|
||||
impl<T> EventBus<T>
|
||||
where
|
||||
T: Clone + Send + 'static
|
||||
{
|
||||
/// Crée un nouveau EventBus vide
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
subscribers: Arc::new(RwLock::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Crée un EventBus avec une capacité pré-allouée
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
subscribers: Arc::new(RwLock::new(Vec::with_capacity(capacity))),
|
||||
}
|
||||
}
|
||||
|
||||
/// S'abonne au bus et retourne un Receiver pour recevoir les données
|
||||
pub fn subscribe(&self) -> Receiver<T> {
|
||||
let subscriber = Subscriber::new();
|
||||
let receiver = subscriber.receiver.clone();
|
||||
|
||||
self.subscribers.write().push(subscriber);
|
||||
receiver
|
||||
}
|
||||
|
||||
/// Diffuse des données à tous les subscribers actifs
|
||||
pub fn broadcast(&self, data: T) {
|
||||
let subscribers = self.subscribers.read();
|
||||
|
||||
for subscriber in subscribers.iter() {
|
||||
if subscriber.is_active() {
|
||||
// try_send pour éviter les blocages
|
||||
let _ = subscriber.sender.try_send(data.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: À implémenter si besoin futur d'unsubscribe dynamique
|
||||
/// Nettoie les subscribers fermés ou inactifs
|
||||
// pub fn cleanup_dead_subscribers(&self) {
|
||||
// let mut subscribers = self.subscribers.write();
|
||||
// subscribers.retain(|subscriber| subscriber.is_active());
|
||||
// }
|
||||
|
||||
/// Retourne le nombre de subscribers actifs
|
||||
pub fn active_subscriber_count(&self) -> usize {
|
||||
let subscribers = self.subscribers.read();
|
||||
subscribers.iter().filter(|s| s.is_active()).count()
|
||||
}
|
||||
|
||||
/// Retourne le nombre total de subscribers (actifs + inactifs)
|
||||
pub fn total_subscriber_count(&self) -> usize {
|
||||
self.subscribers.read().len()
|
||||
}
|
||||
|
||||
/// Désactive tous les subscribers
|
||||
pub fn shutdown(&self) {
|
||||
let subscribers = self.subscribers.read();
|
||||
for subscriber in subscribers.iter() {
|
||||
subscriber.deactivate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for EventBus<T>
|
||||
where
|
||||
T: Clone + Send + 'static
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for EventBus<T>
|
||||
where
|
||||
T: Clone + Send + 'static
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
subscribers: Arc::clone(&self.subscribers),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
// 🎵 Types spécialisés pour l'audio
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
/// Type pour les données audio brutes (samples i16)
|
||||
pub type RawAudioData = Arc<Vec<i16>>;
|
||||
|
||||
/// Type pour les données audio encodées (opus bytes)
|
||||
pub type EncodedAudioData = Arc<Vec<u8>>;
|
||||
|
||||
/// EventBus spécialisé pour les données audio brutes
|
||||
pub type RawAudioEventBus = EventBus<RawAudioData>;
|
||||
|
||||
/// EventBus spécialisé pour les données audio encodées
|
||||
pub type EncodedAudioEventBus = EventBus<EncodedAudioData>;
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
// 🎤 AudioInputBus - Pipeline d'entrée (Micro → Réseau)
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
/// Bus pour la chaîne d'entrée audio (micro → encodage → réseau)
|
||||
pub struct AudioInputBus {
|
||||
/// Données brutes du microphone
|
||||
pub raw: RawAudioEventBus,
|
||||
|
||||
/// Données encodées (prêtes pour l'envoi réseau)
|
||||
pub encoded: EncodedAudioEventBus,
|
||||
}
|
||||
|
||||
impl AudioInputBus {
|
||||
/// Crée un nouveau AudioInputBus
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
raw: RawAudioEventBus::new(),
|
||||
encoded: EncodedAudioEventBus::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Crée un AudioInputBus avec capacité pré-allouée
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
raw: RawAudioEventBus::with_capacity(capacity),
|
||||
encoded: EncodedAudioEventBus::with_capacity(capacity),
|
||||
}
|
||||
}
|
||||
|
||||
/// S'abonne aux données raw (pour l'encoder)
|
||||
pub fn subscribe_raw(&self) -> Receiver<RawAudioData> {
|
||||
self.raw.subscribe()
|
||||
}
|
||||
|
||||
/// S'abonne aux données encodées (pour le réseau)
|
||||
pub fn subscribe_encoded(&self) -> Receiver<EncodedAudioData> {
|
||||
self.encoded.subscribe()
|
||||
}
|
||||
|
||||
/// Diffuse des données raw (depuis AudioCapture)
|
||||
pub fn broadcast_raw(&self, data: RawAudioData) {
|
||||
self.raw.broadcast(data);
|
||||
}
|
||||
|
||||
/// Diffuse des données encodées (depuis AudioOpusEncoder)
|
||||
pub fn broadcast_encoded(&self, data: EncodedAudioData) {
|
||||
self.encoded.broadcast(data);
|
||||
}
|
||||
|
||||
// TODO: À implémenter si besoin futur d'unsubscribe dynamique
|
||||
/// Nettoie tous les subscribers morts
|
||||
// pub fn cleanup(&self) {
|
||||
// self.raw.cleanup_dead_subscribers();
|
||||
// self.encoded.cleanup_dead_subscribers();
|
||||
// }
|
||||
|
||||
/// Arrête tous les subscribers
|
||||
pub fn shutdown(&self) {
|
||||
self.raw.shutdown();
|
||||
self.encoded.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AudioInputBus {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for AudioInputBus {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
raw: self.raw.clone(),
|
||||
encoded: self.encoded.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
// 🔊 AudioOutputBus - Pipeline de sortie (Réseau → Speakers)
|
||||
// ═══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
/// Bus pour la chaîne de sortie audio (réseau → décodage → speakers)
|
||||
pub struct AudioOutputBus {
|
||||
/// Données encodées reçues du réseau
|
||||
pub encoded: EncodedAudioEventBus,
|
||||
|
||||
/// Données décodées (prêtes pour les speakers)
|
||||
pub raw: RawAudioEventBus,
|
||||
}
|
||||
|
||||
impl AudioOutputBus {
|
||||
/// Crée un nouveau AudioOutputBus
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
encoded: EncodedAudioEventBus::new(),
|
||||
raw: RawAudioEventBus::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Crée un AudioOutputBus avec capacité pré-allouée
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
encoded: EncodedAudioEventBus::with_capacity(capacity),
|
||||
raw: RawAudioEventBus::with_capacity(capacity),
|
||||
}
|
||||
}
|
||||
|
||||
/// S'abonne aux données encodées (pour le décodeur)
|
||||
pub fn subscribe_encoded(&self) -> Receiver<EncodedAudioData> {
|
||||
self.encoded.subscribe()
|
||||
}
|
||||
|
||||
/// S'abonne aux données raw (pour les speakers)
|
||||
pub fn subscribe_raw(&self) -> Receiver<RawAudioData> {
|
||||
self.raw.subscribe()
|
||||
}
|
||||
|
||||
/// Diffuse des données encodées (depuis UdpClient)
|
||||
pub fn broadcast_encoded(&self, data: EncodedAudioData) {
|
||||
self.encoded.broadcast(data);
|
||||
}
|
||||
|
||||
/// Diffuse des données raw (depuis AudioOpusDecoder)
|
||||
pub fn broadcast_raw(&self, data: RawAudioData) {
|
||||
self.raw.broadcast(data);
|
||||
}
|
||||
|
||||
/// Nettoie tous les subscribers morts
|
||||
// pub fn cleanup(&self) {
|
||||
// self.encoded.cleanup_dead_subscribers();
|
||||
// self.raw.cleanup_dead_subscribers();
|
||||
// }
|
||||
|
||||
/// Arrête tous les subscribers
|
||||
pub fn shutdown(&self) {
|
||||
self.encoded.shutdown();
|
||||
self.raw.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AudioOutputBus {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for AudioOutputBus {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
encoded: self.encoded.clone(),
|
||||
raw: self.raw.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod ringbuf;
|
||||
pub mod real_time_event;
|
||||
pub mod real_time_event;
|
||||
pub mod event_bus;
|
||||
Reference in New Issue
Block a user