From d7a7af9eb41f3ac10e31624ed41861863106e6fd Mon Sep 17 00:00:00 2001 From: Nell Date: Sun, 20 Jul 2025 04:17:44 +0200 Subject: [PATCH] init --- src-tauri/src/app/ox_speak_app.rs | 17 +- src-tauri/src/core/mixer.rs | 227 +++++--------------------- src-tauri/src/core/mixer_push.rs | 229 +++++++++++++++++++++++++++ src-tauri/src/core/playback.rs | 49 ++++-- src-tauri/src/domain/audio_client.rs | 28 +++- src-tauri/src/domain/event.rs | 4 +- src-tauri/src/runtime/dispatcher.rs | 16 +- 7 files changed, 359 insertions(+), 211 deletions(-) create mode 100644 src-tauri/src/core/mixer_push.rs diff --git a/src-tauri/src/app/ox_speak_app.rs b/src-tauri/src/app/ox_speak_app.rs index 5d2f4f6..d00a78b 100644 --- a/src-tauri/src/app/ox_speak_app.rs +++ b/src-tauri/src/app/ox_speak_app.rs @@ -4,6 +4,8 @@ use tauri::{AppHandle, Emitter, Listener}; use tokio; use tokio::sync::mpsc; use crate::core::capture::AudioCapture; +use crate::core::mixer::AudioMixer; +use crate::core::playback::AudioPlayback; use crate::domain::audio_client::AudioClientManager; use crate::domain::event::{Event, EventBus}; use crate::network::udp::UdpSession; @@ -20,6 +22,8 @@ pub struct OxSpeakApp { // audio audio_capture: AudioCapture, + audio_playback: AudioPlayback, + audio_mixer: AudioMixer, // audio_client audio_client_manager: AudioClientManager, @@ -40,18 +44,21 @@ impl OxSpeakApp { // todo : pour le moment, paramètre par défaut, on verra plus tard pour dynamiser ça println!("Initializing audio capture"); let audio_capture = AudioCapture::default(event_bus.clone()); + println!("Initializing audio client"); + let audio_client_manager = AudioClientManager::new(); + let audio_mixer = AudioMixer::new(audio_client_manager.clone()); + let audio_playback = AudioPlayback::default(event_bus.clone(), audio_mixer.clone()); // UdpSession println!("Initializing UDP session"); let udp_session = UdpSession::new(event_bus.clone()); // UdpClient - println!("Initializing UDP client"); - let audio_client_manager = AudioClientManager::new(); + // Dispatcher - Communication inter-components println!("Initializing event dispatcher"); - let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), audio_client_manager.clone() ,tauri_handle.clone()); + let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), audio_client_manager.clone(), audio_mixer.clone(),tauri_handle.clone()); println!("OxSpeakApp initialization complete"); Self { @@ -61,6 +68,8 @@ impl OxSpeakApp { udp_session, audio_capture, audio_client_manager, + audio_playback, + audio_mixer, tauri_handle, } } @@ -85,6 +94,8 @@ impl OxSpeakApp { // Démarrer l'audio-capture println!("Starting audio capture"); self.audio_capture.start().await; + self.audio_playback.start().await; + println!("OxSpeakApp started successfully"); let _ = self.tick_tasks().await; diff --git a/src-tauri/src/core/mixer.rs b/src-tauri/src/core/mixer.rs index 55477e0..ca4db7b 100644 --- a/src-tauri/src/core/mixer.rs +++ b/src-tauri/src/core/mixer.rs @@ -1,98 +1,50 @@ -use std::sync::{atomic, Arc}; -use std::sync::atomic::{AtomicBool, AtomicUsize}; -use bytes::Bytes; -use tokio::sync::{mpsc, Notify}; -use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer}; +// version pull-based -pub struct Mixer { - // writer: Arc>, - // reader: Arc>, - pre_buffer: Arc, - worker_sender: Option>>, - buffer: Bytes // 1920 frames +use std::sync::Arc; +use arc_swap::ArcSwap; +use crate::domain::audio_client::AudioClientManager; + +#[derive(Clone)] +pub struct AudioMixer { + audio_client_manager: AudioClientManager, + buffer: Arc>> } -impl Mixer { - pub fn new() -> Self { - let (writer, reader) = RingBuffer::::new(1024).split(); +impl AudioMixer { + pub fn new(audio_client_manager: AudioClientManager) -> Self { Self { - // writer: Arc::new(writer), - // reader: Arc::new(reader), - pre_buffer: Arc::new(PreBuffer::new()), - worker_sender: None, + audio_client_manager, + buffer: Arc::new(ArcSwap::from_pointee(Vec::new())), } } + pub fn mix_next_frame(&self, size: usize) { + let mut frames = Vec::>::new(); + // Récupérer les buffers audio des utilisateurs, par défaut, ils sont en mono, donc size / 2 + // convertir en stéréo, donc size * 2 frames + let users_audio = self.audio_client_manager.take_audio_collection(size/2).into_iter() + .map(|audio| AudioMixer::mono_to_stereo(audio)) + .collect::>>(); + frames.extend_from_slice(&users_audio); - // Démarrer le worker de pré-traitement - pub async fn start(&mut self){ - let (sender, mut receiver) = mpsc::unbounded_channel::>(); - - // let (writer, reader) = RingBuffer::::new(1024).split(); - self.worker_sender = Some(sender); - let prebuffer = self.pre_buffer.clone(); - // worker de pré-traitement - tokio::spawn(async move { - while let Some(data) = receiver.recv().await { - // data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms - // si il fait 960, on converti en stéréo - // on écrit dans un buffer de pré-traitement - // si data rempli pas les condition, on ignore - - // on vérifie la taille des données - match data.len() { - 960 => { - // Mono 20ms @ 48kHz - convertir en stéréo - let stereo_data = Self::mono_to_stereo(data); - // push dans un buffer de pré-traitement - prebuffer.push(stereo_data).await; - }, - 1920 => { - // push dans un buffer de pré-traitement - prebuffer.push(data).await; - } - _ => { - println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len()); - } - } - } - }); - } - - // Envoyer des données au pré-traitement - pub async fn write(&self, data: Vec){ - if let Some(sender) = self.worker_sender.as_ref() { - let _ = sender.send(data); - } - } - - // S'occupe de générer la trame qui sera lu dans 20ms - pub async fn mix(&self){ - // récupérer le buffer de pré-traitement, qui sera tableau de Vec (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter) - // le mixer (sum de chaque trame ?) - // le mettre dans un buffer, qui sera accessible par AudioPlayback - // écraser le buffer de pré-traitement - // (libérer le lock) - let frames = self.pre_buffer.read_all().await; + // Récupérer tous les sons des notifications (pas encore dev) let mixed_frame = if frames.is_empty() { - [0i16; 1920] - } else { - Self::mix_frames(&frames) + vec![0i16; size] + }else{ + Self::mix_frames(&frames, size) }; + self.buffer.store(Arc::new(mixed_frame)); } - // Récupérer la trame présente qui est déjà pré-généré par mix - pub async fn read(&self) -> [i16; 1920]{ - let buffer = self.buffer; - // vider le buffer - self.buffer = - buffer + pub fn read(&self, size: usize) -> Vec { + let mut data = (**self.buffer.load()).clone(); + data.resize(size, 0); + data } } -impl Mixer { - // Functions helpers +impl AudioMixer { fn mono_to_stereo(mono_samples: Vec) -> Vec { let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2); @@ -105,121 +57,22 @@ impl Mixer { stereo_data } - // Mixer plusieurs trames - fn mix_frames(frames: &[Vec]) -> [i16; 1920] { - let mut mixed = [0i32; 1920]; + fn mix_frames(frames: &[Vec], size: usize) -> Vec { + let mut mixed = vec![0i32; size]; for frame in frames { for (i, &sample) in frame.iter().enumerate() { - if i < 1920 { + if i < size { mixed[i] += sample as i32; } } } - let mut result = [0i16; 1920]; - let count = frames.len() as i32; - for (i, &sample) in mixed.iter().enumerate() { - result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16; - } + let count = frames.len().max(1) as i32; // éviter la division par zéro + mixed + .into_iter() + .map(|sample| (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16) + .collect() - result } - -} - - -/// Pre buffer -#[derive(Clone)] -struct PreBuffer { - sender: Arc>>, - receiver: Arc>>, - is_being_read: Arc, - read_done_notify: Arc, -} - -impl PreBuffer { - fn new() -> Self { - let (sender, reader) = kanal::unbounded_async::>(); - Self { - sender: Arc::new(sender), - receiver: Arc::new(reader), - is_being_read: Arc::new(AtomicBool::new(false)), - read_done_notify: Arc::new(Notify::new()), - } - } - - async fn push(&self, frame: Vec) { - if self.is_being_read.load(atomic::Ordering::Acquire) { - self.read_done_notify.notified().await; - } - - let _ = self.sender.send(frame); - } - - async fn read_all(&self) -> Vec> { - self.is_being_read.store(true, atomic::Ordering::Release); - - let mut frames = Vec::new(); - while let Ok(frame) = self.receiver.recv().await { - frames.push(frame); - } - - // Libérer et notifier les writers en attente - self.is_being_read.store(false, atomic::Ordering::Release); - self.read_done_notify.notify_waiters(); - - frames - } -} -// struct PreBuffer { -// // Vec dynamique pour stocker les trames -// frames: Vec>, -// // Compteur atomique pour le nombre de trames disponibles -// frame_count: AtomicUsize, -// // Flag atomique pour indiquer si le buffer est en cours de lecture -// is_being_read: AtomicBool, -// read_done_notify: Arc, -// } -// -// impl PreBuffer { -// fn new() -> Self { -// Self { -// frames: Vec::new(), -// frame_count: AtomicUsize::new(0), -// is_being_read: AtomicBool::new(false), -// read_done_notify: Arc::new(Notify::new()), -// } -// } -// -// async fn push(&mut self, frame: Vec) { -// if self.is_being_read.load(atomic::Ordering::Acquire) { -// self.read_done_notify.notified().await; -// } -// -// self.frames.push(frame); -// self.frame_count.fetch_add(1, atomic::Ordering::Release); -// } -// -// fn reset(&mut self) { -// self.frame_count.store(0, atomic::Ordering::Release); -// } -// -// fn len(&self) -> usize { -// self.frame_count.load(atomic::Ordering::Acquire) -// } -// -// fn read_all(&mut self) -> Vec> { -// self.is_being_read.store(true, atomic::Ordering::Release); -// -// let frames = std::mem::take(&mut self.frames); -// self.frame_count.store(0, atomic::Ordering::Release); -// -// // Libérer et notifier les writers en attente -// self.is_being_read.store(false, atomic::Ordering::Release); -// self.read_done_notify.notify_waiters(); -// -// frames -// } -// } -// +} \ No newline at end of file diff --git a/src-tauri/src/core/mixer_push.rs b/src-tauri/src/core/mixer_push.rs new file mode 100644 index 0000000..a1950aa --- /dev/null +++ b/src-tauri/src/core/mixer_push.rs @@ -0,0 +1,229 @@ +// version non utilisé, c'était un prototype "push-based", jamais testé + +use std::sync::{atomic, Arc}; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize}; +use arc_swap::ArcSwap; +use bytes::Bytes; +use tokio::sync::{mpsc, Notify}; +use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer}; + +pub struct Mixer { + // writer: Arc>, + // reader: Arc>, + pre_buffer: Arc, + worker_sender: Option>>, + buffer: Arc>, // 1920 frames +} + +impl Mixer { + pub fn new() -> Self { + let (writer, reader) = RingBuffer::::new(1024).split(); + Self { + // writer: Arc::new(writer), + // reader: Arc::new(reader), + pre_buffer: Arc::new(PreBuffer::new()), + worker_sender: None, + buffer: Arc::new(ArcSwap::from_pointee([0i16; 1920])), + } + } + + // Démarrer le worker de pré-traitement + pub async fn start(&mut self){ + let (sender, mut receiver) = mpsc::unbounded_channel::>(); + + // let (writer, reader) = RingBuffer::::new(1024).split(); + self.worker_sender = Some(sender); + let prebuffer = self.pre_buffer.clone(); + // worker de pré-traitement + tokio::spawn(async move { + while let Some(data) = receiver.recv().await { + // data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms + // si il fait 960, on converti en stéréo + // on écrit dans un buffer de pré-traitement + // si data rempli pas les condition, on ignore + + // on vérifie la taille des données + match data.len() { + 960 => { + // Mono 20ms @ 48kHz - convertir en stéréo + let stereo_data = Self::mono_to_stereo(data); + // push dans un buffer de pré-traitement + prebuffer.push(stereo_data).await; + }, + 1920 => { + // push dans un buffer de pré-traitement + prebuffer.push(data).await; + } + _ => { + println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len()); + } + } + } + }); + } + + // Envoyer des données au pré-traitement + pub async fn write(&self, data: Vec){ + if let Some(sender) = self.worker_sender.as_ref() { + let _ = sender.send(data); + } + } + + // S'occupe de générer la trame qui sera lu dans 20ms + pub async fn mix(&self){ + // récupérer le buffer de pré-traitement, qui sera tableau de Vec (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter) + // le mixer (sum de chaque trame ?) + // le mettre dans un buffer, qui sera accessible par AudioPlayback + // écraser le buffer de pré-traitement + // (libérer le lock) + let frames = self.pre_buffer.read_all().await; + + let mixed_frame = if frames.is_empty() { + [0i16; 1920] // Silence + } else { + Self::mix_frames(&frames) + }; + + // ✅ Swap direct - aucune conversion ! + self.buffer.store(Arc::new(mixed_frame)); + } + + // Récupérer la trame présente qui est déjà pré-généré par mix + pub fn read(&self) -> [i16; 1920] { + // vider le buffer + **self.buffer.load() + } +} + +impl Mixer { + // Functions helpers + fn mono_to_stereo(mono_samples: Vec) -> Vec { + let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2); + + // Chaque échantillon mono devient deux échantillons stéréo identiques + for sample in mono_samples { + stereo_data.push(sample); // Canal gauche + stereo_data.push(sample); // Canal droit + } + + stereo_data + } + + // Mixer plusieurs trames + fn mix_frames(frames: &[Vec]) -> [i16; 1920] { + let mut mixed = [0i32; 1920]; + + for frame in frames { + for (i, &sample) in frame.iter().enumerate() { + if i < 1920 { + mixed[i] += sample as i32; + } + } + } + + let mut result = [0i16; 1920]; + let count = frames.len() as i32; + for (i, &sample) in mixed.iter().enumerate() { + result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16; + } + + result + } + +} + + +/// Pre buffer +#[derive(Clone)] +struct PreBuffer { + sender: Arc>>, + receiver: Arc>>, + is_being_read: Arc, + read_done_notify: Arc, +} + +impl PreBuffer { + fn new() -> Self { + let (sender, reader) = kanal::unbounded_async::>(); + Self { + sender: Arc::new(sender), + receiver: Arc::new(reader), + is_being_read: Arc::new(AtomicBool::new(false)), + read_done_notify: Arc::new(Notify::new()), + } + } + + async fn push(&self, frame: Vec) { + if self.is_being_read.load(atomic::Ordering::Acquire) { + self.read_done_notify.notified().await; + } + + let _ = self.sender.send(frame); + } + + async fn read_all(&self) -> Vec> { + self.is_being_read.store(true, atomic::Ordering::Release); + + let mut frames = Vec::new(); + while let Ok(frame) = self.receiver.recv().await { + frames.push(frame); + } + + // Libérer et notifier les writers en attente + self.is_being_read.store(false, atomic::Ordering::Release); + self.read_done_notify.notify_waiters(); + + frames + } +} +// struct PreBuffer { +// // Vec dynamique pour stocker les trames +// frames: Vec>, +// // Compteur atomique pour le nombre de trames disponibles +// frame_count: AtomicUsize, +// // Flag atomique pour indiquer si le buffer est en cours de lecture +// is_being_read: AtomicBool, +// read_done_notify: Arc, +// } +// +// impl PreBuffer { +// fn new() -> Self { +// Self { +// frames: Vec::new(), +// frame_count: AtomicUsize::new(0), +// is_being_read: AtomicBool::new(false), +// read_done_notify: Arc::new(Notify::new()), +// } +// } +// +// async fn push(&mut self, frame: Vec) { +// if self.is_being_read.load(atomic::Ordering::Acquire) { +// self.read_done_notify.notified().await; +// } +// +// self.frames.push(frame); +// self.frame_count.fetch_add(1, atomic::Ordering::Release); +// } +// +// fn reset(&mut self) { +// self.frame_count.store(0, atomic::Ordering::Release); +// } +// +// fn len(&self) -> usize { +// self.frame_count.load(atomic::Ordering::Acquire) +// } +// +// fn read_all(&mut self) -> Vec> { +// self.is_being_read.store(true, atomic::Ordering::Release); +// +// let frames = std::mem::take(&mut self.frames); +// self.frame_count.store(0, atomic::Ordering::Release); +// +// // Libérer et notifier les writers en attente +// self.is_being_read.store(false, atomic::Ordering::Release); +// self.read_done_notify.notify_waiters(); +// +// frames +// } +// } +// diff --git a/src-tauri/src/core/playback.rs b/src-tauri/src/core/playback.rs index 5505598..1ef3f7b 100644 --- a/src-tauri/src/core/playback.rs +++ b/src-tauri/src/core/playback.rs @@ -1,8 +1,10 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::JoinHandle; +use std::time::Instant; use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use crate::core::mixer::AudioMixer; use crate::domain::event::{Event, EventBus}; use crate::utils::real_time_event::RealTimeEvent; @@ -17,7 +19,7 @@ pub struct AudioPlayback { running: Arc, stream: Option, worker: Option>, - next_tick: RealTimeEvent + mixer: AudioMixer } impl Speaker { @@ -33,16 +35,16 @@ impl Speaker { Speaker::new(device) } - pub fn get_input_config(&self) -> SupportedStreamConfig { + pub fn get_output_config(&self) -> SupportedStreamConfig { self.device.default_output_config().unwrap() } pub fn get_stream_config(&self) -> StreamConfig { - let config = self.get_input_config(); + let config = self.get_output_config(); let mut stream_config: StreamConfig = config.into(); stream_config.channels = 2; stream_config.sample_rate = SampleRate(48000); - stream_config.buffer_size = BufferSize::Fixed(1920); + stream_config.buffer_size = BufferSize::Fixed(960); stream_config } @@ -58,40 +60,57 @@ impl Speaker { |err| println!("Error output stream: {err}"), None ).unwrap() - } } impl AudioPlayback { - pub fn new(event_bus: EventBus, speaker: Speaker) -> Self { + pub fn new(event_bus: EventBus, speaker: Speaker, mixer: AudioMixer) -> Self { Self { event_bus, speaker, running: Arc::new(AtomicBool::new(false)), stream: None, worker: None, - next_tick: RealTimeEvent::new(), + mixer } } - pub fn default(event_bus: EventBus) -> Self { + pub fn default(event_bus: EventBus, mixer: AudioMixer) -> Self { let speaker = Speaker::default(); - AudioPlayback::new(event_bus, speaker) + AudioPlayback::new(event_bus, speaker, mixer) } pub async fn start(&mut self) { + self.running.store(true, Ordering::SeqCst); let stream_running = self.running.clone(); let event_bus = self.event_bus.clone(); + let mixer = self.mixer.clone(); // stream cpal println!("Setting up audio playback stream..."); - let stream = self.speaker.build_stream(move |data, _| { + let last_time = Mutex::new(Instant::now()); + let stream = self.speaker.build_stream(move |data, info| { + println!( + "CALLBACK : reçu {} samples, type info = {:?}", + data.len(), + info + ); + + let now = Instant::now(); + let mut last = last_time.lock().unwrap(); + let dt = now.duration_since(*last); + println!("Callback audio appelée chaque {:?} ms (≈ {:.1} Hz)", dt.as_millis(), 1000.0 / dt.as_millis().max(1) as f32); + *last = now; + if !stream_running.load(Ordering::Relaxed){ return; } - // aller récupérer 1920 sur un buffer - // écrire le contenu dans data + println!("Audio playback stream tick"); - let _ = event_bus.emit(Event::PlaybackTick); + let audio_mixer = mixer.read(data.len()); + data.copy_from_slice(&audio_mixer); + // println!("data content : {:?}", data); + + let _ = event_bus.emit_sync(Event::PlaybackTick(data.len())); }); stream.play().unwrap(); self.stream = Some(stream); @@ -99,7 +118,7 @@ impl AudioPlayback { } pub async fn stop(&mut self) { - self.running.store(false, std::sync::atomic::Ordering::SeqCst); + self.running.store(false, Ordering::SeqCst); } } \ No newline at end of file diff --git a/src-tauri/src/domain/audio_client.rs b/src-tauri/src/domain/audio_client.rs index 9880c42..92aada2 100644 --- a/src-tauri/src/domain/audio_client.rs +++ b/src-tauri/src/domain/audio_client.rs @@ -53,7 +53,6 @@ impl AudioClient { Ok(audio_frame) => { // Pousser la frame complète dans le buffer writer_clone.push_slice_overwrite(&audio_frame); - println!("writed frame"); }, Err(e) => { eprintln!("Erreur de décodage audio : {}", e); @@ -78,6 +77,21 @@ impl AudioClient { sequence }); } + + pub fn read_audio(&self, size: usize) -> Option> { + if self.buffer_reader.len() < size { + return None; + } + + let mut buffer = vec![0i16; size]; + let read_count = self.buffer_reader.pop_slice(&mut buffer); + + if read_count == size { + Some(buffer) + }else { + None + } + } } @@ -107,4 +121,16 @@ impl AudioClientManager { pub fn write_audio_to_client(&self, uuid: uuid::Uuid, sequence: u16, data: Bytes) { let _ = self.audio_clients.get(&uuid).unwrap().write_audio(sequence, data); } + + pub fn take_audio_collection(&self, size: usize) -> Vec> { + let mut buffers = Vec::new(); + + for client in self.audio_clients.values() { + if let Some(buffer) = client.read_audio(size) { + buffers.push(buffer); + } + } + buffers + + } } diff --git a/src-tauri/src/domain/event.rs b/src-tauri/src/domain/event.rs index cb5722f..74f1fee 100644 --- a/src-tauri/src/domain/event.rs +++ b/src-tauri/src/domain/event.rs @@ -9,8 +9,8 @@ pub enum Event { AudioIn(Vec), AudioEncoded(Vec), - PlaybackTick, - PlaybackRequest(Bytes), + PlaybackTick(usize), + // PlaybackRequest(Bytes), NetConnected, NetDisconnected, diff --git a/src-tauri/src/runtime/dispatcher.rs b/src-tauri/src/runtime/dispatcher.rs index a4966a5..b0ca357 100644 --- a/src-tauri/src/runtime/dispatcher.rs +++ b/src-tauri/src/runtime/dispatcher.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use parking_lot::RwLock; use tokio::task::AbortHandle; use uuid::Uuid; +use crate::core::mixer::AudioMixer; use crate::domain::audio_client::{AudioClient, AudioClientManager}; use crate::domain::event::{Event, EventBus}; use crate::network::protocol::{MessageClient, MessageServer}; @@ -28,6 +29,7 @@ pub struct Dispatcher { tauri_handle: Arc, audio_client_manager: Arc, + audio_mixer: Arc, // todo : temporaire, le temps d'avoir un handler @@ -54,7 +56,12 @@ impl PingInfo { } impl Dispatcher { - pub fn new(event_bus: EventBus, udp_session: UdpSession, audio_client_manager: AudioClientManager, tauri_handle: AppHandle) -> Self { + pub fn new(event_bus: EventBus, + udp_session: UdpSession, + audio_client_manager: AudioClientManager, + audio_mixer: AudioMixer, + tauri_handle: AppHandle + ) -> Self { Self { event_bus: Arc::new(event_bus), udp_session: Arc::new(udp_session), @@ -63,6 +70,7 @@ impl Dispatcher { can_send_audio: Arc::new(AtomicBool::new(false)), ping_tracker: Arc::new(RwLock::new(HashMap::new())), audio_client_manager: Arc::new(audio_client_manager), + audio_mixer: Arc::new(audio_mixer), } } @@ -70,6 +78,7 @@ impl Dispatcher { let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await; let udp_session = self.udp_session.clone(); let sequence_counter = self.sequence_counter.clone(); + let audio_mixer = self.audio_mixer.clone(); while let Ok(event) = receiver.recv().await { match event { @@ -88,8 +97,8 @@ impl Dispatcher { sequence_counter.fetch_add(1, Ordering::Relaxed); } - Event::PlaybackTick => { - + Event::PlaybackTick(paquet_size) => { + audio_mixer.mix_next_frame(paquet_size); } Event::NetIn(message_event) => { // println!("NetIn: {:?}", message_event); @@ -143,6 +152,7 @@ impl Dispatcher { } } MessageServer::Audio {user, sequence, data} => { + // println!("Audio received from {}: {} -> {:?}", user, sequence, data); if audio_client_manager.audio_client_exists(user) { audio_client_manager.write_audio_to_client(user, sequence, data); }else{