This commit is contained in:
2025-07-20 04:17:44 +02:00
parent a1f829e2e6
commit d7a7af9eb4
7 changed files with 359 additions and 211 deletions

View File

@@ -4,6 +4,8 @@ use tauri::{AppHandle, Emitter, Listener};
use tokio; use tokio;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::core::capture::AudioCapture; use crate::core::capture::AudioCapture;
use crate::core::mixer::AudioMixer;
use crate::core::playback::AudioPlayback;
use crate::domain::audio_client::AudioClientManager; use crate::domain::audio_client::AudioClientManager;
use crate::domain::event::{Event, EventBus}; use crate::domain::event::{Event, EventBus};
use crate::network::udp::UdpSession; use crate::network::udp::UdpSession;
@@ -20,6 +22,8 @@ pub struct OxSpeakApp {
// audio // audio
audio_capture: AudioCapture, audio_capture: AudioCapture,
audio_playback: AudioPlayback,
audio_mixer: AudioMixer,
// audio_client // audio_client
audio_client_manager: AudioClientManager, audio_client_manager: AudioClientManager,
@@ -40,18 +44,21 @@ impl OxSpeakApp {
// todo : pour le moment, paramètre par défaut, on verra plus tard pour dynamiser ça // todo : pour le moment, paramètre par défaut, on verra plus tard pour dynamiser ça
println!("Initializing audio capture"); println!("Initializing audio capture");
let audio_capture = AudioCapture::default(event_bus.clone()); let audio_capture = AudioCapture::default(event_bus.clone());
println!("Initializing audio client");
let audio_client_manager = AudioClientManager::new();
let audio_mixer = AudioMixer::new(audio_client_manager.clone());
let audio_playback = AudioPlayback::default(event_bus.clone(), audio_mixer.clone());
// UdpSession // UdpSession
println!("Initializing UDP session"); println!("Initializing UDP session");
let udp_session = UdpSession::new(event_bus.clone()); let udp_session = UdpSession::new(event_bus.clone());
// UdpClient // UdpClient
println!("Initializing UDP client");
let audio_client_manager = AudioClientManager::new();
// Dispatcher - Communication inter-components // Dispatcher - Communication inter-components
println!("Initializing event dispatcher"); println!("Initializing event dispatcher");
let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), audio_client_manager.clone() ,tauri_handle.clone()); let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), audio_client_manager.clone(), audio_mixer.clone(),tauri_handle.clone());
println!("OxSpeakApp initialization complete"); println!("OxSpeakApp initialization complete");
Self { Self {
@@ -61,6 +68,8 @@ impl OxSpeakApp {
udp_session, udp_session,
audio_capture, audio_capture,
audio_client_manager, audio_client_manager,
audio_playback,
audio_mixer,
tauri_handle, tauri_handle,
} }
} }
@@ -85,6 +94,8 @@ impl OxSpeakApp {
// Démarrer l'audio-capture // Démarrer l'audio-capture
println!("Starting audio capture"); println!("Starting audio capture");
self.audio_capture.start().await; self.audio_capture.start().await;
self.audio_playback.start().await;
println!("OxSpeakApp started successfully"); println!("OxSpeakApp started successfully");
let _ = self.tick_tasks().await; let _ = self.tick_tasks().await;

View File

@@ -1,98 +1,50 @@
use std::sync::{atomic, Arc}; // version pull-based
use std::sync::atomic::{AtomicBool, AtomicUsize};
use bytes::Bytes;
use tokio::sync::{mpsc, Notify};
use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer};
pub struct Mixer { use std::sync::Arc;
// writer: Arc<RingBufWriter<i16>>, use arc_swap::ArcSwap;
// reader: Arc<RingBufReader<i16>>, use crate::domain::audio_client::AudioClientManager;
pre_buffer: Arc<PreBuffer>,
worker_sender: Option<mpsc::UnboundedSender<Vec<i16>>>, #[derive(Clone)]
buffer: Bytes // 1920 frames pub struct AudioMixer {
audio_client_manager: AudioClientManager,
buffer: Arc<ArcSwap<Vec<i16>>>
} }
impl Mixer { impl AudioMixer {
pub fn new() -> Self { pub fn new(audio_client_manager: AudioClientManager) -> Self {
let (writer, reader) = RingBuffer::<i16>::new(1024).split();
Self { Self {
// writer: Arc::new(writer), audio_client_manager,
// reader: Arc::new(reader), buffer: Arc::new(ArcSwap::from_pointee(Vec::new())),
pre_buffer: Arc::new(PreBuffer::new()),
worker_sender: None,
} }
} }
pub fn mix_next_frame(&self, size: usize) {
let mut frames = Vec::<Vec<i16>>::new();
// Récupérer les buffers audio des utilisateurs, par défaut, ils sont en mono, donc size / 2
// convertir en stéréo, donc size * 2 frames
let users_audio = self.audio_client_manager.take_audio_collection(size/2).into_iter()
.map(|audio| AudioMixer::mono_to_stereo(audio))
.collect::<Vec<Vec<i16>>>();
frames.extend_from_slice(&users_audio);
// Démarrer le worker de pré-traitement // Récupérer tous les sons des notifications (pas encore dev)
pub async fn start(&mut self){
let (sender, mut receiver) = mpsc::unbounded_channel::<Vec<i16>>();
// let (writer, reader) = RingBuffer::<Bytes>::new(1024).split();
self.worker_sender = Some(sender);
let prebuffer = self.pre_buffer.clone();
// worker de pré-traitement
tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
// data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms
// si il fait 960, on converti en stéréo
// on écrit dans un buffer de pré-traitement
// si data rempli pas les condition, on ignore
// on vérifie la taille des données
match data.len() {
960 => {
// Mono 20ms @ 48kHz - convertir en stéréo
let stereo_data = Self::mono_to_stereo(data);
// push dans un buffer de pré-traitement
prebuffer.push(stereo_data).await;
},
1920 => {
// push dans un buffer de pré-traitement
prebuffer.push(data).await;
}
_ => {
println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len());
}
}
}
});
}
// Envoyer des données au pré-traitement
pub async fn write(&self, data: Vec<i16>){
if let Some(sender) = self.worker_sender.as_ref() {
let _ = sender.send(data);
}
}
// S'occupe de générer la trame qui sera lu dans 20ms
pub async fn mix(&self){
// récupérer le buffer de pré-traitement, qui sera tableau de Vec<i16> (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter)
// le mixer (sum de chaque trame ?)
// le mettre dans un buffer, qui sera accessible par AudioPlayback
// écraser le buffer de pré-traitement
// (libérer le lock)
let frames = self.pre_buffer.read_all().await;
let mixed_frame = if frames.is_empty() { let mixed_frame = if frames.is_empty() {
[0i16; 1920] vec![0i16; size]
} else { }else{
Self::mix_frames(&frames) Self::mix_frames(&frames, size)
}; };
self.buffer.store(Arc::new(mixed_frame));
} }
// Récupérer la trame présente qui est déjà pré-généré par mix pub fn read(&self, size: usize) -> Vec<i16> {
pub async fn read(&self) -> [i16; 1920]{ let mut data = (**self.buffer.load()).clone();
let buffer = self.buffer; data.resize(size, 0);
// vider le buffer data
self.buffer =
buffer
} }
} }
impl Mixer { impl AudioMixer {
// Functions helpers
fn mono_to_stereo(mono_samples: Vec<i16>) -> Vec<i16> { fn mono_to_stereo(mono_samples: Vec<i16>) -> Vec<i16> {
let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2); let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2);
@@ -105,121 +57,22 @@ impl Mixer {
stereo_data stereo_data
} }
// Mixer plusieurs trames fn mix_frames(frames: &[Vec<i16>], size: usize) -> Vec<i16> {
fn mix_frames(frames: &[Vec<i16>]) -> [i16; 1920] { let mut mixed = vec![0i32; size];
let mut mixed = [0i32; 1920];
for frame in frames { for frame in frames {
for (i, &sample) in frame.iter().enumerate() { for (i, &sample) in frame.iter().enumerate() {
if i < 1920 { if i < size {
mixed[i] += sample as i32; mixed[i] += sample as i32;
} }
} }
} }
let mut result = [0i16; 1920]; let count = frames.len().max(1) as i32; // éviter la division par zéro
let count = frames.len() as i32; mixed
for (i, &sample) in mixed.iter().enumerate() { .into_iter()
result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16; .map(|sample| (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16)
} .collect()
result
}
}
/// Pre buffer
#[derive(Clone)]
struct PreBuffer {
sender: Arc<kanal::AsyncSender<Vec<i16>>>,
receiver: Arc<kanal::AsyncReceiver<Vec<i16>>>,
is_being_read: Arc<AtomicBool>,
read_done_notify: Arc<Notify>,
}
impl PreBuffer {
fn new() -> Self {
let (sender, reader) = kanal::unbounded_async::<Vec<i16>>();
Self {
sender: Arc::new(sender),
receiver: Arc::new(reader),
is_being_read: Arc::new(AtomicBool::new(false)),
read_done_notify: Arc::new(Notify::new()),
}
}
async fn push(&self, frame: Vec<i16>) {
if self.is_being_read.load(atomic::Ordering::Acquire) {
self.read_done_notify.notified().await;
}
let _ = self.sender.send(frame);
}
async fn read_all(&self) -> Vec<Vec<i16>> {
self.is_being_read.store(true, atomic::Ordering::Release);
let mut frames = Vec::new();
while let Ok(frame) = self.receiver.recv().await {
frames.push(frame);
}
// Libérer et notifier les writers en attente
self.is_being_read.store(false, atomic::Ordering::Release);
self.read_done_notify.notify_waiters();
frames
} }
} }
// struct PreBuffer {
// // Vec dynamique pour stocker les trames
// frames: Vec<Vec<i16>>,
// // Compteur atomique pour le nombre de trames disponibles
// frame_count: AtomicUsize,
// // Flag atomique pour indiquer si le buffer est en cours de lecture
// is_being_read: AtomicBool,
// read_done_notify: Arc<Notify>,
// }
//
// impl PreBuffer {
// fn new() -> Self {
// Self {
// frames: Vec::new(),
// frame_count: AtomicUsize::new(0),
// is_being_read: AtomicBool::new(false),
// read_done_notify: Arc::new(Notify::new()),
// }
// }
//
// async fn push(&mut self, frame: Vec<i16>) {
// if self.is_being_read.load(atomic::Ordering::Acquire) {
// self.read_done_notify.notified().await;
// }
//
// self.frames.push(frame);
// self.frame_count.fetch_add(1, atomic::Ordering::Release);
// }
//
// fn reset(&mut self) {
// self.frame_count.store(0, atomic::Ordering::Release);
// }
//
// fn len(&self) -> usize {
// self.frame_count.load(atomic::Ordering::Acquire)
// }
//
// fn read_all(&mut self) -> Vec<Vec<i16>> {
// self.is_being_read.store(true, atomic::Ordering::Release);
//
// let frames = std::mem::take(&mut self.frames);
// self.frame_count.store(0, atomic::Ordering::Release);
//
// // Libérer et notifier les writers en attente
// self.is_being_read.store(false, atomic::Ordering::Release);
// self.read_done_notify.notify_waiters();
//
// frames
// }
// }
//

View File

@@ -0,0 +1,229 @@
// version non utilisé, c'était un prototype "push-based", jamais testé
use std::sync::{atomic, Arc};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use arc_swap::ArcSwap;
use bytes::Bytes;
use tokio::sync::{mpsc, Notify};
use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer};
pub struct Mixer {
// writer: Arc<RingBufWriter<i16>>,
// reader: Arc<RingBufReader<i16>>,
pre_buffer: Arc<PreBuffer>,
worker_sender: Option<mpsc::UnboundedSender<Vec<i16>>>,
buffer: Arc<ArcSwap<[i16; 1920]>>, // 1920 frames
}
impl Mixer {
pub fn new() -> Self {
let (writer, reader) = RingBuffer::<i16>::new(1024).split();
Self {
// writer: Arc::new(writer),
// reader: Arc::new(reader),
pre_buffer: Arc::new(PreBuffer::new()),
worker_sender: None,
buffer: Arc::new(ArcSwap::from_pointee([0i16; 1920])),
}
}
// Démarrer le worker de pré-traitement
pub async fn start(&mut self){
let (sender, mut receiver) = mpsc::unbounded_channel::<Vec<i16>>();
// let (writer, reader) = RingBuffer::<Bytes>::new(1024).split();
self.worker_sender = Some(sender);
let prebuffer = self.pre_buffer.clone();
// worker de pré-traitement
tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
// data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms
// si il fait 960, on converti en stéréo
// on écrit dans un buffer de pré-traitement
// si data rempli pas les condition, on ignore
// on vérifie la taille des données
match data.len() {
960 => {
// Mono 20ms @ 48kHz - convertir en stéréo
let stereo_data = Self::mono_to_stereo(data);
// push dans un buffer de pré-traitement
prebuffer.push(stereo_data).await;
},
1920 => {
// push dans un buffer de pré-traitement
prebuffer.push(data).await;
}
_ => {
println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len());
}
}
}
});
}
// Envoyer des données au pré-traitement
pub async fn write(&self, data: Vec<i16>){
if let Some(sender) = self.worker_sender.as_ref() {
let _ = sender.send(data);
}
}
// S'occupe de générer la trame qui sera lu dans 20ms
pub async fn mix(&self){
// récupérer le buffer de pré-traitement, qui sera tableau de Vec<i16> (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter)
// le mixer (sum de chaque trame ?)
// le mettre dans un buffer, qui sera accessible par AudioPlayback
// écraser le buffer de pré-traitement
// (libérer le lock)
let frames = self.pre_buffer.read_all().await;
let mixed_frame = if frames.is_empty() {
[0i16; 1920] // Silence
} else {
Self::mix_frames(&frames)
};
// ✅ Swap direct - aucune conversion !
self.buffer.store(Arc::new(mixed_frame));
}
// Récupérer la trame présente qui est déjà pré-généré par mix
pub fn read(&self) -> [i16; 1920] {
// vider le buffer
**self.buffer.load()
}
}
impl Mixer {
// Functions helpers
fn mono_to_stereo(mono_samples: Vec<i16>) -> Vec<i16> {
let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2);
// Chaque échantillon mono devient deux échantillons stéréo identiques
for sample in mono_samples {
stereo_data.push(sample); // Canal gauche
stereo_data.push(sample); // Canal droit
}
stereo_data
}
// Mixer plusieurs trames
fn mix_frames(frames: &[Vec<i16>]) -> [i16; 1920] {
let mut mixed = [0i32; 1920];
for frame in frames {
for (i, &sample) in frame.iter().enumerate() {
if i < 1920 {
mixed[i] += sample as i32;
}
}
}
let mut result = [0i16; 1920];
let count = frames.len() as i32;
for (i, &sample) in mixed.iter().enumerate() {
result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16;
}
result
}
}
/// Pre buffer
#[derive(Clone)]
struct PreBuffer {
sender: Arc<kanal::AsyncSender<Vec<i16>>>,
receiver: Arc<kanal::AsyncReceiver<Vec<i16>>>,
is_being_read: Arc<AtomicBool>,
read_done_notify: Arc<Notify>,
}
impl PreBuffer {
fn new() -> Self {
let (sender, reader) = kanal::unbounded_async::<Vec<i16>>();
Self {
sender: Arc::new(sender),
receiver: Arc::new(reader),
is_being_read: Arc::new(AtomicBool::new(false)),
read_done_notify: Arc::new(Notify::new()),
}
}
async fn push(&self, frame: Vec<i16>) {
if self.is_being_read.load(atomic::Ordering::Acquire) {
self.read_done_notify.notified().await;
}
let _ = self.sender.send(frame);
}
async fn read_all(&self) -> Vec<Vec<i16>> {
self.is_being_read.store(true, atomic::Ordering::Release);
let mut frames = Vec::new();
while let Ok(frame) = self.receiver.recv().await {
frames.push(frame);
}
// Libérer et notifier les writers en attente
self.is_being_read.store(false, atomic::Ordering::Release);
self.read_done_notify.notify_waiters();
frames
}
}
// struct PreBuffer {
// // Vec dynamique pour stocker les trames
// frames: Vec<Vec<i16>>,
// // Compteur atomique pour le nombre de trames disponibles
// frame_count: AtomicUsize,
// // Flag atomique pour indiquer si le buffer est en cours de lecture
// is_being_read: AtomicBool,
// read_done_notify: Arc<Notify>,
// }
//
// impl PreBuffer {
// fn new() -> Self {
// Self {
// frames: Vec::new(),
// frame_count: AtomicUsize::new(0),
// is_being_read: AtomicBool::new(false),
// read_done_notify: Arc::new(Notify::new()),
// }
// }
//
// async fn push(&mut self, frame: Vec<i16>) {
// if self.is_being_read.load(atomic::Ordering::Acquire) {
// self.read_done_notify.notified().await;
// }
//
// self.frames.push(frame);
// self.frame_count.fetch_add(1, atomic::Ordering::Release);
// }
//
// fn reset(&mut self) {
// self.frame_count.store(0, atomic::Ordering::Release);
// }
//
// fn len(&self) -> usize {
// self.frame_count.load(atomic::Ordering::Acquire)
// }
//
// fn read_all(&mut self) -> Vec<Vec<i16>> {
// self.is_being_read.store(true, atomic::Ordering::Release);
//
// let frames = std::mem::take(&mut self.frames);
// self.frame_count.store(0, atomic::Ordering::Release);
//
// // Libérer et notifier les writers en attente
// self.is_being_read.store(false, atomic::Ordering::Release);
// self.read_done_notify.notify_waiters();
//
// frames
// }
// }
//

View File

@@ -1,8 +1,10 @@
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::Instant;
use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig}; use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use crate::core::mixer::AudioMixer;
use crate::domain::event::{Event, EventBus}; use crate::domain::event::{Event, EventBus};
use crate::utils::real_time_event::RealTimeEvent; use crate::utils::real_time_event::RealTimeEvent;
@@ -17,7 +19,7 @@ pub struct AudioPlayback {
running: Arc<AtomicBool>, running: Arc<AtomicBool>,
stream: Option<Stream>, stream: Option<Stream>,
worker: Option<JoinHandle<()>>, worker: Option<JoinHandle<()>>,
next_tick: RealTimeEvent mixer: AudioMixer
} }
impl Speaker { impl Speaker {
@@ -33,16 +35,16 @@ impl Speaker {
Speaker::new(device) Speaker::new(device)
} }
pub fn get_input_config(&self) -> SupportedStreamConfig { pub fn get_output_config(&self) -> SupportedStreamConfig {
self.device.default_output_config().unwrap() self.device.default_output_config().unwrap()
} }
pub fn get_stream_config(&self) -> StreamConfig { pub fn get_stream_config(&self) -> StreamConfig {
let config = self.get_input_config(); let config = self.get_output_config();
let mut stream_config: StreamConfig = config.into(); let mut stream_config: StreamConfig = config.into();
stream_config.channels = 2; stream_config.channels = 2;
stream_config.sample_rate = SampleRate(48000); stream_config.sample_rate = SampleRate(48000);
stream_config.buffer_size = BufferSize::Fixed(1920); stream_config.buffer_size = BufferSize::Fixed(960);
stream_config stream_config
} }
@@ -58,40 +60,57 @@ impl Speaker {
|err| println!("Error output stream: {err}"), |err| println!("Error output stream: {err}"),
None None
).unwrap() ).unwrap()
} }
} }
impl AudioPlayback { impl AudioPlayback {
pub fn new(event_bus: EventBus, speaker: Speaker) -> Self { pub fn new(event_bus: EventBus, speaker: Speaker, mixer: AudioMixer) -> Self {
Self { Self {
event_bus, event_bus,
speaker, speaker,
running: Arc::new(AtomicBool::new(false)), running: Arc::new(AtomicBool::new(false)),
stream: None, stream: None,
worker: None, worker: None,
next_tick: RealTimeEvent::new(), mixer
} }
} }
pub fn default(event_bus: EventBus) -> Self { pub fn default(event_bus: EventBus, mixer: AudioMixer) -> Self {
let speaker = Speaker::default(); let speaker = Speaker::default();
AudioPlayback::new(event_bus, speaker) AudioPlayback::new(event_bus, speaker, mixer)
} }
pub async fn start(&mut self) { pub async fn start(&mut self) {
self.running.store(true, Ordering::SeqCst);
let stream_running = self.running.clone(); let stream_running = self.running.clone();
let event_bus = self.event_bus.clone(); let event_bus = self.event_bus.clone();
let mixer = self.mixer.clone();
// stream cpal // stream cpal
println!("Setting up audio playback stream..."); println!("Setting up audio playback stream...");
let stream = self.speaker.build_stream(move |data, _| { let last_time = Mutex::new(Instant::now());
let stream = self.speaker.build_stream(move |data, info| {
println!(
"CALLBACK : reçu {} samples, type info = {:?}",
data.len(),
info
);
let now = Instant::now();
let mut last = last_time.lock().unwrap();
let dt = now.duration_since(*last);
println!("Callback audio appelée chaque {:?} ms (≈ {:.1} Hz)", dt.as_millis(), 1000.0 / dt.as_millis().max(1) as f32);
*last = now;
if !stream_running.load(Ordering::Relaxed){ if !stream_running.load(Ordering::Relaxed){
return; return;
} }
// aller récupérer 1920 sur un buffer println!("Audio playback stream tick");
// écrire le contenu dans data
let _ = event_bus.emit(Event::PlaybackTick); let audio_mixer = mixer.read(data.len());
data.copy_from_slice(&audio_mixer);
// println!("data content : {:?}", data);
let _ = event_bus.emit_sync(Event::PlaybackTick(data.len()));
}); });
stream.play().unwrap(); stream.play().unwrap();
self.stream = Some(stream); self.stream = Some(stream);
@@ -99,7 +118,7 @@ impl AudioPlayback {
} }
pub async fn stop(&mut self) { pub async fn stop(&mut self) {
self.running.store(false, std::sync::atomic::Ordering::SeqCst); self.running.store(false, Ordering::SeqCst);
} }
} }

View File

@@ -53,7 +53,6 @@ impl AudioClient {
Ok(audio_frame) => { Ok(audio_frame) => {
// Pousser la frame complète dans le buffer // Pousser la frame complète dans le buffer
writer_clone.push_slice_overwrite(&audio_frame); writer_clone.push_slice_overwrite(&audio_frame);
println!("writed frame");
}, },
Err(e) => { Err(e) => {
eprintln!("Erreur de décodage audio : {}", e); eprintln!("Erreur de décodage audio : {}", e);
@@ -78,6 +77,21 @@ impl AudioClient {
sequence sequence
}); });
} }
pub fn read_audio(&self, size: usize) -> Option<Vec<i16>> {
if self.buffer_reader.len() < size {
return None;
}
let mut buffer = vec![0i16; size];
let read_count = self.buffer_reader.pop_slice(&mut buffer);
if read_count == size {
Some(buffer)
}else {
None
}
}
} }
@@ -107,4 +121,16 @@ impl AudioClientManager {
pub fn write_audio_to_client(&self, uuid: uuid::Uuid, sequence: u16, data: Bytes) { pub fn write_audio_to_client(&self, uuid: uuid::Uuid, sequence: u16, data: Bytes) {
let _ = self.audio_clients.get(&uuid).unwrap().write_audio(sequence, data); let _ = self.audio_clients.get(&uuid).unwrap().write_audio(sequence, data);
} }
pub fn take_audio_collection(&self, size: usize) -> Vec<Vec<i16>> {
let mut buffers = Vec::new();
for client in self.audio_clients.values() {
if let Some(buffer) = client.read_audio(size) {
buffers.push(buffer);
}
}
buffers
}
} }

View File

@@ -9,8 +9,8 @@ pub enum Event {
AudioIn(Vec<i16>), AudioIn(Vec<i16>),
AudioEncoded(Vec<u8>), AudioEncoded(Vec<u8>),
PlaybackTick, PlaybackTick(usize),
PlaybackRequest(Bytes), // PlaybackRequest(Bytes),
NetConnected, NetConnected,
NetDisconnected, NetDisconnected,

View File

@@ -8,6 +8,7 @@ use bytes::Bytes;
use parking_lot::RwLock; use parking_lot::RwLock;
use tokio::task::AbortHandle; use tokio::task::AbortHandle;
use uuid::Uuid; use uuid::Uuid;
use crate::core::mixer::AudioMixer;
use crate::domain::audio_client::{AudioClient, AudioClientManager}; use crate::domain::audio_client::{AudioClient, AudioClientManager};
use crate::domain::event::{Event, EventBus}; use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{MessageClient, MessageServer}; use crate::network::protocol::{MessageClient, MessageServer};
@@ -28,6 +29,7 @@ pub struct Dispatcher {
tauri_handle: Arc<AppHandle>, tauri_handle: Arc<AppHandle>,
audio_client_manager: Arc<AudioClientManager>, audio_client_manager: Arc<AudioClientManager>,
audio_mixer: Arc<AudioMixer>,
// todo : temporaire, le temps d'avoir un handler // todo : temporaire, le temps d'avoir un handler
@@ -54,7 +56,12 @@ impl PingInfo {
} }
impl Dispatcher { impl Dispatcher {
pub fn new(event_bus: EventBus, udp_session: UdpSession, audio_client_manager: AudioClientManager, tauri_handle: AppHandle) -> Self { pub fn new(event_bus: EventBus,
udp_session: UdpSession,
audio_client_manager: AudioClientManager,
audio_mixer: AudioMixer,
tauri_handle: AppHandle
) -> Self {
Self { Self {
event_bus: Arc::new(event_bus), event_bus: Arc::new(event_bus),
udp_session: Arc::new(udp_session), udp_session: Arc::new(udp_session),
@@ -63,6 +70,7 @@ impl Dispatcher {
can_send_audio: Arc::new(AtomicBool::new(false)), can_send_audio: Arc::new(AtomicBool::new(false)),
ping_tracker: Arc::new(RwLock::new(HashMap::new())), ping_tracker: Arc::new(RwLock::new(HashMap::new())),
audio_client_manager: Arc::new(audio_client_manager), audio_client_manager: Arc::new(audio_client_manager),
audio_mixer: Arc::new(audio_mixer),
} }
} }
@@ -70,6 +78,7 @@ impl Dispatcher {
let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await; let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await;
let udp_session = self.udp_session.clone(); let udp_session = self.udp_session.clone();
let sequence_counter = self.sequence_counter.clone(); let sequence_counter = self.sequence_counter.clone();
let audio_mixer = self.audio_mixer.clone();
while let Ok(event) = receiver.recv().await { while let Ok(event) = receiver.recv().await {
match event { match event {
@@ -88,8 +97,8 @@ impl Dispatcher {
sequence_counter.fetch_add(1, Ordering::Relaxed); sequence_counter.fetch_add(1, Ordering::Relaxed);
} }
Event::PlaybackTick => { Event::PlaybackTick(paquet_size) => {
audio_mixer.mix_next_frame(paquet_size);
} }
Event::NetIn(message_event) => { Event::NetIn(message_event) => {
// println!("NetIn: {:?}", message_event); // println!("NetIn: {:?}", message_event);
@@ -143,6 +152,7 @@ impl Dispatcher {
} }
} }
MessageServer::Audio {user, sequence, data} => { MessageServer::Audio {user, sequence, data} => {
// println!("Audio received from {}: {} -> {:?}", user, sequence, data);
if audio_client_manager.audio_client_exists(user) { if audio_client_manager.audio_client_exists(user) {
audio_client_manager.write_audio_to_client(user, sequence, data); audio_client_manager.write_audio_to_client(user, sequence, data);
}else{ }else{