This commit is contained in:
2025-07-18 01:57:18 +02:00
commit 21164df8cd
60 changed files with 9988 additions and 0 deletions

7
src-tauri/.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
# Generated by Cargo
# will have compiled files and executables
/target/
# Generated by Tauri
# will have schema files for capabilities auto-completion
/gen/schemas

5563
src-tauri/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

36
src-tauri/Cargo.toml Normal file
View File

@@ -0,0 +1,36 @@
[package]
name = "ox_speak_client"
version = "0.1.0"
description = "A Tauri App"
authors = ["you"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
# The `_lib` suffix may seem redundant but it is necessary
# to make the lib name unique and wouldn't conflict with the bin name.
# This seems to be only an issue on Windows, see https://github.com/rust-lang/cargo/issues/8519
name = "ox_speak_client_lib"
crate-type = ["staticlib", "cdylib", "rlib"]
[build-dependencies]
tauri-build = { version = "2", features = [] }
[dependencies]
tauri = { version = "2", features = [] }
tauri-plugin-opener = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
parking_lot = "0.12"
tokio = { version = "1.46", features = ["full"] }
cpal = "0.16"
opus = "0.3"
strum = {version = "0.27", features = ["derive"] }
uuid = {version = "1.17", features = ["v4", "serde"] }
event-listener = "5.4"
bytes = "1.10"
moka = {version = "0.12", features = ["future"] }
arc-swap = "1.7"
crossbeam-channel = "0.5"

3
src-tauri/build.rs Normal file
View File

@@ -0,0 +1,3 @@
fn main() {
tauri_build::build()
}

View File

@@ -0,0 +1,10 @@
{
"$schema": "../gen/schemas/desktop-schema.json",
"identifier": "default",
"description": "Capability for the main window",
"windows": ["main"],
"permissions": [
"core:default",
"opener:default"
]
}

BIN
src-tauri/icons/128x128.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.8 KiB

BIN
src-tauri/icons/32x32.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 974 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 903 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 KiB

BIN
src-tauri/icons/icon.icns Normal file

Binary file not shown.

BIN
src-tauri/icons/icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

BIN
src-tauri/icons/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

1
src-tauri/src/app/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod ox_speak_app;

View File

@@ -0,0 +1,113 @@
use std::sync::Arc;
use std::time::Duration;
use tauri::{AppHandle, Emitter, Listener};
use tokio;
use tokio::sync::mpsc;
use crate::core::capture::AudioCapture;
use crate::domain::event::{Event, EventBus};
use crate::network::udp::UdpSession;
use crate::runtime::dispatcher::Dispatcher;
pub struct OxSpeakApp {
// Communication inter-thread
event_bus: EventBus,
dispatcher: Dispatcher,
event_rx: Option<mpsc::Receiver<Event>>,
// Network
udp_session: UdpSession,
// audio
audio_capture: AudioCapture,
// Tauri handle
tauri_handle: AppHandle
}
impl OxSpeakApp {
pub fn new(tauri_handle: AppHandle) -> Self {
println!("Initializing OxSpeakApp");
// Event_bus - communication inter-components
println!("Creating event bus");
let (event_bus, event_rx) = EventBus::new();
// Audio
// todo : pour le moment, paramètre par défaut, on verra plus tard pour dynamiser ça
println!("Initializing audio capture");
let audio_capture = AudioCapture::default(event_bus.clone());
// UdpSession
println!("Initializing UDP session");
let udp_session = UdpSession::new(event_bus.clone());
// Dispatcher - Communication inter-components
println!("Initializing event dispatcher");
let mut dispatcher = Dispatcher::new(event_bus.clone(), udp_session.clone(), tauri_handle.clone());
println!("OxSpeakApp initialization complete");
Self {
event_bus,
dispatcher,
event_rx: Some(event_rx),
udp_session,
audio_capture,
tauri_handle,
}
}
pub async fn start(&mut self) {
println!("Starting OxSpeakApp");
// dispatcher - lancement du process pour la communication inter-process
println!("Starting event dispatcher");
let mut dispatcher = self.dispatcher.clone();
// Prendre l'ownership du receiver (event_rx)
if let Some(event_rx) = self.event_rx.take() {
tokio::spawn(async move {
dispatcher.start(event_rx).await
});
}
// Démarrer la connexion réseau
println!("Connecting to UDP server at 127.0.0.1:5000");
self.udp_session.connect("127.0.0.1:5000").await;
// Démarrer l'audio-capture
println!("Starting audio capture");
self.audio_capture.start().await;
println!("OxSpeakApp started successfully");
let _ = self.tick_tasks().await;
}
pub async fn stop(&mut self) {
println!("Stopping OxSpeakApp");
println!("Stopping audio capture");
self.audio_capture.stop().await;
println!("OxSpeakApp stopped successfully");
}
fn setup_tauri_events(&self) {
println!("Setting up Tauri event listeners");
let event_bus = self.event_bus.clone();
self.tauri_handle.listen("call", |event| {
println!("Received 'call' event from frontend");
event.payload(); // sera le contenu de l'event
});
println!("Tauri event listeners setup complete");
}
async fn tick_tasks(&self) {
let event_bus = self.event_bus.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let _ = event_bus.emit(Event::TaskTick).await;
}
});
}
}

View File

@@ -0,0 +1,178 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use crate::core::opus::AudioOpus;
use crate::domain::event::{Event, EventBus};
use crate::utils::ringbuf::RingBuffer;
#[derive(Clone)]
pub struct Microphone {
device: Device,
}
pub struct AudioCapture {
event_bus: EventBus,
microphone: Microphone,
running: Arc<AtomicBool>,
ring_buffer: RingBuffer<i16>,
steam: Option<Stream>,
worker: Option<JoinHandle<()>>,
}
impl Microphone {
pub fn new(device: Device) -> Self {
println!("Initializing microphone with device: {}", device.name().unwrap_or_else(|_| "Unknown".to_string()));
Self {
device
}
}
pub fn default() -> Self {
println!("Creating default microphone");
let host = default_host();
let device = host.default_input_device().unwrap();
Self::new(device)
}
pub fn get_input_config(&self) -> SupportedStreamConfig {
self.device.default_input_config().unwrap()
}
pub fn get_stream_config(&self) -> StreamConfig {
let config = self.get_input_config();
let mut stream_config: StreamConfig = config.into();
stream_config.channels = 1;
stream_config.sample_rate = SampleRate(48000);
stream_config.buffer_size = BufferSize::Fixed(960);
stream_config
}
pub fn build_stream<F>(&self, callback: F) -> Stream
where
F: FnMut(&[i16], &cpal::InputCallbackInfo) + Send + 'static,
{
let config = self.get_stream_config();
self.device.build_input_stream(
&config,
callback,
|err| println!("Error input stream: {err}"),
None
).unwrap()
}
}
impl AudioCapture {
pub fn new(event_bus: EventBus, microphone: Microphone) -> Self {
println!("Creating new AudioCapture instance");
Self {
event_bus,
microphone,
running: Arc::new(AtomicBool::new(false)),
ring_buffer: RingBuffer::new(4096),
steam: None,
worker: None,
}
}
pub fn default(event_bus: EventBus) -> Self {
println!("Creating default AudioCapture");
Self::new(event_bus, Microphone::default())
}
pub async fn start(&mut self) {
println!("Starting audio capture");
self.running.store(true, Ordering::Relaxed);
// stream cpal
println!("Setting up audio stream");
let writer = self.ring_buffer.writer();
let stream_running = self.running.clone();
let stream = self.microphone.build_stream(move |data, _| {
if !stream_running.load(Ordering::Relaxed){
return;
}
writer.push_slice_overwrite(data);
});
stream.play().unwrap();
self.steam = Some(stream);
println!("Audio stream started");
// Audio processing worker
println!("Starting audio processing worker");
self.run_processing_worker();
println!("Audio capture fully initialized");
}
pub async fn stop(&mut self) {
println!("Stopping audio capture");
self.running.store(false, Ordering::Relaxed);
println!("Releasing audio stream");
self.steam = None;
self.ring_buffer.force_wake_up();
// code possiblement bloquant, wrap vers un thread tokio bloquant
if let Some(worker) = self.worker.take() {
println!("Waiting for audio processing worker to finish");
tokio::task::spawn_blocking(move || {
worker.join().unwrap();
}).await.unwrap();
}
println!("Clearing ring buffer");
self.ring_buffer.clear();
println!("Audio capture stopped");
}
fn run_processing_worker(&mut self){
println!("Configuring audio processing worker");
let worker_running = self.running.clone();
let event_bus = self.event_bus.clone();
let input_config = self.microphone.get_input_config();
println!("Audio input config: sample rate: {}, channels: {}", input_config.sample_rate().0, input_config.channels());
let opus = AudioOpus::new(input_config.sample_rate().0, input_config.channels(), "voip");
let mut encoder = opus.create_encoder().unwrap();
let reader = self.ring_buffer.reader();
println!("Spawning audio processing thread");
self.worker = Some(thread::spawn(move || {
println!("Audio processing thread started");
let mut frame = [0i16; 960];
let mut frame_count = 0;
while worker_running.load(Ordering::Relaxed) {
let _ = reader.pop_slice_blocking(&mut frame);
if !worker_running.load(Ordering::Relaxed){
println!("Audio processing thread stopping");
break;
}
frame_count += 1;
if frame_count % 100 == 0 {
println!("Processed {} audio frames", frame_count);
}
let raw_data = frame.to_vec();
event_bus.emit_sync(Event::AudioIn(raw_data));
match encoder.encode(&frame){
Ok(encoded_data) => {
event_bus.emit_sync(Event::AudioEncoded(encoded_data))
}
Err(e) => {
println!("Error encoding: {e}");
}
}
}
println!("Audio processing thread finished after processing {} frames", frame_count);
}));
}
}
impl AudioCapture {
fn audio_processing(){
}
}

View File

@@ -0,0 +1 @@
// aller pick l'audio des clients

View File

@@ -0,0 +1,6 @@
pub mod capture;
pub mod mixer;
pub mod opus;
pub mod playback;
pub mod rms;
pub mod stats;

110
src-tauri/src/core/opus.rs Normal file
View File

@@ -0,0 +1,110 @@
use opus::{Application, Channels, Decoder, Encoder};
#[derive(Clone)]
pub struct AudioOpus{
sample_rate: u32,
channels: u16,
application: Application
}
impl AudioOpus {
pub fn new(sample_rate: u32, channels: u16, application: &str) -> Self {
let application = match application {
"voip" => Application::Voip,
"audio" => Application::Audio,
"lowdelay" => Application::LowDelay,
_ => Application::Voip,
};
Self{sample_rate, channels, application}
}
pub fn create_encoder(&self) -> Result<AudioOpusEncoder, String> {
AudioOpusEncoder::new(self.clone())
}
pub fn create_decoder(&self) -> Result<AudioOpusDecoder, String> {
AudioOpusDecoder::new(self.clone())
}
}
pub struct AudioOpusEncoder{
audio_opus: AudioOpus,
encoder: opus::Encoder,
}
impl AudioOpusEncoder {
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
let opus_channel = match audio_opus.channels {
1 => Channels::Mono,
2 => Channels::Stereo,
_ => Channels::Mono,
};
let mut encoder = Encoder::new(audio_opus.sample_rate, opus_channel, audio_opus.application)
.map_err(|e| format!("Échec de création de l'encodeur: {:?}", e))?;
match audio_opus.application {
Application::Voip => {
// Paramètres optimaux pour VoIP: bonne qualité vocale, CPU modéré
let _ = encoder.set_bitrate(opus::Bitrate::Bits(24000)); // 24kbps est bon pour la voix
let _ = encoder.set_vbr(true); // Variable bitrate économise du CPU
let _ = encoder.set_vbr_constraint(false); // Sans contrainte stricte de débit
// Pas de set_complexity (non supporté par la crate)
},
Application::Audio => {
// Musique: priorité à la qualité
let _ = encoder.set_bitrate(opus::Bitrate::Bits(64000));
let _ = encoder.set_vbr(true);
},
Application::LowDelay => {
// Priorité à la latence et l'efficacité CPU
let _ = encoder.set_bitrate(opus::Bitrate::Bits(18000));
let _ = encoder.set_vbr(true);
},
}
Ok(Self{audio_opus, encoder})
}
pub fn encode(&mut self, frames: &[i16]) -> Result<Vec<u8>, String> {
let mut output = vec![0u8; 1276]; // 1276 octets (la vraie worst-case recommandée par Opus).
let len = self.encoder.encode(frames, output.as_mut_slice())
.map_err(|e| format!("Erreur encodage: {:?}", e))?;
output.truncate(len);
Ok(output)
}
// 🔄 Approche avec buffer réutilisable (encore plus optimal)
fn encode_reuse(&mut self, frames: &[i16], output: &mut Vec<u8>) -> Result<usize, String> {
output.clear();
output.resize(1276, 0);
let len = self.encoder.encode(frames, output.as_mut_slice()).unwrap();
output.truncate(len);
Ok(len)
}
}
pub struct AudioOpusDecoder{
audio_opus: AudioOpus,
decoder: opus::Decoder,
}
impl AudioOpusDecoder {
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
let opus_channel = match audio_opus.channels {
1 => Channels::Mono,
2 => Channels::Stereo,
_ => Channels::Mono,
};
let decoder = Decoder::new(audio_opus.sample_rate, opus_channel)
.map_err(|e| format!("Échec de création du décodeur: {:?}", e))?;
Ok(Self{audio_opus, decoder})
}
pub fn decode(&mut self, frames: &[u8]) -> Result<Vec<i16>, String> {
let mut output = vec![0i16; 5760];
let len = self.decoder.decode(frames, output.as_mut_slice(), false).map_err(|e| format!("Erreur décodage: {:?}", e))?;
output.truncate(len);
Ok(output)
}
}

View File

@@ -0,0 +1,100 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig};
use cpal::traits::{DeviceTrait, HostTrait};
use crate::domain::event::EventBus;
use crate::utils::real_time_event::RealTimeEvent;
#[derive(Clone)]
pub struct Speaker {
device: Device
}
pub struct AudioPlayback {
event_bus: EventBus,
speaker: Speaker,
running: Arc<AtomicBool>,
stream: Option<Stream>,
worker: Option<JoinHandle<()>>,
next_tick: RealTimeEvent
}
impl Speaker {
pub fn new(device: Device) -> Self {
Speaker {
device
}
}
pub fn default() -> Self {
let host = default_host();
let device = host.default_output_device().unwrap();
Speaker::new(device)
}
pub fn get_input_config(&self) -> SupportedStreamConfig {
self.device.default_output_config().unwrap()
}
pub fn get_stream_config(&self) -> StreamConfig {
let config = self.get_input_config();
let mut stream_config: StreamConfig = config.into();
stream_config.channels = 2;
stream_config.sample_rate = SampleRate(48000);
stream_config.buffer_size = BufferSize::Fixed(1920);
stream_config
}
pub fn build_stream<F>(&self, callback: F) -> Stream
where
F: FnMut(&mut [i16], &cpal::OutputCallbackInfo) + Send + 'static,
{
let config = self.get_stream_config();
self.device.build_output_stream(
&config,
callback,
|err| println!("Error output stream: {err}"),
None
).unwrap()
}
}
impl AudioPlayback {
pub fn new(event_bus: EventBus, speaker: Speaker) -> Self {
Self {
event_bus,
speaker,
running: Arc::new(AtomicBool::new(false)),
stream: None,
worker: None,
next_tick: RealTimeEvent::new(),
}
}
pub fn default(event_bus: EventBus) -> Self {
let speaker = Speaker::default();
AudioPlayback::new(event_bus, speaker)
}
pub async fn start(&mut self) {
}
pub async fn stop(&mut self) {
self.running.store(false, std::sync::atomic::Ordering::SeqCst);
// stream cpal
println!("Setting up audio playback stream...");
let stream_running = self.running.clone();
let stream = self.speaker.build_stream(move |data, _| {
if !stream_running.load(Ordering::Relaxed){
return;
}
// aller récupérer 1920 sur un buffer
// écrire le contenu dans data
});
}
}

View File

View File

View File

@@ -0,0 +1,87 @@
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use bytes::{Bytes};
use crate::core::opus::{AudioOpus, AudioOpusDecoder};
use crate::utils::ringbuf::{RingBufReader, RingBufWriter, RingBuffer};
use crate::utils::shared_store::SharedArcMap;
struct AudioClient {
uuid: uuid::Uuid,
decode_sender: mpsc::Sender<DecodeRequest>,
buffer_reader: RingBufReader<Vec<i16>>,
buffer_writer: RingBufWriter<Vec<i16>>
}
struct DecodeRequest {
data: Bytes,
sequence: u16,
}
#[derive(Clone)]
struct AudioClientManager {
audio_clients: SharedArcMap<uuid::Uuid, AudioClient>,
}
impl AudioClient {
pub fn new() -> Self {
let (writer, reader) = RingBuffer::<Vec<i16>>::new(1024).split();
let (decode_sender, mut decode_reader) = mpsc::channel::<DecodeRequest>(100);
let decode_handle = tokio::spawn(async move {
let mut decoder = AudioOpus::new(44800, 1, "voip")
.create_decoder().unwrap();
let mut last_sequence: u16 = 0;
while let Some(request) = decode_reader.recv().await {
// si la séquence est "trop vieille" on la drop. (voir plus tard pour un système de ratrapage si c'est possible)
if last_sequence < request.sequence {
// todo : si le décodage est trop long, voir pour le mettre dans un thread
// avec let result = tokio::task::spawn_blocking({
// let data = request.data.clone();
// move || decoder.decode(&data)
// }).await.unwrap();
let start = std::time::Instant::now();
let result = decoder.decode(&request.data);
if start.elapsed() > Duration::from_millis(1) {
println!("⚠️ Frame drop possible: {:?}", start.elapsed());
}
match result {
Ok(audio_frame) => {
// Pousser la frame complète dans le buffer
writer.push(audio_frame);
},
Err(e) => {
eprintln!("Erreur de décodage audio : {}", e);
}
}
last_sequence = request.sequence;
}
}
});
Self {
uuid: uuid::Uuid::new_v4(),
decode_sender,
buffer_reader: reader,
buffer_writer: writer,
}
}
pub async fn write_audio(&self, sequence: u16, data: Bytes) {
let _ = self.decode_sender.send(DecodeRequest {
data,
sequence
});
}
}
impl AudioClientManager {
fn new() -> Self {
Self {
audio_clients: SharedArcMap::new()
}
}
}

View File

@@ -0,0 +1,46 @@
use tokio::sync::mpsc;
use crate::network::protocol::{MessageClient, MessageServer};
pub enum Event {
AppStarted,
AppStopped,
AudioIn(Vec<i16>),
AudioEncoded(Vec<u8>),
NetConnected,
NetDisconnected,
NetIn(MessageServer),
NetOut(MessageClient),
UiStarted,
UiStopped,
TaskTick
}
#[derive(Clone)]
pub struct EventBus {
pub sender: mpsc::Sender<Event>
}
impl EventBus {
pub fn new() -> (Self, mpsc::Receiver<Event>) {
let (sender, receiver) = mpsc::channel(4096);
(Self { sender }, receiver)
}
pub async fn emit(&self, event: Event) {
// s'utilise de cette façon : bus.emit(Event::AudioIn {Vec[0,1,2,3]}.await;
let _ = self.sender.send(event).await;
}
pub fn emit_sync(&self, event: Event) {
let _ = self.sender.try_send(event);
}
pub fn clone_sender(&self) -> mpsc::Sender<Event> {
self.sender.clone()
}
}

View File

@@ -0,0 +1,2 @@
pub mod event;
pub mod audio_client;

7
src-tauri/src/lib.rs Normal file
View File

@@ -0,0 +1,7 @@
pub mod app;
pub mod core;
pub mod domain;
pub mod network;
pub mod runtime;
pub mod utils;
pub mod tauri_ctx;

7
src-tauri/src/main.rs Normal file
View File

@@ -0,0 +1,7 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
#[tokio::main]
async fn main() {
ox_speak_client_lib::tauri_ctx::run().await;
}

View File

@@ -0,0 +1,2 @@
pub mod protocol;
pub mod udp;

View File

@@ -0,0 +1,281 @@
use std::collections::HashSet;
use std::net::SocketAddr;
use bytes::{Bytes, BytesMut, Buf, BufMut};
use uuid::Uuid;
use strum::{EnumIter, FromRepr};
#[repr(u8)]
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, EnumIter, FromRepr)]
pub enum UDPMessageType {
Ping = 0,
Audio = 1,
// Futurs types ici...
}
/// Messages client → serveur (SERIALIZE ONLY)
#[derive(Debug, Clone, PartialEq)]
pub enum MessageClient {
Ping { message_id: Uuid },
Audio { sequence: u16, data: Bytes }, // Utilisation de Bytes pour zero-copy
}
/// Messages serveur → client (DESERIALIZE ONLY)
#[derive(Debug, Clone, PartialEq)]
pub enum MessageServer {
Ping { message_id: Uuid },
Audio { user: Uuid, sequence: u16, data: Bytes },
}
#[derive(Debug, Clone, PartialEq)]
pub struct UDPMessage {
pub data: MessageServer,
pub address: SocketAddr,
pub size: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ParseError {
EmptyData,
InvalidData,
InvalidMessageType,
InvalidUuid,
}
impl From<uuid::Error> for ParseError {
fn from(_: uuid::Error) -> Self {
ParseError::InvalidUuid
}
}
impl UDPMessageType {
pub fn from_u8(value: u8) -> Option<Self> {
Self::from_repr(value)
}
pub fn to_u8(self) -> u8 {
self as u8
}
pub fn from_message(message: &[u8]) -> Option<Self> {
if message.is_empty() {
return None;
}
Self::from_u8(message[0])
}
}
impl MessageClient {
/// Sérialisation optimisée avec BytesMut
pub fn to_bytes(&self) -> Bytes {
match self {
Self::Ping { message_id } => {
let mut buf = BytesMut::with_capacity(17);
buf.put_u8(UDPMessageType::Ping as u8);
buf.put_slice(message_id.as_bytes());
buf.freeze()
}
Self::Audio { sequence, data } => {
let mut buf = BytesMut::with_capacity(3 + data.len());
buf.put_u8(UDPMessageType::Audio as u8);
buf.put_u16(*sequence);
buf.put_slice(data);
buf.freeze()
}
}
}
pub fn to_vec(&self) -> Vec<u8> {
self.to_bytes().to_vec()
}
pub fn message_type(&self) -> UDPMessageType {
match self {
Self::Ping { .. } => UDPMessageType::Ping,
Self::Audio { .. } => UDPMessageType::Audio,
}
}
pub fn size(&self) -> usize {
match self {
Self::Ping { .. } => 17, // 1 + 16 (UUID)
Self::Audio { data, .. } => 3 + data.len(), // 1 + 2 + audio_data
}
}
// Constructeurs
pub fn ping(message_id: Uuid) -> Self {
Self::Ping { message_id }
}
pub fn audio(sequence: u16, data: Bytes) -> Self {
Self::Audio { sequence, data }
}
}
impl MessageServer {
/// Parsing zero-copy depuis Bytes
pub fn from_bytes(mut data: Bytes) -> Result<Self, ParseError> {
if data.is_empty() {
return Err(ParseError::EmptyData);
}
let msg_type = data.get_u8(); // Consomme 1 byte
match msg_type {
0 => { // Ping
if data.remaining() < 16 {
return Err(ParseError::InvalidData);
}
let uuid_bytes = data.split_to(16); // Zero-copy split
let message_id = Uuid::from_slice(&uuid_bytes)?;
Ok(Self::Ping { message_id })
}
1 => { // Audio
if data.remaining() < 18 { // 16 (UUID) + 2 (sequence)
return Err(ParseError::InvalidData);
}
let user_bytes = data.split_to(16);
let user = Uuid::from_slice(&user_bytes)?;
let sequence = data.get_u16();
let audio_data = data; // Le reste pour l'audio
Ok(Self::Audio { user, sequence, data: audio_data })
}
_ => Err(ParseError::InvalidMessageType),
}
}
/// Parsing depuis &[u8] - conversion simple vers Bytes puis appel from_bytes
pub fn from_slice(data: &[u8]) -> Result<Self, ParseError> {
let bytes = Bytes::copy_from_slice(data);
Self::from_bytes(bytes)
}
pub fn message_type(&self) -> UDPMessageType {
match self {
Self::Ping { .. } => UDPMessageType::Ping,
Self::Audio { .. } => UDPMessageType::Audio,
}
}
pub fn size(&self) -> usize {
match self {
Self::Ping { .. } => 17, // 1 + 16 (UUID)
Self::Audio { data, .. } => 19 + data.len(), // 1 + 16 + 2 + audio_data
}
}
// Constructeurs
pub fn ping(message_id: Uuid) -> Self {
Self::Ping { message_id }
}
pub fn audio(user: Uuid, sequence: u16, data: Bytes) -> Self {
Self::Audio { user, sequence, data }
}
}
impl UDPMessage {
/// Parsing depuis slice → Bytes (zero-copy si possible)
pub fn from_bytes(address: SocketAddr, data: &[u8]) -> Result<Self, ParseError> {
let original_size = data.len();
let bytes = Bytes::copy_from_slice(data); // Seule allocation
let data = MessageServer::from_bytes(bytes)?;
Ok(Self {
data,
address,
size: original_size
})
}
// Constructeurs
pub fn ping(address: SocketAddr, message_id: Uuid) -> Self {
let data = MessageServer::ping(message_id);
let size = data.size();
Self { data, address, size }
}
pub fn audio(address: SocketAddr, user: Uuid, sequence: u16, data: Bytes) -> Self {
let msg_data = MessageServer::audio(user, sequence, data);
let size = msg_data.size();
Self { data: msg_data, address, size }
}
// Helpers pour récupérer certains éléments des messages
pub fn get_message_id(&self) -> Option<Uuid> {
match &self.data {
MessageServer::Ping { message_id } => Some(*message_id),
_ => None,
}
}
pub fn get_user(&self) -> Option<Uuid> {
match &self.data {
MessageServer::Audio { user, .. } => Some(*user),
_ => None,
}
}
pub fn get_sequence(&self) -> Option<u16> {
match &self.data {
MessageServer::Audio { sequence, .. } => Some(*sequence),
_ => None,
}
}
pub fn get_audio_data(&self) -> Option<&Bytes> {
match &self.data {
MessageServer::Audio { data, .. } => Some(data),
_ => None,
}
}
pub fn message_type(&self) -> UDPMessageType {
self.data.message_type()
}
pub fn size(&self) -> usize {
self.size
}
pub fn address(&self) -> SocketAddr {
self.address
}
}
impl std::fmt::Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ParseError::EmptyData => write!(f, "Empty data received"),
ParseError::InvalidData => write!(f, "Invalid data format"),
ParseError::InvalidMessageType => write!(f, "Invalid message type"),
ParseError::InvalidUuid => write!(f, "Invalid UUID format"),
}
}
}
impl std::error::Error for ParseError {}
/// Fonction utilitaire pour inspecter rapidement un message
pub fn peek_message_type(data: &[u8]) -> Option<UDPMessageType> {
if data.is_empty() {
return None;
}
UDPMessageType::from_u8(data[0])
}
/// Validation rapide sans parsing complet
pub fn validate_message_format(data: &[u8]) -> Result<UDPMessageType, ParseError> {
if data.is_empty() {
return Err(ParseError::EmptyData);
}
let msg_type = UDPMessageType::from_u8(data[0])
.ok_or(ParseError::InvalidMessageType)?;
// Validation basique des longueurs
match msg_type {
UDPMessageType::Ping if data.len() != 17 => Err(ParseError::InvalidData),
UDPMessageType::Audio if data.len() < 19 => Err(ParseError::InvalidData),
_ => Ok(msg_type),
}
}

View File

@@ -0,0 +1,216 @@
use std::net::SocketAddr;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::net::UdpSocket;
use tokio::task::AbortHandle;
use tokio::time::sleep;
use bytes::Bytes;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{MessageClient, MessageServer};
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UdpSessionState {
Disconnected,
Connecting,
Connected,
}
struct UdpSessionInner {
socket: Option<Arc<UdpSocket>>,
abort_handle: Option<Arc<AbortHandle>>,
state: UdpSessionState,
}
#[derive(Clone)]
pub struct UdpSession {
inner: Arc<RwLock<UdpSessionInner>>,
event_bus: EventBus,
}
impl UdpSession {
pub fn new(event_bus: EventBus) -> Self {
Self {
inner: Arc::new(RwLock::new(UdpSessionInner {
socket: None,
abort_handle: None,
state: UdpSessionState::Disconnected,
})),
event_bus,
}
}
pub async fn connect(&self, addr: &str) -> bool {
// Mettre à jour l'état à "Connecting"
{
let mut inner = self.inner.write();
inner.state = UdpSessionState::Connecting;
}
let server_addr: SocketAddr = match addr.parse() {
Ok(addr) => addr,
Err(_) => {
let mut inner = self.inner.write();
inner.state = UdpSessionState::Disconnected;
return false;
},
};
let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(socket) => Arc::new(socket),
Err(_) => {
let mut inner = self.inner.write();
inner.state = UdpSessionState::Disconnected;
return false;
},
};
match socket.connect(server_addr).await {
Ok(_) => {},
Err(_) => {
let mut inner = self.inner.write();
inner.state = UdpSessionState::Disconnected;
return false;
}
};
// Réception en arrière-plan
let recv_socket = Arc::clone(&socket);
let event_bus = self.event_bus.clone();
println!("About to spawn receive task...");
let recv_task = tokio::spawn(async move {
println!("Receive task started! Listen {}", recv_socket.local_addr().unwrap());
event_bus.emit(Event::NetConnected).await;
let mut buf = [0u8; 1500];
loop {
match recv_socket.recv(&mut buf).await {
Ok((size)) => {
if let Ok(msg) = MessageServer::from_slice(&buf[..size]) {
event_bus.emit(Event::NetIn(msg)).await;
}
}
Err(_) => {
event_bus.emit(Event::NetDisconnected).await;
break;
},
}
}
});
// Tout mettre à jour en une fois
{
let mut inner = self.inner.write();
inner.socket = Some(socket);
inner.abort_handle = Some(Arc::new(recv_task.abort_handle()));
inner.state = UdpSessionState::Connected;
}
true
}
pub async fn send(&self, msg: MessageClient) -> bool {
// Récupérer la socket si connecté
let socket = {
let inner = self.inner.read();
if inner.state != UdpSessionState::Connected {
return false;
}
inner.socket.as_ref().map(Arc::clone)
};
match socket {
Some(socket) => {
match socket.send(&msg.to_bytes()).await {
Ok(_) => {
// notifier l'eventbus de l'envoie du message
self.event_bus.emit(Event::NetOut(msg)).await;
true
},
Err(_) => {
// En cas d'erreur, marquer comme déconnecté
let mut inner = self.inner.write();
inner.state = UdpSessionState::Disconnected;
false
}
}
}
None => false,
}
}
pub async fn disconnect(&self) {
// Attendre que la connexion soit établie si elle est en cours
loop {
let state = {
let inner = self.inner.read();
inner.state
};
if state != UdpSessionState::Connecting {
break;
}
sleep(std::time::Duration::from_millis(100)).await;
}
// Tout nettoyer en une fois
let abort_handle = {
let mut inner = self.inner.write();
let handle = inner.abort_handle.take();
inner.socket = None;
inner.state = UdpSessionState::Disconnected;
handle
};
// Arrêter la tâche de réception (en dehors du lock)
if let Some(handle) = abort_handle {
handle.abort();
}
}
pub async fn reconnect(&self, addr: &str) -> bool {
println!("Attempting to reconnect to {}", addr);
// Déconnecter proprement d'abord
self.disconnect().await;
// Attendre un peu avant de reconnecter
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Tentatives de reconnexion (3 essais)
for attempt in 1..=3 {
println!("Reconnection attempt {}/3", attempt);
if self.connect(addr).await {
println!("Reconnection successful!");
return true;
}
if attempt < 3 {
println!("Reconnection failed, waiting before retry...");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
println!("All reconnection attempts failed");
false
}
// Méthodes utilitaires
pub fn get_state(&self) -> UdpSessionState {
let inner = self.inner.read();
inner.state
}
pub fn is_connected(&self) -> bool {
self.get_state() == UdpSessionState::Connected
}
pub fn is_connecting(&self) -> bool {
self.get_state() == UdpSessionState::Connecting
}
pub fn is_disconnected(&self) -> bool {
self.get_state() == UdpSessionState::Disconnected
}
}

View File

@@ -0,0 +1,185 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::time::Instant;
use tauri::{AppHandle, Emitter};
use tokio::sync::mpsc;
use bytes::Bytes;
use parking_lot::RwLock;
use tokio::task::AbortHandle;
use uuid::Uuid;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{MessageClient, MessageServer};
use crate::network::udp::UdpSession;
#[derive(Debug, Clone)]
struct PingInfo {
sent_at: Instant,
response_time: Option<u64>, // en milliseconds
}
#[derive(Clone)]
pub struct Dispatcher {
event_bus: EventBus,
udp_session: UdpSession,
tauri_handle: AppHandle,
// todo : temporaire, le temps d'avoir un handler
sequence_counter: Arc<AtomicU16>,
can_send_audio: Arc<AtomicBool>,
ping_tracker: Arc<RwLock<HashMap<Uuid, PingInfo>>>,
}
impl PingInfo {
fn new() -> Self {
Self {
sent_at: Instant::now(),
response_time: None,
}
}
fn complete(&mut self) {
self.response_time = Some(self.sent_at.elapsed().as_millis() as u64);
}
fn is_completed(&self) -> bool {
self.response_time.is_some()
}
}
impl Dispatcher {
pub fn new(event_bus: EventBus, udp_session: UdpSession, tauri_handle: AppHandle) -> Self {
Self {
event_bus,
udp_session,
sequence_counter: Arc::new(AtomicU16::new(0)),
tauri_handle,
can_send_audio: Arc::new(AtomicBool::new(false)),
ping_tracker: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn start(&mut self, mut receiver: mpsc::Receiver<Event>) {
let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await;
let udp_session = self.udp_session.clone();
let sequence_counter = self.sequence_counter.clone();
while let Some(event) = receiver.recv().await {
match event {
Event::AudioIn(sample) => {
}
Event::AudioEncoded(sample_encoded) => {
// Conversion de Vec<u8> vers Bytes
let bytes_sample_encoded = Bytes::from(sample_encoded);
let message = MessageClient::audio(
sequence_counter.load(Ordering::Relaxed),
bytes_sample_encoded
);
udp_session.send(message).await;
sequence_counter.fetch_add(1, Ordering::Relaxed);
}
Event::NetIn(message_event) => {
println!("NetIn: {:?}", message_event);
let _ = udp_in_sender.send(message_event).await;
}
Event::NetOut(message_call) => {
if let MessageClient::Ping { message_id } = message_call {
self.ping_tracker.write().insert(message_id, PingInfo::new());
}
}
Event::NetConnected => {
// envoyer un ping pour annoncer au serveur son existence.
udp_session.send(MessageClient::ping(Uuid::new_v4())).await;
}
Event::NetDisconnected => {
println!("Network disconnected, attempting reconnection...");
// Lancer la reconnexion en arrière-plan
udp_session.reconnect("127.0.0.1:5000").await;
}
Event::TaskTick => {
let ping_id = Uuid::new_v4();
udp_session.send(MessageClient::ping(ping_id)).await;
}
_ => {
println!("Event non prit en charge !")
}
}
}
}
pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender<MessageServer>) {
let (sender, mut consumer) = mpsc::channel::<MessageServer>(1024);
let ping_tracker = Arc::clone(&self.ping_tracker);
let task = tokio::spawn(async move {
while let Some(message) = consumer.recv().await {
match message {
MessageServer::Ping {message_id} => {
// Réponse au ping reçue
let mut tracker = ping_tracker.write();
if let Some(ping_info) = tracker.get_mut(&message_id) {
ping_info.complete();
println!("Ping response: {} -> {}ms",
message_id,
ping_info.response_time.unwrap()
);
} else {
println!("Received ping response for unknown ID: {}", message_id);
}
}
MessageServer::Audio {user, sequence, data} => {
// Audio reçu
}
}
}
});
(task.abort_handle(), sender)
}
// todo : ce qui suit est temporaire le temps d'avoir une vrai gestion de ping
pub fn get_ping_stats(&self) -> (u32, u32, Option<u64>) {
let tracker = self.ping_tracker.read();
let total_pings = tracker.len() as u32;
let completed_pings = tracker.values().filter(|p| p.is_completed()).count() as u32;
let avg_response_time = if completed_pings > 0 {
let total_time: u64 = tracker.values()
.filter_map(|p| p.response_time)
.sum();
Some(total_time / completed_pings as u64)
} else {
None
};
(total_pings, completed_pings, avg_response_time)
}
pub fn get_recent_pings(&self, limit: usize) -> Vec<(Uuid, u64)> {
let tracker = self.ping_tracker.read();
tracker.iter()
.filter_map(|(id, info)| {
info.response_time.map(|time| (*id, time))
})
.take(limit)
.collect()
}
// Nettoyer les vieux pings
pub fn cleanup_old_pings(&self, max_age_seconds: u64) {
let mut tracker = self.ping_tracker.write();
let now = Instant::now();
tracker.retain(|_, ping_info| {
now.duration_since(ping_info.sent_at).as_secs() < max_age_seconds
});
}
}

View File

@@ -0,0 +1 @@
pub mod dispatcher;

View File

@@ -0,0 +1,65 @@
use tauri::Manager;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::app::ox_speak_app::OxSpeakApp;
pub struct AppState {
pub app: Arc<Mutex<OxSpeakApp>>,
}
// Séparation du generate_context, sinon l'RustRover (l'ide) rame énormément dans la fonction run
fn get_tauri_context() -> tauri::Context<tauri::Wry> {
tauri::generate_context!()
}
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub async fn run() {
println!("Starting OxSpeak application");
tauri::async_runtime::set(tokio::runtime::Handle::current());
println!("Tokio runtime set");
println!("Generating Tauri context");
let context = get_tauri_context();
println!("Building Tauri application");
tauri::Builder::default()
.plugin(tauri_plugin_opener::init())
.setup(|app| {
println!("Setting up Tauri application");
let my_app = OxSpeakApp::new(app.handle().clone());
let app_state = AppState {
app: Arc::new(Mutex::new(my_app))
};
app.manage(app_state);
println!("App state managed by Tauri");
let tauri_handle = app.handle().clone();
println!("Spawning async task to start the application");
tauri::async_runtime::spawn(async move {
let state = tauri_handle.state::<AppState>();
let mut my_app = state.app.lock().await;
my_app.start().await;
});
println!("Tauri setup complete");
Ok(())
})
// .invoke_handler(tauri::generate_handler![greet])
.run(context)
.expect("error while running tauri application");
}
// Learn more about Tauri commands at https://tauri.app/develop/calling-rust/
#[tauri::command]
async fn greet(name: &str) -> Result<String, String> {
println!("Hello from Rust: {}", name);
if name.is_empty() {
return Err("Le nom ne peut pas être vide".to_string());
}
// Simulation d'une opération async qui pourrait échouer
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok(format!("Hello, {}!!! You've been greeted from Rust!", name))
}

View File

@@ -0,0 +1,3 @@
pub mod ringbuf;
pub mod real_time_event;
pub mod shared_store;

View File

@@ -0,0 +1,46 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use event_listener::{Event, Listener};
struct RealTimeEventInner{
flag: AtomicBool,
event: Event,
}
#[derive(Clone)]
pub struct RealTimeEvent {
inner: Arc<RealTimeEventInner>,
}
impl RealTimeEvent{
pub fn new() -> Self{
Self{
inner: Arc::new(RealTimeEventInner{
flag: AtomicBool::new(false),
event: Event::new(),
})
}
}
pub fn notify(&self){
self.inner.flag.store(true, Ordering::Release);
self.inner.event.notify(usize::MAX);
}
pub fn wait(&self){
loop {
let listener = self.inner.event.listen();
if self.inner.flag.swap(false, Ordering::Acquire){
break
}
listener.wait();
}
}
}
impl Default for RealTimeEvent{
fn default() -> Self{
Self::new()
}
}

View File

@@ -0,0 +1,772 @@
// Optimisé pour performance audio temps réel avec overwrite automatique
// Version améliorée avec batch processing et gestion intelligente de l'overwrite
// todo : Code généré par IA, je le comprend pas trop trop encore, à peaufiner quand je maitriserais un peu mieux Rust.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::utils::real_time_event::RealTimeEvent;
// ============================================================================
// STRUCTURES PRINCIPALES
// ============================================================================
/// Celui qui écrit dans le buffer (producteur)
pub struct RingBufWriter<T> {
inner: Arc<InnerRingBuf<T>>,
}
/// Celui qui lit depuis le buffer (consommateur)
pub struct RingBufReader<T> {
inner: Arc<InnerRingBuf<T>>,
}
/// Le buffer circulaire interne partagé entre writer et reader
struct InnerRingBuf<T> {
// Le buffer qui contient nos données
buffer: Vec<UnsafeCell<T>>,
// Position où on écrit les nouvelles données
tail: AtomicUsize,
// Position où on lit les données
head: AtomicUsize,
// Pour réveiller le reader quand il y a des nouvelles données
notify: RealTimeEvent,
// Taille du buffer
cap: usize,
// Masque pour optimiser les calculs (cap - 1)
// Au lieu de faire "index % cap", on fait "index & mask" (plus rapide)
mask: usize,
}
// On dit à Rust que c'est safe de partager entre threads
unsafe impl<T: Send> Send for InnerRingBuf<T> {}
unsafe impl<T: Send> Sync for InnerRingBuf<T> {}
// ============================================================================
// FONCTION DE CRÉATION
// ============================================================================
/// Crée un nouveau ring buffer
/// IMPORTANT: cap DOIT être une puissance de 2 (2, 4, 8, 16, 32, 64, 128...)
/// Pourquoi ? Pour l'optimisation avec le masque binaire
pub fn ringbuf<T>(cap: usize) -> (RingBufWriter<T>, RingBufReader<T>) {
let buffer = RingBuffer::new(cap);
buffer.split()
}
#[derive(Clone)]
pub struct RingBuffer<T> {
inner: Arc<InnerRingBuf<T>>,
}
impl<T> RingBuffer<T> {
pub fn new(cap: usize) -> Self {
// Vérifications de sécurité
assert!(cap > 0, "La capacité doit être > 0");
assert!(cap.is_power_of_two(), "La capacité doit être une puissance de 2 (ex: 8, 16, 32...)");
// Crée le buffer avec des cases vides
let mut buffer = Vec::with_capacity(cap);
for _ in 0..cap {
// UnsafeCell permet de modifier même quand c'est partagé entre threads
// On met des valeurs "poubelle" au début
buffer.push(UnsafeCell::new(unsafe { std::mem::zeroed() }));
}
// Crée la structure interne
let inner = Arc::new(InnerRingBuf {
buffer,
tail: AtomicUsize::new(0), // On commence à écrire à l'index 0
head: AtomicUsize::new(0), // On commence à lire à l'index 0
notify: RealTimeEvent::new(),
cap,
mask: cap - 1, // Si cap=8, mask=7. 7 en binaire = 0111
});
Self {
inner
}
}
pub fn writer(&self) -> RingBufWriter<T> {
RingBufWriter { inner: self.inner.clone() }
}
pub fn reader(&self) -> RingBufReader<T> {
RingBufReader { inner: self.inner.clone() }
}
/// Récupère writer et reader en gardant l'accès au buffer original.
/// Utile pour : struct fields, monitoring, accès multiples.
pub fn both(&self) -> (RingBufWriter<T>, RingBufReader<T>) {
(
RingBufWriter { inner: self.inner.clone() },
RingBufReader { inner: self.inner.clone() }
)
}
/// Consomme le buffer et retourne writer/reader (optimisé).
/// Plus efficace que both() - évite 1 clone.
/// Utile pour : setup initial, factory functions.
pub fn split(self) -> (RingBufWriter<T>, RingBufReader<T>) {
(
RingBufWriter { inner: self.inner.clone() },
RingBufReader { inner: self.inner } // Move optimisé
)
}
/// 📊 Méthodes utilitaires directement sur le buffer
pub fn len(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
(tail.wrapping_sub(head)) & self.inner.mask
}
pub fn is_empty(&self) -> bool {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
head == tail
}
pub fn capacity(&self) -> usize {
self.inner.cap
}
pub fn clear(&self) {
let tail = self.inner.tail.load(Ordering::Acquire);
self.inner.head.store(tail, Ordering::Release);
}
pub fn force_wake_up(&self) {
self.inner.notify.notify()
}
}
// ============================================================================
// IMPLÉMENTATION DU WRITER (celui qui écrit) - VERSION OPTIMISÉE
// ============================================================================
impl<T: Copy> RingBufWriter<T> {
/// Ajoute un élément dans le buffer
/// Si le buffer est plein, écrase les anciens éléments
pub fn push(&self, value: T) {
// 1. Récupère la position actuelle d'écriture
let tail = self.inner.tail.load(Ordering::Relaxed);
// 2. Calcule la prochaine position (avec le masque pour optimiser)
let next_tail = (tail + 1) & self.inner.mask;
// 3. Vérifie si on va rattraper le lecteur
let head = self.inner.head.load(Ordering::Acquire);
if next_tail == head {
// Buffer plein ! On fait avancer le head pour écraser
let new_head = (head + 1) & self.inner.mask;
self.inner.head.store(new_head, Ordering::Release);
}
// 4. Écrit la donnée dans le buffer
unsafe {
// On écrit directement dans la case mémoire
std::ptr::write(self.inner.buffer[tail].get(), value);
}
// 5. Met à jour la position d'écriture
self.inner.tail.store(next_tail, Ordering::Release);
// 6. Réveille le reader s'il attend
self.inner.notify.notify();
}
/// ⚡ VERSION OPTIMISÉE : Ajoute plusieurs éléments d'un coup avec overwrite automatique
/// C'est LA méthode à utiliser pour l'audio temps réel !
pub fn push_slice_overwrite(&self, data: &[T]) -> usize {
let len = data.len();
if len == 0 {
return 0;
}
let mask = self.inner.mask;
let tail = self.inner.tail.load(Ordering::Relaxed);
let head = self.inner.head.load(Ordering::Acquire);
// Calcul de l'espace disponible
let current_used = (tail.wrapping_sub(head)) & mask;
let available = self.inner.cap - current_used;
if len <= available {
// ✅ Assez de place : écriture normale batch (cas le plus fréquent)
self.push_slice_internal(data, tail)
} else {
// ⚡ Pas assez de place : OVERWRITE automatique
// 1. Calculer combien d'éléments anciens on doit écraser
let needed_space = len - available;
// 2. Avancer le head pour libérer exactement l'espace nécessaire
let new_head = (head + needed_space) & mask;
self.inner.head.store(new_head, Ordering::Release);
// 3. Maintenant on a la place, écrire les nouvelles données
self.push_slice_internal(data, tail)
}
}
/// 🚀 Méthode interne optimisée pour l'écriture batch
#[inline]
fn push_slice_internal(&self, data: &[T], tail: usize) -> usize {
let mask = self.inner.mask;
let buffer = &self.inner.buffer;
let len = data.len();
// Optimisation : gestion des cas où on wrap autour du buffer
let tail_pos = tail & mask;
let space_to_end = self.inner.cap - tail_pos;
if len <= space_to_end {
// ✅ Cas simple : tout tient avant la fin du buffer
unsafe {
for (i, &item) in data.iter().enumerate() {
let pos = tail_pos + i;
std::ptr::write(buffer[pos].get(), item);
}
}
} else {
// 🔄 Cas wrap : on doit couper en deux parties
unsafe {
// Première partie : jusqu'à la fin du buffer
for (i, &item) in data[..space_to_end].iter().enumerate() {
let pos = tail_pos + i;
std::ptr::write(buffer[pos].get(), item);
}
// Deuxième partie : depuis le début du buffer
for (i, &item) in data[space_to_end..].iter().enumerate() {
std::ptr::write(buffer[i].get(), item);
}
}
}
// Mettre à jour tail en une seule fois (atomique)
let new_tail = (tail + len) & mask;
self.inner.tail.store(new_tail, Ordering::Release);
// Notifier les readers
self.inner.notify.notify();
len
}
/// Version classique pour compatibilité (utilise push_slice_overwrite en interne)
pub fn push_slice(&self, data: &[T]) -> usize {
self.push_slice_overwrite(data)
}
/// Version spécialisée pour vos frames audio de 960 échantillons
/// Retourne toujours true car overwrite automatique
pub fn push_audio_frame(&self, samples: &[T]) -> bool {
self.push_slice_overwrite(samples);
true // Toujours réussi grâce à l'overwrite
}
/// 📊 Nombre d'éléments qu'on peut écrire sans overwrite
pub fn available_space(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
let used = (tail.wrapping_sub(head)) & self.inner.mask;
self.inner.cap - used
}
/// 📏 Capacité totale du buffer
pub fn capacity(&self) -> usize {
self.inner.cap
}
}
// ============================================================================
// IMPLÉMENTATION DU READER (celui qui lit) - VERSION OPTIMISÉE
// ============================================================================
impl<T: Copy> RingBufReader<T> {
/// Lit un élément en attendant s'il n'y en a pas (BLOQUANT)
pub fn pop_blocking(&self) -> T {
// D'abord on essaie plusieurs fois rapidement (spin)
for _ in 0..100 {
if let Some(val) = self.try_pop() {
return val;
}
// Petite pause pour ne pas surcharger le CPU
std::hint::spin_loop();
}
// Si toujours rien, on attend qu'on nous réveille
loop {
if let Some(val) = self.try_pop() {
return val;
}
// On attend que le writer nous réveille
self.inner.notify.wait();
}
}
/// Essaie de lire un élément (NON-BLOQUANT)
/// Retourne None s'il n'y a rien
pub fn try_pop(&self) -> Option<T> {
// 1. Récupère les positions actuelles
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Acquire);
// 2. Vérifie s'il y a quelque chose à lire
if head == tail {
return None; // Buffer vide
}
// 3. Lit la donnée
let value = unsafe {
std::ptr::read(self.inner.buffer[head & self.inner.mask].get())
};
// 4. Avance la position de lecture
let next_head = (head + 1) & self.inner.mask;
self.inner.head.store(next_head, Ordering::Release);
Some(value)
}
/// 🚀 VERSION OPTIMISÉE : Lit plusieurs éléments d'un coup dans un buffer
pub fn pop_slice(&self, output: &mut [T]) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Acquire);
if head == tail {
return 0; // Buffer vide
}
// Calcule combien d'éléments on peut lire
let available = (tail.wrapping_sub(head)) & self.inner.mask;
let to_read = std::cmp::min(available, output.len());
if to_read == 0 {
return 0;
}
let mask = self.inner.mask;
let buffer = &self.inner.buffer;
let head_pos = head & mask;
let space_to_end = self.inner.cap - head_pos;
if to_read <= space_to_end {
// ✅ Cas simple : tout tient avant la fin du buffer
unsafe {
for i in 0..to_read {
let pos = head_pos + i;
output[i] = std::ptr::read(buffer[pos].get());
}
}
} else {
// 🔄 Cas wrap : on doit lire en deux parties
unsafe {
// Première partie : jusqu'à la fin du buffer
for i in 0..space_to_end {
let pos = head_pos + i;
output[i] = std::ptr::read(buffer[pos].get());
}
// Deuxième partie : depuis le début du buffer
let remaining = to_read - space_to_end;
for i in 0..remaining {
output[space_to_end + i] = std::ptr::read(buffer[i].get());
}
}
}
// Mettre à jour head en une fois
let new_head = (head + to_read) & mask;
self.inner.head.store(new_head, Ordering::Release);
to_read
}
/// Version bloquante pour lire exactement N éléments
pub fn pop_slice_blocking(&self, output: &mut [T]) -> usize {
let mut total_read = 0;
while total_read < output.len() {
let read = self.pop_slice(&mut output[total_read..]);
total_read += read;
if total_read < output.len() {
// Pas assez d'éléments, on attend
self.inner.notify.wait();
}
}
total_read
}
/// Récupère les données disponibles, bloque uniquement si buffer vide
/// Combine la puissance de pop_slice (flexible) avec l'attente automatique
pub fn pop_slice_wait(&self, output: &mut [T]) -> usize {
// ⚡ Tentative non-bloquante d'abord
let read = self.pop_slice(output);
if read > 0 {
return read; // ✅ Données disponibles
}
// 🔔 Buffer vide - attend signal du producteur
self.inner.notify.wait();
// ⚡ Récupère ce qui est maintenant disponible
self.pop_slice(output)
}
/// Vide complètement le buffer
pub fn clear(&self) {
let tail = self.inner.tail.load(Ordering::Acquire);
self.inner.head.store(tail, Ordering::Release);
}
/// Nombre approximatif d'éléments dans le buffer
pub fn len(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
(tail.wrapping_sub(head)) & self.inner.mask
}
/// Le buffer est-il vide ?
pub fn is_empty(&self) -> bool {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
head == tail
}
/// 📏 Capacité totale du buffer
pub fn capacity(&self) -> usize {
self.inner.cap
}
}
// ============================================================================
// IMPLÉMENTATIONS CLONABLES (pour partager entre threads)
// ============================================================================
impl<T> Clone for RingBufWriter<T> {
fn clone(&self) -> Self {
RingBufWriter {
inner: self.inner.clone(),
}
}
}
impl<T> Clone for RingBufReader<T> {
fn clone(&self) -> Self {
RingBufReader {
inner: self.inner.clone(),
}
}
}
// ============================================================================
// RINGBUFFER AUDIO TEMPS RÉEL - GUIDE COMPLET DES CAS D'USAGE
// ============================================================================
/*
CRÉATION ET CONFIGURATION :
========================
// Création basique (taille DOIT être puissance de 2)
let (writer, reader) = ringbuf::<i16>(1024); // Buffer basique
let (writer, reader) = ringbuf::<f32>(32768); // Audio haute qualité (~0.7s à 48kHz)
let (writer, reader) = ringbuf::<u8>(8192); // Données binaires
// Distribution multi-threads
let buffer = RingBuffer::<i16>::new(16384);
let capture_buffer = buffer.clone(); // Pour thread capture
let encoder_buffer = buffer.clone(); // Pour thread encodage
let stats_buffer = buffer.clone(); // Pour thread statistiques
// Récupération des endpoints
let (writer, reader) = buffer.split(); // Consomme le buffer
let writer = buffer.writer(); // Endpoint writer seul
let reader = buffer.reader(); // Endpoint reader seul
MÉTHODES D'ÉCRITURE (Writer) :
=============================
// Écriture unitaire
writer.push(sample); // Bloque si buffer plein
writer.try_push(sample)?; // Non-bloquant, erreur si plein
// Écriture batch - Mode sécurisé
let written = writer.push_slice(&samples); // Écrit tous ou aucun
writer.try_push_slice(&samples)?; // Non-bloquant
// ⚡ Écriture batch - Mode temps réel (RECOMMANDÉ pour audio)
writer.push_slice_overwrite(&samples); // Jamais bloque, écrase les anciennes données
// Cas d'usage par contexte :
// - Callback audio temps réel
move |audio_data: &[i16], _info| {
writer.push_slice_overwrite(audio_data); // ✅ Performance garantie
}
// - Thread de capture manuel
loop {
let samples = microphone.read_samples()?;
writer.push_slice_overwrite(&samples); // ✅ Jamais de blocage
}
// - Écriture conditionnelle
if buffer.available_write() >= samples.len() {
writer.push_slice(&samples); // Mode sécurisé
} else {
writer.push_slice_overwrite(&samples); // Force l'écriture
}
MÉTHODES DE LECTURE (Reader) :
=============================
// Lecture unitaire
let sample = reader.pop(); // Bloque jusqu'à avoir un élément
let sample = reader.try_pop()?; // Non-bloquant, erreur si vide
// ⚡ Lecture batch - Mode flexible (RECOMMANDÉ)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice(&mut buffer); // Prend ce qui est dispo (0 à 960)
if read > 0 {
process_audio(&buffer[..read]); // Traite la taille réelle
}
// Lecture batch - Mode blocking (frame exact requis)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice_blocking(&mut buffer); // Remplit EXACTEMENT le buffer
assert_eq!(read, buffer.len()); // Toujours vrai
encode_fixed_frame(&buffer); // Encodeur exigeant 960 samples
// ⭐ Lecture batch - Mode wait (MEILLEUR DES DEUX)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice_wait(&mut buffer); // Prend dispo, bloque SEULEMENT si vide
if read > 0 {
process_flexible_audio(&buffer[..read]); // Taille variable OK
}
// Lecture avec timeout
let read = reader.pop_slice_wait_timeout(&mut buffer, Duration::from_millis(10));
match read {
0 => println!("Timeout - pas de données"),
n => process_audio(&buffer[..n]),
}
CAS D'USAGE PAR DOMAINE :
========================
🎵 AUDIO TEMPS RÉEL :
-------------------
// Thread capture (producteur temps réel)
move |data: &[i16], _info| {
writer.push_slice_overwrite(data); // ✅ Jamais bloque
}
// Thread encodage (consommateur temps réel)
loop {
let mut frame = vec![0i16; 960]; // 20ms frame
let samples = reader.pop_slice_wait(&mut frame); // ✅ Prend dispo, attend si vide
if samples >= 480 { // Au moins 10ms
encode_opus(&frame[..samples]);
} else if samples > 0 {
frame[samples..].fill(0); // Padding silence
encode_opus(&frame);
}
}
// Thread playback (deadline critique)
move |output: &mut [i16], _info| {
let read = reader.pop_slice(output); // ✅ Non-bloquant
if read < output.len() {
output[read..].fill(0); // Underrun -> silence
}
}
📊 TRAITEMENT BATCH NON-CRITIQUE :
---------------------------------
// Thread analyse (peut bloquer)
loop {
let mut chunk = vec![0i16; 4800]; // 100ms de données
let read = reader.pop_slice_blocking(&mut chunk); // ✅ OK de bloquer
analyze_frequency_spectrum(&chunk[..read]); // Traitement lourd
}
💾 SAUVEGARDE FICHIER :
----------------------
let mut file = File::create("recording.raw")?;
loop {
let mut buffer = vec![0i16; 8192];
let read = reader.pop_slice_blocking(&mut buffer);
if read == 0 { break; } // EOF
let bytes = bytemuck::cast_slice(&buffer[..read]);
file.write_all(bytes)?; // Écrit séquentiellement
}
🌐 RÉSEAU AVEC BUFFERISATION :
-----------------------------
// Thread envoi réseau
loop {
let mut packet = vec![0u8; 1400]; // MTU Ethernet
let read = reader.pop_slice_wait(&mut packet);
if read > 0 {
udp_socket.send_to(&packet[..read], addr)?;
}
}
// Thread réception réseau
loop {
let mut buffer = [0u8; 1500];
let (size, _addr) = udp_socket.recv_from(&mut buffer)?;
writer.push_slice_overwrite(&buffer[..size]); // Peut perdre paquets
}
PATTERNS AVANCÉS :
=================
🔄 MULTI-PRODUCTEUR, MULTI-CONSOMMATEUR :
----------------------------------------
let buffer = RingBuffer::<i16>::new(32768);
// Plusieurs producteurs (ex: micros)
let mic1_writer = buffer.clone().writer();
let mic2_writer = buffer.clone().writer();
// Plusieurs consommateurs (ex: encodage + stats)
let encoder_reader = buffer.clone().reader();
let stats_reader = buffer.clone().reader();
🏭 PIPELINE AUDIO COMPLEXE :
---------------------------
// Capture -> Filtre -> Encodage -> Réseau
let raw_buffer = RingBuffer::<i16>::new(16384);
let filtered_buffer = RingBuffer::<i16>::new(16384);
// Thread 1: Capture
std::thread::spawn({
let writer = raw_buffer.writer();
move || {
loop {
let samples = capture_audio();
writer.push_slice_overwrite(&samples);
}
}
});
// Thread 2: Filtrage
std::thread::spawn({
let reader = raw_buffer.reader();
let writer = filtered_buffer.writer();
move || {
let mut buffer = vec![0i16; 480];
loop {
let read = reader.pop_slice_wait(&mut buffer);
let filtered = apply_noise_reduction(&buffer[..read]);
writer.push_slice_overwrite(&filtered);
}
}
});
// Thread 3: Encodage + Réseau
std::thread::spawn({
let reader = filtered_buffer.reader();
move || {
let mut buffer = vec![0i16; 960];
loop {
let read = reader.pop_slice_wait(&mut buffer);
let encoded = encode_opus(&buffer[..read]);
send_to_network(&encoded);
}
}
});
OPTIMISATIONS ET BONNES PRATIQUES :
==================================
📏 SIZING DU BUFFER :
--------------------
// Calcul pour audio 48kHz, latence 100ms
const SAMPLE_RATE: usize = 48000;
const LATENCY_MS: usize = 100;
let buffer_size = (SAMPLE_RATE * LATENCY_MS / 1000).next_power_of_two();
let (writer, reader) = ringbuf::<i16>(buffer_size);
💾 GESTION MÉMOIRE :
-------------------
// ✅ Réutiliser les buffers
let mut reusable_buffer = vec![0i16; 960];
loop {
let read = reader.pop_slice(&mut reusable_buffer);
if read > 0 {
process_audio(&reusable_buffer[..read]); // Pas d'allocation
}
}
// ❌ Éviter allocations répétées
loop {
let read = reader.pop_slice_wait(&mut vec![0i16; 960]); // ❌ Alloc à chaque tour
}
📊 MONITORING ET SANTÉ :
-----------------------
// Surveillance utilisation buffer
let usage = buffer.len() as f32 / buffer.capacity() as f32;
match usage {
x if x > 0.9 => println!("⚠️ Buffer presque plein: {:.1}%", x * 100.0),
x if x < 0.1 => println!(" Buffer presque vide: {:.1}%", x * 100.0),
_ => {} // OK
}
// Qualité adaptative selon charge
let quality = match usage {
x if x > 0.8 => AudioQuality::Low, // Réduire latence
x if x < 0.2 => AudioQuality::High, // Augmenter qualité
_ => AudioQuality::Medium,
};
TABLEAU RÉCAPITULATIF DES MÉTHODES :
===================================
| CONTEXTE | ÉCRITURE | LECTURE | RAISON |
|-----------------------|-----------------------|-----------------------|---------------------------|
| Audio temps réel | push_slice_overwrite | pop_slice_wait | Performance + réactivité |
| Callback critique | push_slice_overwrite | pop_slice | Jamais bloquer |
| Traitement batch | push_slice | pop_slice_blocking | Garantie complétude |
| Réseau | push_slice_overwrite | pop_slice_wait | Robustesse + efficacité |
| Sauvegarde fichier | push_slice | pop_slice_blocking | Intégrité données |
| Pipeline flexibile | push_slice_overwrite | pop_slice_wait | Optimal général |
🏆 VOTRE RINGBUFFER = PUISSANCE DES CHANNELS + PERFORMANCE ZERO-COPY ! 🚀
*/

File diff suppressed because it is too large Load Diff

35
src-tauri/tauri.conf.json Normal file
View File

@@ -0,0 +1,35 @@
{
"$schema": "https://schema.tauri.app/config/2",
"productName": "ox_speak_client",
"version": "0.1.0",
"identifier": "com.oxspeak.app",
"build": {
"beforeDevCommand": "yarn dev",
"devUrl": "http://localhost:1420",
"beforeBuildCommand": "yarn build",
"frontendDist": "../dist"
},
"app": {
"windows": [
{
"title": "ox_speak_client",
"width": 1024,
"height": 768
}
],
"security": {
"csp": null
}
},
"bundle": {
"active": true,
"targets": "all",
"icon": [
"icons/32x32.png",
"icons/128x128.png",
"icons/128x128@2x.png",
"icons/icon.icns",
"icons/icon.ico"
]
}
}