Files
ox_speak_client/src-tauri/src/core/mixer.rs
2025-07-19 03:45:41 +02:00

226 lines
7.0 KiB
Rust

use std::sync::{atomic, Arc};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use bytes::Bytes;
use tokio::sync::{mpsc, Notify};
use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer};
pub struct Mixer {
// writer: Arc<RingBufWriter<i16>>,
// reader: Arc<RingBufReader<i16>>,
pre_buffer: Arc<PreBuffer>,
worker_sender: Option<mpsc::UnboundedSender<Vec<i16>>>,
buffer: Bytes // 1920 frames
}
impl Mixer {
pub fn new() -> Self {
let (writer, reader) = RingBuffer::<i16>::new(1024).split();
Self {
// writer: Arc::new(writer),
// reader: Arc::new(reader),
pre_buffer: Arc::new(PreBuffer::new()),
worker_sender: None,
}
}
// Démarrer le worker de pré-traitement
pub async fn start(&mut self){
let (sender, mut receiver) = mpsc::unbounded_channel::<Vec<i16>>();
// let (writer, reader) = RingBuffer::<Bytes>::new(1024).split();
self.worker_sender = Some(sender);
let prebuffer = self.pre_buffer.clone();
// worker de pré-traitement
tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
// data doit exactement faire 960 (mono) ou 1920 (stéréo), 20ms
// si il fait 960, on converti en stéréo
// on écrit dans un buffer de pré-traitement
// si data rempli pas les condition, on ignore
// on vérifie la taille des données
match data.len() {
960 => {
// Mono 20ms @ 48kHz - convertir en stéréo
let stereo_data = Self::mono_to_stereo(data);
// push dans un buffer de pré-traitement
prebuffer.push(stereo_data).await;
},
1920 => {
// push dans un buffer de pré-traitement
prebuffer.push(data).await;
}
_ => {
println!("⚠️ Données audio ignorées - taille incorrecte: {} bytes", data.len());
}
}
}
});
}
// Envoyer des données au pré-traitement
pub async fn write(&self, data: Vec<i16>){
if let Some(sender) = self.worker_sender.as_ref() {
let _ = sender.send(data);
}
}
// S'occupe de générer la trame qui sera lu dans 20ms
pub async fn mix(&self){
// récupérer le buffer de pré-traitement, qui sera tableau de Vec<i16> (le lock ? pour éviter que le worker de pré-traitement continue d'alimenter)
// le mixer (sum de chaque trame ?)
// le mettre dans un buffer, qui sera accessible par AudioPlayback
// écraser le buffer de pré-traitement
// (libérer le lock)
let frames = self.pre_buffer.read_all().await;
let mixed_frame = if frames.is_empty() {
[0i16; 1920]
} else {
Self::mix_frames(&frames)
};
}
// Récupérer la trame présente qui est déjà pré-généré par mix
pub async fn read(&self) -> [i16; 1920]{
let buffer = self.buffer;
// vider le buffer
self.buffer =
buffer
}
}
impl Mixer {
// Functions helpers
fn mono_to_stereo(mono_samples: Vec<i16>) -> Vec<i16> {
let mut stereo_data = Vec::with_capacity(mono_samples.len() * 2);
// Chaque échantillon mono devient deux échantillons stéréo identiques
for sample in mono_samples {
stereo_data.push(sample); // Canal gauche
stereo_data.push(sample); // Canal droit
}
stereo_data
}
// Mixer plusieurs trames
fn mix_frames(frames: &[Vec<i16>]) -> [i16; 1920] {
let mut mixed = [0i32; 1920];
for frame in frames {
for (i, &sample) in frame.iter().enumerate() {
if i < 1920 {
mixed[i] += sample as i32;
}
}
}
let mut result = [0i16; 1920];
let count = frames.len() as i32;
for (i, &sample) in mixed.iter().enumerate() {
result[i] = (sample / count).clamp(i16::MIN as i32, i16::MAX as i32) as i16;
}
result
}
}
/// Pre buffer
#[derive(Clone)]
struct PreBuffer {
sender: Arc<kanal::AsyncSender<Vec<i16>>>,
receiver: Arc<kanal::AsyncReceiver<Vec<i16>>>,
is_being_read: Arc<AtomicBool>,
read_done_notify: Arc<Notify>,
}
impl PreBuffer {
fn new() -> Self {
let (sender, reader) = kanal::unbounded_async::<Vec<i16>>();
Self {
sender: Arc::new(sender),
receiver: Arc::new(reader),
is_being_read: Arc::new(AtomicBool::new(false)),
read_done_notify: Arc::new(Notify::new()),
}
}
async fn push(&self, frame: Vec<i16>) {
if self.is_being_read.load(atomic::Ordering::Acquire) {
self.read_done_notify.notified().await;
}
let _ = self.sender.send(frame);
}
async fn read_all(&self) -> Vec<Vec<i16>> {
self.is_being_read.store(true, atomic::Ordering::Release);
let mut frames = Vec::new();
while let Ok(frame) = self.receiver.recv().await {
frames.push(frame);
}
// Libérer et notifier les writers en attente
self.is_being_read.store(false, atomic::Ordering::Release);
self.read_done_notify.notify_waiters();
frames
}
}
// struct PreBuffer {
// // Vec dynamique pour stocker les trames
// frames: Vec<Vec<i16>>,
// // Compteur atomique pour le nombre de trames disponibles
// frame_count: AtomicUsize,
// // Flag atomique pour indiquer si le buffer est en cours de lecture
// is_being_read: AtomicBool,
// read_done_notify: Arc<Notify>,
// }
//
// impl PreBuffer {
// fn new() -> Self {
// Self {
// frames: Vec::new(),
// frame_count: AtomicUsize::new(0),
// is_being_read: AtomicBool::new(false),
// read_done_notify: Arc::new(Notify::new()),
// }
// }
//
// async fn push(&mut self, frame: Vec<i16>) {
// if self.is_being_read.load(atomic::Ordering::Acquire) {
// self.read_done_notify.notified().await;
// }
//
// self.frames.push(frame);
// self.frame_count.fetch_add(1, atomic::Ordering::Release);
// }
//
// fn reset(&mut self) {
// self.frame_count.store(0, atomic::Ordering::Release);
// }
//
// fn len(&self) -> usize {
// self.frame_count.load(atomic::Ordering::Acquire)
// }
//
// fn read_all(&mut self) -> Vec<Vec<i16>> {
// self.is_being_read.store(true, atomic::Ordering::Release);
//
// let frames = std::mem::take(&mut self.frames);
// self.frame_count.store(0, atomic::Ordering::Release);
//
// // Libérer et notifier les writers en attente
// self.is_being_read.store(false, atomic::Ordering::Release);
// self.read_done_notify.notify_waiters();
//
// frames
// }
// }
//