diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 8d50aa5..723ff4d 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -1945,6 +1945,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kanal" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "keyboard-types" version = "0.7.0" @@ -2637,6 +2647,7 @@ dependencies = [ "cpal", "crossbeam-channel", "event-listener", + "kanal", "moka", "opus", "parking_lot", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 9cd552d..cd14e99 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -33,4 +33,5 @@ event-listener = "5.4" bytes = "1.10" moka = {version = "0.12", features = ["future"] } arc-swap = "1.7" -crossbeam-channel = "0.5" \ No newline at end of file +crossbeam-channel = "0.5" +kanal = "0.1" diff --git a/src-tauri/src/app/ox_speak_app.rs b/src-tauri/src/app/ox_speak_app.rs index 8c29b0a..5d2f4f6 100644 --- a/src-tauri/src/app/ox_speak_app.rs +++ b/src-tauri/src/app/ox_speak_app.rs @@ -4,6 +4,7 @@ use tauri::{AppHandle, Emitter, Listener}; use tokio; use tokio::sync::mpsc; use crate::core::capture::AudioCapture; +use crate::domain::audio_client::AudioClientManager; use crate::domain::event::{Event, EventBus}; use crate::network::udp::UdpSession; use crate::runtime::dispatcher::Dispatcher; @@ -12,7 +13,7 @@ pub struct OxSpeakApp { // Communication inter-thread event_bus: EventBus, dispatcher: Dispatcher, - event_rx: Option>, + event_rx: kanal::AsyncReceiver, // Network udp_session: UdpSession, @@ -20,6 +21,9 @@ pub struct OxSpeakApp { // audio audio_capture: AudioCapture, + // audio_client + audio_client_manager: AudioClientManager, + // Tauri handle tauri_handle: AppHandle } @@ -41,31 +45,34 @@ impl OxSpeakApp { println!("Initializing UDP session"); let udp_session = UdpSession::new(event_bus.clone()); + // UdpClient + println!("Initializing UDP client"); + let audio_client_manager = AudioClientManager::new(); + // Dispatcher - Communication inter-components println!("Initializing event dispatcher"); - let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), tauri_handle.clone()); + let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), audio_client_manager.clone() ,tauri_handle.clone()); println!("OxSpeakApp initialization complete"); Self { event_bus, dispatcher, - event_rx: Some(event_rx), + event_rx, udp_session, audio_capture, + audio_client_manager, tauri_handle, } } - - pub async fn start(&mut self) { println!("Starting OxSpeakApp"); // dispatcher - lancement du process pour la communication inter-process - println!("Starting event dispatcher"); - let mut dispatcher = self.dispatcher.clone(); - // Prendre l'ownership du receiver (event_rx) - if let Some(event_rx) = self.event_rx.take() { + println!("Starting 4 instances of event dispatcher"); + for _ in 0..4 { + let dispatcher = self.dispatcher.clone(); + let event_rx = self.event_rx.clone(); tokio::spawn(async move { dispatcher.start(event_rx).await }); diff --git a/src-tauri/src/core/capture.rs b/src-tauri/src/core/capture.rs index a0a29cb..618b2aa 100644 --- a/src-tauri/src/core/capture.rs +++ b/src-tauri/src/core/capture.rs @@ -99,7 +99,7 @@ impl AudioCapture { }); stream.play().unwrap(); self.steam = Some(stream); - println!("Audio stream started"); + println!("Audio capture stream started"); // Audio processing worker println!("Starting audio processing worker"); diff --git a/src-tauri/src/core/mixer.rs b/src-tauri/src/core/mixer.rs index 2656861..55477e0 100644 --- a/src-tauri/src/core/mixer.rs +++ b/src-tauri/src/core/mixer.rs @@ -1 +1,225 @@ -// aller pick l'audio des clients +use std::sync::{atomic, Arc}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use bytes::Bytes; +use tokio::sync::{mpsc, Notify}; +use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer}; + +pub struct Mixer { + // writer: Arc>, + // reader: Arc>, + pre_buffer: Arc, + worker_sender: Option>>, + buffer: Bytes // 1920 frames +} + +impl Mixer { + pub fn new() -> Self { + let (writer, reader) = RingBuffer::::new(1024).split(); + Self { + // writer: Arc::new(writer), + // reader: Arc::new(reader), + pre_buffer: Arc::new(PreBuffer::new()), + worker_sender: None, + } + } + + // Démarrer le worker de pré-traitement + pub async fn start(&mut self){ + let (sender, mut receiver) = mpsc::unbounded_channel::>(); + + // let (writer, reader) = RingBuffer::::new(1024).split(); + self.worker_sender = Some(sender); + let prebuffer = self.pre_buffer.clone(); + // worker de pré-traitement + tokio::spawn(async move { + while let Some(data) = receiver.recv().await { + // data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms + // si il fait 960, on converti en stéréo + // on écrit dans un buffer de pré-traitement + // si data rempli pas les condition, on ignore + + // on vérifie la taille des données + match data.len() { + 960 => { + // Mono 20ms @ 48kHz - convertir en stéréo + let stereo_data = Self::mono_to_stereo(data); + // push dans un buffer de pré-traitement + prebuffer.push(stereo_data).await; + }, + 1920 => { + // push dans un buffer de pré-traitement + prebuffer.push(data).await; + } + _ => { + println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len()); + } + } + } + }); + } + + // Envoyer des données au pré-traitement + pub async fn write(&self, data: Vec){ + if let Some(sender) = self.worker_sender.as_ref() { + let _ = sender.send(data); + } + } + + // S'occupe de générer la trame qui sera lu dans 20ms + pub async fn mix(&self){ + // récupérer le buffer de pré-traitement, qui sera tableau de Vec (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter) + // le mixer (sum de chaque trame ?) + // le mettre dans un buffer, qui sera accessible par AudioPlayback + // écraser le buffer de pré-traitement + // (libérer le lock) + let frames = self.pre_buffer.read_all().await; + + let mixed_frame = if frames.is_empty() { + [0i16; 1920] + } else { + Self::mix_frames(&frames) + }; + + } + + // Récupérer la trame présente qui est déjà pré-généré par mix + pub async fn read(&self) -> [i16; 1920]{ + let buffer = self.buffer; + // vider le buffer + self.buffer = + buffer + } +} + +impl Mixer { + // Functions helpers + fn mono_to_stereo(mono_samples: Vec) -> Vec { + let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2); + + // Chaque échantillon mono devient deux échantillons stéréo identiques + for sample in mono_samples { + stereo_data.push(sample); // Canal gauche + stereo_data.push(sample); // Canal droit + } + + stereo_data + } + + // Mixer plusieurs trames + fn mix_frames(frames: &[Vec]) -> [i16; 1920] { + let mut mixed = [0i32; 1920]; + + for frame in frames { + for (i, &sample) in frame.iter().enumerate() { + if i < 1920 { + mixed[i] += sample as i32; + } + } + } + + let mut result = [0i16; 1920]; + let count = frames.len() as i32; + for (i, &sample) in mixed.iter().enumerate() { + result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16; + } + + result + } + +} + + +/// Pre buffer +#[derive(Clone)] +struct PreBuffer { + sender: Arc>>, + receiver: Arc>>, + is_being_read: Arc, + read_done_notify: Arc, +} + +impl PreBuffer { + fn new() -> Self { + let (sender, reader) = kanal::unbounded_async::>(); + Self { + sender: Arc::new(sender), + receiver: Arc::new(reader), + is_being_read: Arc::new(AtomicBool::new(false)), + read_done_notify: Arc::new(Notify::new()), + } + } + + async fn push(&self, frame: Vec) { + if self.is_being_read.load(atomic::Ordering::Acquire) { + self.read_done_notify.notified().await; + } + + let _ = self.sender.send(frame); + } + + async fn read_all(&self) -> Vec> { + self.is_being_read.store(true, atomic::Ordering::Release); + + let mut frames = Vec::new(); + while let Ok(frame) = self.receiver.recv().await { + frames.push(frame); + } + + // Libérer et notifier les writers en attente + self.is_being_read.store(false, atomic::Ordering::Release); + self.read_done_notify.notify_waiters(); + + frames + } +} +// struct PreBuffer { +// // Vec dynamique pour stocker les trames +// frames: Vec>, +// // Compteur atomique pour le nombre de trames disponibles +// frame_count: AtomicUsize, +// // Flag atomique pour indiquer si le buffer est en cours de lecture +// is_being_read: AtomicBool, +// read_done_notify: Arc, +// } +// +// impl PreBuffer { +// fn new() -> Self { +// Self { +// frames: Vec::new(), +// frame_count: AtomicUsize::new(0), +// is_being_read: AtomicBool::new(false), +// read_done_notify: Arc::new(Notify::new()), +// } +// } +// +// async fn push(&mut self, frame: Vec) { +// if self.is_being_read.load(atomic::Ordering::Acquire) { +// self.read_done_notify.notified().await; +// } +// +// self.frames.push(frame); +// self.frame_count.fetch_add(1, atomic::Ordering::Release); +// } +// +// fn reset(&mut self) { +// self.frame_count.store(0, atomic::Ordering::Release); +// } +// +// fn len(&self) -> usize { +// self.frame_count.load(atomic::Ordering::Acquire) +// } +// +// fn read_all(&mut self) -> Vec> { +// self.is_being_read.store(true, atomic::Ordering::Release); +// +// let frames = std::mem::take(&mut self.frames); +// self.frame_count.store(0, atomic::Ordering::Release); +// +// // Libérer et notifier les writers en attente +// self.is_being_read.store(false, atomic::Ordering::Release); +// self.read_done_notify.notify_waiters(); +// +// frames +// } +// } +// diff --git a/src-tauri/src/core/playback.rs b/src-tauri/src/core/playback.rs index 53ad6b2..5505598 100644 --- a/src-tauri/src/core/playback.rs +++ b/src-tauri/src/core/playback.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::JoinHandle; use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig}; -use cpal::traits::{DeviceTrait, HostTrait}; -use crate::domain::event::EventBus; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use crate::domain::event::{Event, EventBus}; use crate::utils::real_time_event::RealTimeEvent; #[derive(Clone)] @@ -80,21 +80,26 @@ impl AudioPlayback { } pub async fn start(&mut self) { - - } - - pub async fn stop(&mut self) { - self.running.store(false, std::sync::atomic::Ordering::SeqCst); - + let stream_running = self.running.clone(); + let event_bus = self.event_bus.clone(); // stream cpal println!("Setting up audio playback stream..."); - let stream_running = self.running.clone(); let stream = self.speaker.build_stream(move |data, _| { if !stream_running.load(Ordering::Relaxed){ return; } // aller récupérer 1920 sur un buffer // écrire le contenu dans data + + let _ = event_bus.emit(Event::PlaybackTick); }); + stream.play().unwrap(); + self.stream = Some(stream); + println!("Audio playback stream started"); + } + + pub async fn stop(&mut self) { + self.running.store(false, std::sync::atomic::Ordering::SeqCst); + } } \ No newline at end of file diff --git a/src-tauri/src/domain/audio_client.rs b/src-tauri/src/domain/audio_client.rs index 0370ba6..9880c42 100644 --- a/src-tauri/src/domain/audio_client.rs +++ b/src-tauri/src/domain/audio_client.rs @@ -8,11 +8,11 @@ use crate::core::opus::{AudioOpus, AudioOpusDecoder}; use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer}; use crate::utils::shared_store::SharedArcMap; -struct AudioClient { +pub struct AudioClient { uuid: uuid::Uuid, decode_sender: mpsc::Sender, - buffer_reader: RingBufReader>, - buffer_writer: RingBufWriter> + buffer_reader: RingBufReader, + buffer_writer: RingBufWriter } struct DecodeRequest { @@ -21,21 +21,23 @@ struct DecodeRequest { } #[derive(Clone)] -struct AudioClientManager { +pub struct AudioClientManager { audio_clients: SharedArcMap, } impl AudioClient { pub fn new() -> Self { - let (writer, reader) = RingBuffer::>::new(1024).split(); + let (writer, reader) = RingBuffer::::new(4096).split(); let (decode_sender, mut decode_reader) = mpsc::channel::(100); + let writer_clone = writer.clone(); let decode_handle = tokio::spawn(async move { - let mut decoder = AudioOpus::new(44800, 1, "voip") + let mut decoder = AudioOpus::new(48000, 1, "voip") .create_decoder().unwrap(); + let mut last_sequence: u16 = 0; while let Some(request) = decode_reader.recv().await { - // si la séquence est "trop vieille" on la drop. (voir plus tard pour un système de ratrapage si c'est possible) + // si la séquence est "trop vieille" on la drop. (voir plus tard pour un système de rattrapage si c'est possible) if last_sequence < request.sequence { // todo : si le décodage est trop long, voir pour le mettre dans un thread // avec let result = tokio::task::spawn_blocking({ @@ -50,7 +52,8 @@ impl AudioClient { match result { Ok(audio_frame) => { // Pousser la frame complète dans le buffer - writer.push(audio_frame); + writer_clone.push_slice_overwrite(&audio_frame); + println!("writed frame"); }, Err(e) => { eprintln!("Erreur de décodage audio : {}", e); @@ -69,8 +72,8 @@ impl AudioClient { } } - pub async fn write_audio(&self, sequence: u16, data: Bytes) { - let _ = self.decode_sender.send(DecodeRequest { + pub fn write_audio(&self, sequence: u16, data: Bytes) { + let _ = self.decode_sender.try_send(DecodeRequest { data, sequence }); @@ -79,9 +82,29 @@ impl AudioClient { impl AudioClientManager { - fn new() -> Self { + pub fn new() -> Self { Self { audio_clients: SharedArcMap::new() } } + + pub fn audio_client_exists(&self, uuid: uuid::Uuid) -> bool { + self.audio_clients.contains_key(&uuid) + } + + pub fn get_audio_client(&self, uuid: uuid::Uuid) -> Option> { + self.audio_clients.get(&uuid) + } + + pub fn add_audio_client(&self, uuid: uuid::Uuid, audio_client: AudioClient) { + self.audio_clients.insert(uuid, audio_client); + } + + pub fn remove_audio_client(&self, uuid: uuid::Uuid) { + self.audio_clients.remove(&uuid); + } + + pub fn write_audio_to_client(&self, uuid: uuid::Uuid, sequence: u16, data: Bytes) { + let _ = self.audio_clients.get(&uuid).unwrap().write_audio(sequence, data); + } } diff --git a/src-tauri/src/domain/event.rs b/src-tauri/src/domain/event.rs index f38ff9c..cb5722f 100644 --- a/src-tauri/src/domain/event.rs +++ b/src-tauri/src/domain/event.rs @@ -1,4 +1,5 @@ -use tokio::sync::mpsc; +use bytes::Bytes; +// use tokio::sync::mpsc; use crate::network::protocol::{MessageClient, MessageServer}; pub enum Event { @@ -8,6 +9,9 @@ pub enum Event { AudioIn(Vec), AudioEncoded(Vec), + PlaybackTick, + PlaybackRequest(Bytes), + NetConnected, NetDisconnected, NetIn(MessageServer), @@ -22,12 +26,12 @@ pub enum Event { #[derive(Clone)] pub struct EventBus { - pub sender: mpsc::Sender + pub sender: kanal::AsyncSender } impl EventBus { - pub fn new() -> (Self, mpsc::Receiver) { - let (sender, receiver) = mpsc::channel(4096); + pub fn new() -> (Self, kanal::AsyncReceiver) { + let (sender, receiver) = kanal::bounded_async::(4096); (Self { sender }, receiver) } @@ -40,7 +44,7 @@ impl EventBus { let _ = self.sender.try_send(event); } - pub fn clone_sender(&self) -> mpsc::Sender { + pub fn clone_sender(&self) -> kanal::AsyncSender { self.sender.clone() } } \ No newline at end of file diff --git a/src-tauri/src/runtime/dispatcher.rs b/src-tauri/src/runtime/dispatcher.rs index cbe0351..a4966a5 100644 --- a/src-tauri/src/runtime/dispatcher.rs +++ b/src-tauri/src/runtime/dispatcher.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use parking_lot::RwLock; use tokio::task::AbortHandle; use uuid::Uuid; +use crate::domain::audio_client::{AudioClient, AudioClientManager}; use crate::domain::event::{Event, EventBus}; use crate::network::protocol::{MessageClient, MessageServer}; use crate::network::udp::UdpSession; @@ -20,11 +21,13 @@ struct PingInfo { #[derive(Clone)] pub struct Dispatcher { - event_bus: EventBus, + event_bus: Arc, - udp_session: UdpSession, + udp_session: Arc, - tauri_handle: AppHandle, + tauri_handle: Arc, + + audio_client_manager: Arc, // todo : temporaire, le temps d'avoir un handler @@ -51,23 +54,24 @@ impl PingInfo { } impl Dispatcher { - pub fn new(event_bus: EventBus, udp_session: UdpSession, tauri_handle: AppHandle) -> Self { + pub fn new(event_bus: EventBus, udp_session: UdpSession, audio_client_manager: AudioClientManager, tauri_handle: AppHandle) -> Self { Self { - event_bus, - udp_session, + event_bus: Arc::new(event_bus), + udp_session: Arc::new(udp_session), sequence_counter: Arc::new(AtomicU16::new(0)), - tauri_handle, + tauri_handle: Arc::new(tauri_handle), can_send_audio: Arc::new(AtomicBool::new(false)), ping_tracker: Arc::new(RwLock::new(HashMap::new())), + audio_client_manager: Arc::new(audio_client_manager), } } - pub async fn start(&mut self, mut receiver: mpsc::Receiver) { + pub async fn start(&self, receiver: kanal::AsyncReceiver) { let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await; let udp_session = self.udp_session.clone(); let sequence_counter = self.sequence_counter.clone(); - while let Some(event) = receiver.recv().await { + while let Ok(event) = receiver.recv().await { match event { Event::AudioIn(sample) => { @@ -83,9 +87,12 @@ impl Dispatcher { udp_session.send(message).await; sequence_counter.fetch_add(1, Ordering::Relaxed); + } + Event::PlaybackTick => { + } Event::NetIn(message_event) => { - println!("NetIn: {:?}", message_event); + // println!("NetIn: {:?}", message_event); let _ = udp_in_sender.send(message_event).await; } Event::NetOut(message_call) => { @@ -117,6 +124,7 @@ impl Dispatcher { pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender) { let (sender, mut consumer) = mpsc::channel::(1024); let ping_tracker = Arc::clone(&self.ping_tracker); + let audio_client_manager = self.audio_client_manager.clone(); let task = tokio::spawn(async move { while let Some(message) = consumer.recv().await { @@ -135,7 +143,12 @@ impl Dispatcher { } } MessageServer::Audio {user, sequence, data} => { - // Audio reçu + if audio_client_manager.audio_client_exists(user) { + audio_client_manager.write_audio_to_client(user, sequence, data); + }else{ + let client = AudioClient::new(); + audio_client_manager.add_audio_client(user, client); + } } } } diff --git a/src-tauri/src/tauri_ctx.rs b/src-tauri/src/tauri_ctx.rs index 08a3057..a176117 100644 --- a/src-tauri/src/tauri_ctx.rs +++ b/src-tauri/src/tauri_ctx.rs @@ -1,4 +1,4 @@ -use tauri::Manager; +use tauri::{Manager, WindowEvent}; use std::sync::Arc; use tokio::sync::Mutex; use crate::app::ox_speak_app::OxSpeakApp; @@ -45,6 +45,14 @@ pub async fn run() { Ok(()) }) // .invoke_handler(tauri::generate_handler![greet]) + .on_window_event(|window, event| match event { + WindowEvent::CloseRequested {api, ..} => { + println!("Closing window"); + } + _ => { + + } + }) .run(context) .expect("error while running tauri application"); }