init sync

This commit is contained in:
2025-07-04 17:02:28 +02:00
commit 9d86f322f2
30 changed files with 2523 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ox_speak.iml" filepath="$PROJECT_DIR$/.idea/ox_speak.iml" />
</modules>
</component>
</project>

11
.idea/ox_speak.iml generated Normal file
View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="EMPTY_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

1147
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

22
Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[package]
name = "ox_speak"
version = "0.1.0"
edition = "2024"
[[bin]]
name = "ox_speak"
path = "src/main.rs"
[lib]
name = "ox_speak"
path = "src/lib.rs"
[dependencies]
cpal = "0.16"
opus = "0.3"
crossbeam = "0.8"
parking_lot = "0.12"
tokio = "1.45"
strum = {version = "0.27", features = ["derive"]}
uuid = {version = "1.17", features = ["v4", "serde"]}
event-listener = "5.4"

112
README.md Normal file
View File

@@ -0,0 +1,112 @@
# ox_speak Architecture Audio Temps Réel en Rust
## 📦 Structure actuelle du projet
```text
src/
├── main.rs # Point d'entrée, instancie App et démarre l'orchestration
├── lib.rs # Déclarations des modules principaux
├── app/ # Initialisation, gestion de haut niveau de l'application
│ ├── mod.rs
│ └── app.rs # Struct App : contient les stacks (audio, net) et le EventBus
├── core/ # Traitements audio bas-niveau (CPAL, Opus, stats...)
│ ├── mod.rs
│ ├── capture.rs # Capture micro avec CPAL
│ ├── playback.rs # Playback via CPAL
│ ├── mixer.rs # Mixage audio
│ ├── opus.rs # Encodage/Décodage Opus
│ ├── stats.rs # Statistiques audio
│ └── rms.rs # Détection de silence via RMS
├── domain/ # Types globaux et événements partagés
│ ├── mod.rs
│ └── event.rs # EventBus (struct + enum Event) pour routing interne
├── network/ # Communication réseau (UDP, protocole)
│ ├── mod.rs
│ ├── udp.rs # Client UDP (envoi/réception)
│ └── protocol.rs # Types et parsing de messages réseau
├── runtime/ # Logique d'exécution centrale (orchestration)
│ ├── mod.rs
│ └── dispatcher.rs # Dispatcher central (consomme les Event)
```
---
## 🛠️ Tâches à faire dans lordre
1. **Compléter `domain/event.rs`**
- Définir les `enum Event` utilisés dans ton application (ex : `AudioIn`, `EncodedFrame`, etc.)
- Structurer `EventBus` avec `crossbeam_channel`
2. **Écrire un `dispatcher` simple dans `runtime/dispatcher.rs`**
- Boucle bloquante `while let Ok(event) = rx.recv()`
- Match sur chaque type dévénement (→ faire une "god function" au début)
- Appeler directement les fonctions concernées (ex : `udp.send()`, `playback.play()`…)
3. **Initialiser `App` dans `app/app.rs`**
- Créer une struct `App` contenant :
- le `EventBus`
- les instances des modules nécessaires (`capture`, `udp`, etc.)
- Méthode `run()` qui spawn les threads dentrée (`audio`, `network`), et lance le `dispatcher`
4. **Compléter les modules `core/` et `network/`**
- Rendre chaque module autonome, avec une méthode `start()` ou `run()` prenant un `Sender<Event>`
- Exemple : `capture.run(tx)` capture le micro et envoie des `AudioIn(Vec<f32>)`
5. **Dans `main.rs`**
- Créer `App`, puis appeler `app.run()`.
6. (optionnel) **Plus tard : découpler le dispatcher en handlers**
- Créer un `trait EventHandler` pour déléguer proprement la logique métier
- Injecter les handlers dans le dispatcher pour garder `dispatcher.rs` léger
---
## 🔄 Orchestration globale (comment tout communique)
```text
THREADS :
- capture_thread : génère AudioIn
- opus_encode_thread: encode AudioIn → EncodedFrame
- udp_send_thread : reçoit EncodedFrame → envoie réseau
- udp_recv_thread : reçoit NetIn → decode → AudioDecoded
- playback_thread : lit AudioDecoded
TOUS utilisent :
Sender<Event> (cloné)
EventBus central (domain/event.rs)
RECEIVER :
dispatcher (runtime/dispatcher.rs)
→ lit les events
→ décide quoi faire : encode, envoyer, lire...
```
---
## 📝 Remarques
- Tu peux rester **mono-thread** dans un premier temps pour simplifier le flow.
- Lapproche actuelle (dispatcher = "god function") est très bien pour commencer **et lisible**.
- Tu peux logguer chaque événement dans le dispatcher pour debugger le flux.
- `EventBus` est cloné et injecté dans chaque module. **Aucun module ne s'appelle entre eux.**
---
## 🧠 Notes de conception
- Tu as choisi de garder la génération de la trame encodée **dans le thread de capture**, ce qui est parfaitement logique ici : tu as 20 ms de temps CPU garanti dans le callback CPAL pour encoder et publier un `Event::EncodedFrame`.
- Cette approche évite la surcharge dun thread supplémentaire et permet une architecture **performante et simple** tant que tu restes dans les contraintes temps réel.
---
## ❓Question
> Pour le moment, elle ne se pose pas, car en réalité je vais garder l'ancienne logique du "c'est capture qui va générer la frame encodée", vu que le thread a 20 ms pour bosser entre chaque callback cpal, il a largement le temps de le faire.
Donc aucun doute à ce stade. Si tu changes davis ou que la logique devient trop lourde dans `capture`, tu pourras refactorer vers un modèle où `AudioIn` est un event brut et l'encodage est fait côté dispatcher ou dans un worker dédié.

BIN
ox_speak.zip Normal file

Binary file not shown.

41
src/app/app.rs Normal file
View File

@@ -0,0 +1,41 @@
use std::thread;
use crossbeam::channel::Receiver;
use crate::core::capture::AudioCapture;
use crate::domain::event::{Event, EventBus};
use crate::runtime::dispatcher::Dispatcher;
pub struct App {
// Communication inter-thread
event_bus: EventBus,
event_rx: Receiver<Event>,
// audio
audio_capture: AudioCapture
}
impl App {
pub fn new() -> Self {
// Event_bus - communication inter-components
let (event_bus, event_rx) = EventBus::new();
// Audio
let audio_capture = AudioCapture::default(event_bus.clone());
Self {
event_bus,
event_rx,
audio_capture
}
}
pub fn start(&mut self) {
// Dispatcher - lecture et transmission des signaux inter-components
let mut dispatcher = Dispatcher::new(self.event_bus.clone());
let event_rx = self.event_rx.clone();
thread::spawn(move || {
dispatcher.run(event_rx)
});
self.audio_capture.start()
}
}

1
src/app/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod app;

148
src/core/capture.rs Normal file
View File

@@ -0,0 +1,148 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use cpal::{default_host, BufferSize, Device, SampleRate, Stream, StreamConfig, SupportedStreamConfig};
use cpal::traits::{DeviceTrait, HostTrait};
use crate::core::opus::AudioOpus;
use crate::domain::event::{Event, EventBus};
use crate::utils::ringbuf::RingBuffer;
#[derive(Clone)]
struct Microphone {
device: Device,
}
pub struct AudioCapture {
event_bus: EventBus,
microphone: Microphone,
running: Arc<AtomicBool>,
ring_buffer: RingBuffer<i16>,
steam: Option<Stream>,
worker: Option<JoinHandle<()>>,
}
impl Microphone {
pub fn new(device: Device) -> Self {
Self {
device
}
}
pub fn default() -> Self {
let host = default_host();
let device = host.default_input_device().unwrap();
Self::new(device)
}
pub fn get_input_config(&self) -> SupportedStreamConfig {
self.device.default_input_config().unwrap()
}
pub fn get_stream_config(&self) -> StreamConfig {
let config = self.get_input_config();
let mut stream_config: StreamConfig = config.into();
stream_config.channels = 1;
stream_config.sample_rate = SampleRate(48000);
stream_config.buffer_size = BufferSize::Fixed(960);
stream_config
}
pub fn build_stream<F>(&self, callback: F) -> Stream
where
F: FnMut(&[i16], &cpal::InputCallbackInfo) + Send + 'static,
{
let config = self.get_stream_config();
self.device.build_input_stream(
&config,
callback,
|err| println!("Error input stream: {err}"),
None
).unwrap()
}
}
impl AudioCapture {
pub fn new(event_bus: EventBus, microphone: Microphone) -> Self {
Self {
event_bus,
microphone,
running: Arc::new(AtomicBool::new(false)),
ring_buffer: RingBuffer::new(1024),
steam: None,
worker: None,
}
}
pub fn default(event_bus: EventBus) -> Self {
Self::new(event_bus, Microphone::default())
}
pub fn start(&mut self) {
self.running.store(true, Ordering::Relaxed);
// stream cpal
let writer = self.ring_buffer.writer();
let stream_running = self.running.clone();
let stream = self.microphone.build_stream(move |data, _| {
if !stream_running.load(Ordering::Relaxed){
return;
}
writer.push_slice_overwrite(data);
});
self.steam = Some(stream);
// Audio processing worker
self.run_processing_worker();
}
pub fn stop(&mut self) {
self.running.store(false, Ordering::Relaxed);
self.steam = None;
self.ring_buffer.force_wake_up();
if let Some(worker) = self.worker.take() {
worker.join().unwrap();
}
self.ring_buffer.clear();
}
fn run_processing_worker(&mut self){
let worker_running = self.running.clone();
let event_bus = self.event_bus.clone();
let input_config = self.microphone.get_input_config();
let opus = AudioOpus::new(input_config.sample_rate().0, input_config.channels(), "voip");
let mut encoder = opus.create_encoder().unwrap();
let reader = self.ring_buffer.reader();
self.worker = Some(thread::spawn(move || {
let mut frame = [0i16; 960];
while worker_running.load(Ordering::Relaxed) {
let _ = reader.pop_slice_blocking(&mut frame);
if !worker_running.load(Ordering::Relaxed){
break;
}
let raw_data = frame.to_vec();
event_bus.emit(Event::AudioIn(raw_data));
match encoder.encode(&frame){
Ok(encoded_data) => {
event_bus.emit(Event::AudioEncoded(encoded_data))
}
Err(e) => {
println!("Error encoding: {e}");
}
}
}
}));
}
}
impl AudioCapture {
fn audio_processing(){
}
}

0
src/core/mixer.rs Normal file
View File

6
src/core/mod.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod capture;
pub mod mixer;
pub mod opus;
pub mod playback;
pub mod rms;
pub mod stats;

110
src/core/opus.rs Normal file
View File

@@ -0,0 +1,110 @@
use opus::{Application, Channels, Decoder, Encoder};
#[derive(Clone)]
pub struct AudioOpus{
sample_rate: u32,
channels: u16,
application: Application
}
impl AudioOpus {
pub fn new(sample_rate: u32, channels: u16, application: &str) -> Self {
let application = match application {
"voip" => Application::Voip,
"audio" => Application::Audio,
"lowdelay" => Application::LowDelay,
_ => Application::Voip,
};
Self{sample_rate, channels, application}
}
pub fn create_encoder(&self) -> Result<AudioOpusEncoder, String> {
AudioOpusEncoder::new(self.clone())
}
pub fn create_decoder(&self) -> Result<AudioOpusDecoder, String> {
AudioOpusDecoder::new(self.clone())
}
}
pub struct AudioOpusEncoder{
audio_opus: AudioOpus,
encoder: opus::Encoder,
}
impl AudioOpusEncoder {
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
let opus_channel = match audio_opus.channels {
1 => Channels::Mono,
2 => Channels::Stereo,
_ => Channels::Mono,
};
let mut encoder = Encoder::new(audio_opus.sample_rate, opus_channel, audio_opus.application)
.map_err(|e| format!("Échec de création de l'encodeur: {:?}", e))?;
match audio_opus.application {
Application::Voip => {
// Paramètres optimaux pour VoIP: bonne qualité vocale, CPU modéré
let _ = encoder.set_bitrate(opus::Bitrate::Bits(24000)); // 24kbps est bon pour la voix
let _ = encoder.set_vbr(true); // Variable bitrate économise du CPU
let _ = encoder.set_vbr_constraint(false); // Sans contrainte stricte de débit
// Pas de set_complexity (non supporté par la crate)
},
Application::Audio => {
// Musique: priorité à la qualité
let _ = encoder.set_bitrate(opus::Bitrate::Bits(64000));
let _ = encoder.set_vbr(true);
},
Application::LowDelay => {
// Priorité à la latence et l'efficacité CPU
let _ = encoder.set_bitrate(opus::Bitrate::Bits(18000));
let _ = encoder.set_vbr(true);
},
}
Ok(Self{audio_opus, encoder})
}
pub fn encode(&mut self, frames: &[i16]) -> Result<Vec<u8>, String> {
let mut output = vec![0u8; 1276]; // 1276 octets (la vraie worst-case recommandée par Opus).
let len = self.encoder.encode(frames, output.as_mut_slice())
.map_err(|e| format!("Erreur encodage: {:?}", e))?;
output.truncate(len);
Ok(output)
}
// 🔄 Approche avec buffer réutilisable (encore plus optimal)
fn encode_reuse(&mut self, frames: &[i16], output: &mut Vec<u8>) -> Result<usize, String> {
output.clear();
output.resize(1276, 0);
let len = self.encoder.encode(frames, output.as_mut_slice()).unwrap();
output.truncate(len);
Ok(len)
}
}
pub struct AudioOpusDecoder{
audio_opus: AudioOpus,
decoder: opus::Decoder,
}
impl AudioOpusDecoder {
fn new(audio_opus: AudioOpus) -> Result<Self, String> {
let opus_channel = match audio_opus.channels {
1 => Channels::Mono,
2 => Channels::Stereo,
_ => Channels::Mono,
};
let decoder = Decoder::new(audio_opus.sample_rate, opus_channel)
.map_err(|e| format!("Échec de création du décodeur: {:?}", e))?;;
Ok(Self{audio_opus, decoder})
}
pub fn decode(&mut self, frames: &[u8]) -> Result<Vec<i16>, String> {
let mut output = vec![0i16; 5760];
let len = self.decoder.decode(frames, output.as_mut_slice(), false).map_err(|e| format!("Erreur décodage: {:?}", e))?;
output.truncate(len);
Ok(output)
}
}

0
src/core/playback.rs Normal file
View File

0
src/core/rms.rs Normal file
View File

0
src/core/stats.rs Normal file
View File

29
src/domain/event.rs Normal file
View File

@@ -0,0 +1,29 @@
use crossbeam::channel::{unbounded, Sender, Receiver};
pub enum Event {
AudioIn(Vec<i16>),
AudioEncoded(Vec<u8>),
NetIn(Vec<u8>),
NetOut(Vec<u8>),
}
#[derive(Clone)]
pub struct EventBus {
pub sender: Sender<Event>
}
impl EventBus {
pub fn new() -> (Self, Receiver<Event>) {
let (sender, receiver) = unbounded();
(Self { sender }, receiver)
}
pub fn emit(&self, event: Event) {
// s'utilise de cette façon : bus.emit(Event::AudioIn {Vec[0,1,2,3]}
let _ = self.sender.send(event);
}
pub fn clone_sender(&self) -> Sender<Event> {
self.sender.clone()
}
}

1
src/domain/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod event;

6
src/lib.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod app;
pub mod core;
pub mod domain;
pub mod network;
pub mod runtime;
pub mod utils;

5
src/main.rs Normal file
View File

@@ -0,0 +1,5 @@
//
fn main() {
println!("Hello, world!");
}

2
src/network/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod protocol;
pub mod udp;

0
src/network/protocol.rs Normal file
View File

0
src/network/udp.rs Normal file
View File

38
src/runtime/dispatcher.rs Normal file
View File

@@ -0,0 +1,38 @@
use crossbeam::channel::Receiver;
use crate::domain::event::{Event, EventBus};
pub struct Dispatcher {
event_bus: EventBus
}
impl Dispatcher {
pub fn new(event_bus: EventBus) -> Self {
Self {
event_bus
}
}
pub fn run(&mut self, receiver: Receiver<Event>) {
while let Ok(event) = receiver.recv() {
match event {
Event::AudioIn(sample) => {
}
Event::AudioEncoded(sample_encoded) => {
}
Event::NetIn(sample_encoded) => {
}
Event::NetOut(sample_encoded) => {
}
_ => {
println!("Event non prit en charge !")
}
}
}
}
}

1
src/runtime/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod dispatcher;

2
src/utils/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod ringbuf;
pub mod real_time_event;

View File

@@ -0,0 +1,46 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use event_listener::{Event, Listener};
struct RealTimeEventInner{
flag: AtomicBool,
event: Event,
}
#[derive(Clone)]
pub struct RealTimeEvent {
inner: Arc<RealTimeEventInner>,
}
impl RealTimeEvent{
pub fn new() -> Self{
Self{
inner: Arc::new(RealTimeEventInner{
flag: AtomicBool::new(false),
event: Event::new(),
})
}
}
pub fn notify(&self){
self.inner.flag.store(true, Ordering::Release);
self.inner.event.notify(usize::MAX);
}
pub fn wait(&self){
loop {
let listener = self.inner.event.listen();
if self.inner.flag.swap(false, Ordering::Acquire){
break
}
listener.wait();
}
}
}
impl Default for RealTimeEvent{
fn default() -> Self{
Self::new()
}
}

772
src/utils/ringbuf.rs Normal file
View File

@@ -0,0 +1,772 @@
// Optimisé pour performance audio temps réel avec overwrite automatique
// Version améliorée avec batch processing et gestion intelligente de l'overwrite
// todo : Code généré par IA, je le comprend pas trop trop encore, à peaufiner quand je maitriserais un peu mieux Rust.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::utils::real_time_event::RealTimeEvent;
// ============================================================================
// STRUCTURES PRINCIPALES
// ============================================================================
/// Celui qui écrit dans le buffer (producteur)
pub struct RingBufWriter<T> {
inner: Arc<InnerRingBuf<T>>,
}
/// Celui qui lit depuis le buffer (consommateur)
pub struct RingBufReader<T> {
inner: Arc<InnerRingBuf<T>>,
}
/// Le buffer circulaire interne partagé entre writer et reader
struct InnerRingBuf<T> {
// Le buffer qui contient nos données
buffer: Vec<UnsafeCell<T>>,
// Position où on écrit les nouvelles données
tail: AtomicUsize,
// Position où on lit les données
head: AtomicUsize,
// Pour réveiller le reader quand il y a des nouvelles données
notify: RealTimeEvent,
// Taille du buffer
cap: usize,
// Masque pour optimiser les calculs (cap - 1)
// Au lieu de faire "index % cap", on fait "index & mask" (plus rapide)
mask: usize,
}
// On dit à Rust que c'est safe de partager entre threads
unsafe impl<T: Send> Send for InnerRingBuf<T> {}
unsafe impl<T: Send> Sync for InnerRingBuf<T> {}
// ============================================================================
// FONCTION DE CRÉATION
// ============================================================================
/// Crée un nouveau ring buffer
/// IMPORTANT: cap DOIT être une puissance de 2 (2, 4, 8, 16, 32, 64, 128...)
/// Pourquoi ? Pour l'optimisation avec le masque binaire
pub fn ringbuf<T>(cap: usize) -> (RingBufWriter<T>, RingBufReader<T>) {
let buffer = RingBuffer::new(cap);
buffer.split()
}
#[derive(Clone)]
pub struct RingBuffer<T> {
inner: Arc<InnerRingBuf<T>>,
}
impl<T> RingBuffer<T> {
pub fn new(cap: usize) -> Self {
// Vérifications de sécurité
assert!(cap > 0, "La capacité doit être > 0");
assert!(cap.is_power_of_two(), "La capacité doit être une puissance de 2 (ex: 8, 16, 32...)");
// Crée le buffer avec des cases vides
let mut buffer = Vec::with_capacity(cap);
for _ in 0..cap {
// UnsafeCell permet de modifier même quand c'est partagé entre threads
// On met des valeurs "poubelle" au début
buffer.push(UnsafeCell::new(unsafe { std::mem::zeroed() }));
}
// Crée la structure interne
let inner = Arc::new(InnerRingBuf {
buffer,
tail: AtomicUsize::new(0), // On commence à écrire à l'index 0
head: AtomicUsize::new(0), // On commence à lire à l'index 0
notify: RealTimeEvent::new(),
cap,
mask: cap - 1, // Si cap=8, mask=7. 7 en binaire = 0111
});
Self {
inner
}
}
pub fn writer(&self) -> RingBufWriter<T> {
RingBufWriter { inner: self.inner.clone() }
}
pub fn reader(&self) -> RingBufReader<T> {
RingBufReader { inner: self.inner.clone() }
}
/// Récupère writer et reader en gardant l'accès au buffer original.
/// Utile pour : struct fields, monitoring, accès multiples.
pub fn both(&self) -> (RingBufWriter<T>, RingBufReader<T>) {
(
RingBufWriter { inner: self.inner.clone() },
RingBufReader { inner: self.inner.clone() }
)
}
/// Consomme le buffer et retourne writer/reader (optimisé).
/// Plus efficace que both() - évite 1 clone.
/// Utile pour : setup initial, factory functions.
pub fn split(self) -> (RingBufWriter<T>, RingBufReader<T>) {
(
RingBufWriter { inner: self.inner.clone() },
RingBufReader { inner: self.inner } // Move optimisé
)
}
/// 📊 Méthodes utilitaires directement sur le buffer
pub fn len(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
(tail.wrapping_sub(head)) & self.inner.mask
}
pub fn is_empty(&self) -> bool {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
head == tail
}
pub fn capacity(&self) -> usize {
self.inner.cap
}
pub fn clear(&self) {
let tail = self.inner.tail.load(Ordering::Acquire);
self.inner.head.store(tail, Ordering::Release);
}
pub fn force_wake_up(&self) {
self.inner.notify.notify()
}
}
// ============================================================================
// IMPLÉMENTATION DU WRITER (celui qui écrit) - VERSION OPTIMISÉE
// ============================================================================
impl<T: Copy> RingBufWriter<T> {
/// Ajoute un élément dans le buffer
/// Si le buffer est plein, écrase les anciens éléments
pub fn push(&self, value: T) {
// 1. Récupère la position actuelle d'écriture
let tail = self.inner.tail.load(Ordering::Relaxed);
// 2. Calcule la prochaine position (avec le masque pour optimiser)
let next_tail = (tail + 1) & self.inner.mask;
// 3. Vérifie si on va rattraper le lecteur
let head = self.inner.head.load(Ordering::Acquire);
if next_tail == head {
// Buffer plein ! On fait avancer le head pour écraser
let new_head = (head + 1) & self.inner.mask;
self.inner.head.store(new_head, Ordering::Release);
}
// 4. Écrit la donnée dans le buffer
unsafe {
// On écrit directement dans la case mémoire
std::ptr::write(self.inner.buffer[tail].get(), value);
}
// 5. Met à jour la position d'écriture
self.inner.tail.store(next_tail, Ordering::Release);
// 6. Réveille le reader s'il attend
self.inner.notify.notify();
}
/// ⚡ VERSION OPTIMISÉE : Ajoute plusieurs éléments d'un coup avec overwrite automatique
/// C'est LA méthode à utiliser pour l'audio temps réel !
pub fn push_slice_overwrite(&self, data: &[T]) -> usize {
let len = data.len();
if len == 0 {
return 0;
}
let mask = self.inner.mask;
let tail = self.inner.tail.load(Ordering::Relaxed);
let head = self.inner.head.load(Ordering::Acquire);
// Calcul de l'espace disponible
let current_used = (tail.wrapping_sub(head)) & mask;
let available = self.inner.cap - current_used;
if len <= available {
// ✅ Assez de place : écriture normale batch (cas le plus fréquent)
self.push_slice_internal(data, tail)
} else {
// ⚡ Pas assez de place : OVERWRITE automatique
// 1. Calculer combien d'éléments anciens on doit écraser
let needed_space = len - available;
// 2. Avancer le head pour libérer exactement l'espace nécessaire
let new_head = (head + needed_space) & mask;
self.inner.head.store(new_head, Ordering::Release);
// 3. Maintenant on a la place, écrire les nouvelles données
self.push_slice_internal(data, tail)
}
}
/// 🚀 Méthode interne optimisée pour l'écriture batch
#[inline]
fn push_slice_internal(&self, data: &[T], tail: usize) -> usize {
let mask = self.inner.mask;
let buffer = &self.inner.buffer;
let len = data.len();
// Optimisation : gestion des cas où on wrap autour du buffer
let tail_pos = tail & mask;
let space_to_end = self.inner.cap - tail_pos;
if len <= space_to_end {
// ✅ Cas simple : tout tient avant la fin du buffer
unsafe {
for (i, &item) in data.iter().enumerate() {
let pos = tail_pos + i;
std::ptr::write(buffer[pos].get(), item);
}
}
} else {
// 🔄 Cas wrap : on doit couper en deux parties
unsafe {
// Première partie : jusqu'à la fin du buffer
for (i, &item) in data[..space_to_end].iter().enumerate() {
let pos = tail_pos + i;
std::ptr::write(buffer[pos].get(), item);
}
// Deuxième partie : depuis le début du buffer
for (i, &item) in data[space_to_end..].iter().enumerate() {
std::ptr::write(buffer[i].get(), item);
}
}
}
// Mettre à jour tail en une seule fois (atomique)
let new_tail = (tail + len) & mask;
self.inner.tail.store(new_tail, Ordering::Release);
// Notifier les readers
self.inner.notify.notify();
len
}
/// Version classique pour compatibilité (utilise push_slice_overwrite en interne)
pub fn push_slice(&self, data: &[T]) -> usize {
self.push_slice_overwrite(data)
}
/// Version spécialisée pour vos frames audio de 960 échantillons
/// Retourne toujours true car overwrite automatique
pub fn push_audio_frame(&self, samples: &[T]) -> bool {
self.push_slice_overwrite(samples);
true // Toujours réussi grâce à l'overwrite
}
/// 📊 Nombre d'éléments qu'on peut écrire sans overwrite
pub fn available_space(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
let used = (tail.wrapping_sub(head)) & self.inner.mask;
self.inner.cap - used
}
/// 📏 Capacité totale du buffer
pub fn capacity(&self) -> usize {
self.inner.cap
}
}
// ============================================================================
// IMPLÉMENTATION DU READER (celui qui lit) - VERSION OPTIMISÉE
// ============================================================================
impl<T: Copy> RingBufReader<T> {
/// Lit un élément en attendant s'il n'y en a pas (BLOQUANT)
pub fn pop_blocking(&self) -> T {
// D'abord on essaie plusieurs fois rapidement (spin)
for _ in 0..100 {
if let Some(val) = self.try_pop() {
return val;
}
// Petite pause pour ne pas surcharger le CPU
std::hint::spin_loop();
}
// Si toujours rien, on attend qu'on nous réveille
loop {
if let Some(val) = self.try_pop() {
return val;
}
// On attend que le writer nous réveille
self.inner.notify.wait();
}
}
/// Essaie de lire un élément (NON-BLOQUANT)
/// Retourne None s'il n'y a rien
pub fn try_pop(&self) -> Option<T> {
// 1. Récupère les positions actuelles
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Acquire);
// 2. Vérifie s'il y a quelque chose à lire
if head == tail {
return None; // Buffer vide
}
// 3. Lit la donnée
let value = unsafe {
std::ptr::read(self.inner.buffer[head & self.inner.mask].get())
};
// 4. Avance la position de lecture
let next_head = (head + 1) & self.inner.mask;
self.inner.head.store(next_head, Ordering::Release);
Some(value)
}
/// 🚀 VERSION OPTIMISÉE : Lit plusieurs éléments d'un coup dans un buffer
pub fn pop_slice(&self, output: &mut [T]) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Acquire);
if head == tail {
return 0; // Buffer vide
}
// Calcule combien d'éléments on peut lire
let available = (tail.wrapping_sub(head)) & self.inner.mask;
let to_read = std::cmp::min(available, output.len());
if to_read == 0 {
return 0;
}
let mask = self.inner.mask;
let buffer = &self.inner.buffer;
let head_pos = head & mask;
let space_to_end = self.inner.cap - head_pos;
if to_read <= space_to_end {
// ✅ Cas simple : tout tient avant la fin du buffer
unsafe {
for i in 0..to_read {
let pos = head_pos + i;
output[i] = std::ptr::read(buffer[pos].get());
}
}
} else {
// 🔄 Cas wrap : on doit lire en deux parties
unsafe {
// Première partie : jusqu'à la fin du buffer
for i in 0..space_to_end {
let pos = head_pos + i;
output[i] = std::ptr::read(buffer[pos].get());
}
// Deuxième partie : depuis le début du buffer
let remaining = to_read - space_to_end;
for i in 0..remaining {
output[space_to_end + i] = std::ptr::read(buffer[i].get());
}
}
}
// Mettre à jour head en une fois
let new_head = (head + to_read) & mask;
self.inner.head.store(new_head, Ordering::Release);
to_read
}
/// Version bloquante pour lire exactement N éléments
pub fn pop_slice_blocking(&self, output: &mut [T]) -> usize {
let mut total_read = 0;
while total_read < output.len() {
let read = self.pop_slice(&mut output[total_read..]);
total_read += read;
if total_read < output.len() {
// Pas assez d'éléments, on attend
self.inner.notify.wait();
}
}
total_read
}
/// Récupère les données disponibles, bloque uniquement si buffer vide
/// Combine la puissance de pop_slice (flexible) avec l'attente automatique
pub fn pop_slice_wait(&self, output: &mut [T]) -> usize {
// ⚡ Tentative non-bloquante d'abord
let read = self.pop_slice(output);
if read > 0 {
return read; // ✅ Données disponibles
}
// 🔔 Buffer vide - attend signal du producteur
self.inner.notify.wait();
// ⚡ Récupère ce qui est maintenant disponible
self.pop_slice(output)
}
/// Vide complètement le buffer
pub fn clear(&self) {
let tail = self.inner.tail.load(Ordering::Acquire);
self.inner.head.store(tail, Ordering::Release);
}
/// Nombre approximatif d'éléments dans le buffer
pub fn len(&self) -> usize {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
(tail.wrapping_sub(head)) & self.inner.mask
}
/// Le buffer est-il vide ?
pub fn is_empty(&self) -> bool {
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
head == tail
}
/// 📏 Capacité totale du buffer
pub fn capacity(&self) -> usize {
self.inner.cap
}
}
// ============================================================================
// IMPLÉMENTATIONS CLONABLES (pour partager entre threads)
// ============================================================================
impl<T> Clone for RingBufWriter<T> {
fn clone(&self) -> Self {
RingBufWriter {
inner: self.inner.clone(),
}
}
}
impl<T> Clone for RingBufReader<T> {
fn clone(&self) -> Self {
RingBufReader {
inner: self.inner.clone(),
}
}
}
// ============================================================================
// RINGBUFFER AUDIO TEMPS RÉEL - GUIDE COMPLET DES CAS D'USAGE
// ============================================================================
/*
CRÉATION ET CONFIGURATION :
========================
// Création basique (taille DOIT être puissance de 2)
let (writer, reader) = ringbuf::<i16>(1024); // Buffer basique
let (writer, reader) = ringbuf::<f32>(32768); // Audio haute qualité (~0.7s à 48kHz)
let (writer, reader) = ringbuf::<u8>(8192); // Données binaires
// Distribution multi-threads
let buffer = RingBuffer::<i16>::new(16384);
let capture_buffer = buffer.clone(); // Pour thread capture
let encoder_buffer = buffer.clone(); // Pour thread encodage
let stats_buffer = buffer.clone(); // Pour thread statistiques
// Récupération des endpoints
let (writer, reader) = buffer.split(); // Consomme le buffer
let writer = buffer.writer(); // Endpoint writer seul
let reader = buffer.reader(); // Endpoint reader seul
MÉTHODES D'ÉCRITURE (Writer) :
=============================
// Écriture unitaire
writer.push(sample); // Bloque si buffer plein
writer.try_push(sample)?; // Non-bloquant, erreur si plein
// Écriture batch - Mode sécurisé
let written = writer.push_slice(&samples); // Écrit tous ou aucun
writer.try_push_slice(&samples)?; // Non-bloquant
// ⚡ Écriture batch - Mode temps réel (RECOMMANDÉ pour audio)
writer.push_slice_overwrite(&samples); // Jamais bloque, écrase les anciennes données
// Cas d'usage par contexte :
// - Callback audio temps réel
move |audio_data: &[i16], _info| {
writer.push_slice_overwrite(audio_data); // ✅ Performance garantie
}
// - Thread de capture manuel
loop {
let samples = microphone.read_samples()?;
writer.push_slice_overwrite(&samples); // ✅ Jamais de blocage
}
// - Écriture conditionnelle
if buffer.available_write() >= samples.len() {
writer.push_slice(&samples); // Mode sécurisé
} else {
writer.push_slice_overwrite(&samples); // Force l'écriture
}
MÉTHODES DE LECTURE (Reader) :
=============================
// Lecture unitaire
let sample = reader.pop(); // Bloque jusqu'à avoir un élément
let sample = reader.try_pop()?; // Non-bloquant, erreur si vide
// ⚡ Lecture batch - Mode flexible (RECOMMANDÉ)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice(&mut buffer); // Prend ce qui est dispo (0 à 960)
if read > 0 {
process_audio(&buffer[..read]); // Traite la taille réelle
}
// Lecture batch - Mode blocking (frame exact requis)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice_blocking(&mut buffer); // Remplit EXACTEMENT le buffer
assert_eq!(read, buffer.len()); // Toujours vrai
encode_fixed_frame(&buffer); // Encodeur exigeant 960 samples
// ⭐ Lecture batch - Mode wait (MEILLEUR DES DEUX)
let mut buffer = vec![0i16; 960];
let read = reader.pop_slice_wait(&mut buffer); // Prend dispo, bloque SEULEMENT si vide
if read > 0 {
process_flexible_audio(&buffer[..read]); // Taille variable OK
}
// Lecture avec timeout
let read = reader.pop_slice_wait_timeout(&mut buffer, Duration::from_millis(10));
match read {
0 => println!("Timeout - pas de données"),
n => process_audio(&buffer[..n]),
}
CAS D'USAGE PAR DOMAINE :
========================
🎵 AUDIO TEMPS RÉEL :
-------------------
// Thread capture (producteur temps réel)
move |data: &[i16], _info| {
writer.push_slice_overwrite(data); // ✅ Jamais bloque
}
// Thread encodage (consommateur temps réel)
loop {
let mut frame = vec![0i16; 960]; // 20ms frame
let samples = reader.pop_slice_wait(&mut frame); // ✅ Prend dispo, attend si vide
if samples >= 480 { // Au moins 10ms
encode_opus(&frame[..samples]);
} else if samples > 0 {
frame[samples..].fill(0); // Padding silence
encode_opus(&frame);
}
}
// Thread playback (deadline critique)
move |output: &mut [i16], _info| {
let read = reader.pop_slice(output); // ✅ Non-bloquant
if read < output.len() {
output[read..].fill(0); // Underrun -> silence
}
}
📊 TRAITEMENT BATCH NON-CRITIQUE :
---------------------------------
// Thread analyse (peut bloquer)
loop {
let mut chunk = vec![0i16; 4800]; // 100ms de données
let read = reader.pop_slice_blocking(&mut chunk); // ✅ OK de bloquer
analyze_frequency_spectrum(&chunk[..read]); // Traitement lourd
}
💾 SAUVEGARDE FICHIER :
----------------------
let mut file = File::create("recording.raw")?;
loop {
let mut buffer = vec![0i16; 8192];
let read = reader.pop_slice_blocking(&mut buffer);
if read == 0 { break; } // EOF
let bytes = bytemuck::cast_slice(&buffer[..read]);
file.write_all(bytes)?; // Écrit séquentiellement
}
🌐 RÉSEAU AVEC BUFFERISATION :
-----------------------------
// Thread envoi réseau
loop {
let mut packet = vec![0u8; 1400]; // MTU Ethernet
let read = reader.pop_slice_wait(&mut packet);
if read > 0 {
udp_socket.send_to(&packet[..read], addr)?;
}
}
// Thread réception réseau
loop {
let mut buffer = [0u8; 1500];
let (size, _addr) = udp_socket.recv_from(&mut buffer)?;
writer.push_slice_overwrite(&buffer[..size]); // Peut perdre paquets
}
PATTERNS AVANCÉS :
=================
🔄 MULTI-PRODUCTEUR, MULTI-CONSOMMATEUR :
----------------------------------------
let buffer = RingBuffer::<i16>::new(32768);
// Plusieurs producteurs (ex: micros)
let mic1_writer = buffer.clone().writer();
let mic2_writer = buffer.clone().writer();
// Plusieurs consommateurs (ex: encodage + stats)
let encoder_reader = buffer.clone().reader();
let stats_reader = buffer.clone().reader();
🏭 PIPELINE AUDIO COMPLEXE :
---------------------------
// Capture -> Filtre -> Encodage -> Réseau
let raw_buffer = RingBuffer::<i16>::new(16384);
let filtered_buffer = RingBuffer::<i16>::new(16384);
// Thread 1: Capture
std::thread::spawn({
let writer = raw_buffer.writer();
move || {
loop {
let samples = capture_audio();
writer.push_slice_overwrite(&samples);
}
}
});
// Thread 2: Filtrage
std::thread::spawn({
let reader = raw_buffer.reader();
let writer = filtered_buffer.writer();
move || {
let mut buffer = vec![0i16; 480];
loop {
let read = reader.pop_slice_wait(&mut buffer);
let filtered = apply_noise_reduction(&buffer[..read]);
writer.push_slice_overwrite(&filtered);
}
}
});
// Thread 3: Encodage + Réseau
std::thread::spawn({
let reader = filtered_buffer.reader();
move || {
let mut buffer = vec![0i16; 960];
loop {
let read = reader.pop_slice_wait(&mut buffer);
let encoded = encode_opus(&buffer[..read]);
send_to_network(&encoded);
}
}
});
OPTIMISATIONS ET BONNES PRATIQUES :
==================================
📏 SIZING DU BUFFER :
--------------------
// Calcul pour audio 48kHz, latence 100ms
const SAMPLE_RATE: usize = 48000;
const LATENCY_MS: usize = 100;
let buffer_size = (SAMPLE_RATE * LATENCY_MS / 1000).next_power_of_two();
let (writer, reader) = ringbuf::<i16>(buffer_size);
💾 GESTION MÉMOIRE :
-------------------
// ✅ Réutiliser les buffers
let mut reusable_buffer = vec![0i16; 960];
loop {
let read = reader.pop_slice(&mut reusable_buffer);
if read > 0 {
process_audio(&reusable_buffer[..read]); // Pas d'allocation
}
}
// ❌ Éviter allocations répétées
loop {
let read = reader.pop_slice_wait(&mut vec![0i16; 960]); // ❌ Alloc à chaque tour
}
📊 MONITORING ET SANTÉ :
-----------------------
// Surveillance utilisation buffer
let usage = buffer.len() as f32 / buffer.capacity() as f32;
match usage {
x if x > 0.9 => println!("⚠️ Buffer presque plein: {:.1}%", x * 100.0),
x if x < 0.1 => println!(" Buffer presque vide: {:.1}%", x * 100.0),
_ => {} // OK
}
// Qualité adaptative selon charge
let quality = match usage {
x if x > 0.8 => AudioQuality::Low, // Réduire latence
x if x < 0.2 => AudioQuality::High, // Augmenter qualité
_ => AudioQuality::Medium,
};
TABLEAU RÉCAPITULATIF DES MÉTHODES :
===================================
| CONTEXTE | ÉCRITURE | LECTURE | RAISON |
|-----------------------|-----------------------|-----------------------|---------------------------|
| Audio temps réel | push_slice_overwrite | pop_slice_wait | Performance + réactivité |
| Callback critique | push_slice_overwrite | pop_slice | Jamais bloquer |
| Traitement batch | push_slice | pop_slice_blocking | Garantie complétude |
| Réseau | push_slice_overwrite | pop_slice_wait | Robustesse + efficacité |
| Sauvegarde fichier | push_slice | pop_slice_blocking | Intégrité données |
| Pipeline flexibile | push_slice_overwrite | pop_slice_wait | Optimal général |
🏆 VOTRE RINGBUFFER = PUISSANCE DES CHANNELS + PERFORMANCE ZERO-COPY ! 🚀
*/