From 0e9c2b08d668687babce79d80ca1b95308764890 Mon Sep 17 00:00:00 2001 From: Nell Date: Sat, 19 Jul 2025 03:45:57 +0200 Subject: [PATCH] init --- .idea/workspace.xml | 161 ++++++ Cargo.lock | 25 + Cargo.toml | 5 +- src/app/app.rs | 8 +- src/domain/client.rs | 186 +++++-- src/domain/event.rs | 9 +- src/runtime/dispatcher.rs | 41 +- src/utils/byte_utils.rs | 398 ------------- src/utils/mod.rs | 2 +- src/utils/shared_store.rs | 1114 +++++++++++++++++++++++++++++++++++++ 10 files changed, 1487 insertions(+), 462 deletions(-) create mode 100644 .idea/workspace.xml delete mode 100644 src/utils/byte_utils.rs create mode 100644 src/utils/shared_store.rs diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..2a7a416 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,161 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + { + "associatedIndex": 5 +} + + + + { + "keyToString": { + "Cargo.Run.executor": "Run", + "ModuleVcsDetector.initialDetectionPerformed": "true", + "RunOnceActivity.ShowReadmeOnStart": "true", + "RunOnceActivity.git.unshallow": "true", + "RunOnceActivity.rust.reset.selective.auto.import": "true", + "git-widget-placeholder": "master", + "ignore.virus.scanning.warn.message": "true", + "junie.onboarding.icon.badge.shown": "true", + "last_opened_file_path": "D:/Dev/ox_speak_server/src/network", + "node.js.detected.package.eslint": "true", + "node.js.detected.package.tslint": "true", + "node.js.selected.package.eslint": "(autodetect)", + "node.js.selected.package.tslint": "(autodetect)", + "nodejs_package_manager_path": "npm", + "org.rust.cargo.project.model.PROJECT_DISCOVERY": "true", + "org.rust.cargo.project.model.impl.CargoExternalSystemProjectAware.subscribe.first.balloon": "", + "org.rust.first.attach.projects": "true", + "settings.editor.selected.configurable": "terminal", + "vue.rearranger.settings.migration": "true" + } +} + + + + + + + + + + + + + + + 1751970990022 + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 8e37241..b374f53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "autocfg" version = "1.5.0" @@ -102,6 +108,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + [[package]] name = "getrandom" version = "0.3.3" @@ -159,6 +171,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kanal" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "libc" version = "0.2.174" @@ -226,9 +248,12 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" name = "ox_speak_server" version = "0.1.0" dependencies = [ + "arc-swap", "bytes", + "crossbeam-utils", "dashmap", "event-listener", + "kanal", "parking_lot", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 3a73b3c..806b11b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,4 +22,7 @@ strum = {version = "0.27", features = ["derive"] } uuid = {version = "1.17", features = ["v4", "serde"] } event-listener = "5.4" dashmap = "6.1" -bytes = "1.10" \ No newline at end of file +bytes = "1.10" +arc-swap = "1.7" +crossbeam-utils = "0.8" +kanal = "0.1" \ No newline at end of file diff --git a/src/app/app.rs b/src/app/app.rs index e9af63c..62788f2 100644 --- a/src/app/app.rs +++ b/src/app/app.rs @@ -10,7 +10,7 @@ pub struct App { // Communication inter-components event_bus: EventBus, dispatcher: Dispatcher, - event_rx: Option>, + event_rx: kanal::AsyncReceiver, // Network udp_server: UdpServer, @@ -32,20 +32,22 @@ impl App { Self { event_bus, dispatcher, - event_rx: Some(event_rx), + event_rx, udp_server, client_manager } } pub async fn start(&mut self) { - if let Some(event_rx) = self.event_rx.take() { + for i in 0..4 { let dispatcher = self.dispatcher.clone(); + let event_rx = self.event_rx.clone(); tokio::spawn(async move { dispatcher.start(event_rx).await; }); } + let _ = self.udp_server.start().await; let _ = self.tick_tasks().await; println!("App started"); diff --git a/src/domain/client.rs b/src/domain/client.rs index b62abc6..599700f 100644 --- a/src/domain/client.rs +++ b/src/domain/client.rs @@ -1,34 +1,40 @@ + //! Gestion des clients pour les connexions UDP //! //! Ce module fournit les structures et méthodes pour gérer les clients //! connectés au serveur UDP, incluant leur tracking et leurs modifications. +use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; -use dashmap::DashMap; use tokio::time::Instant; use uuid::Uuid; use std::hash::{Hash, Hasher}; use std::time::Duration; +use crossbeam_utils::atomic::AtomicCell; +use crate::utils::shared_store::SharedArcMap; /// Représente un client connecté au serveur UDP /// /// Chaque client est identifié par un UUID unique et contient /// son adresse réseau ainsi que l'heure de sa dernière activité. +/// Le `last_seen` utilise AtomicCell pour des mises à jour lock-free. #[derive(Debug)] pub struct Client { id: Uuid, address: SocketAddr, - last_seen: Instant, + last_seen: AtomicCell, } /// Gestionnaire threadsafe pour les clients connectés /// -/// Utilise `DashMap` pour permettre un accès concurrent sécurisé -/// aux clients depuis plusieurs threads. +/// Utilise `SharedArcMap` pour permettre un accès concurrent sécurisé +/// aux clients depuis plusieurs threads avec des performances optimisées +/// pour les lectures fréquentes. Les clients sont stockés dans des Arc +/// pour éviter les clones coûteux. #[derive(Clone)] pub struct ClientManager { - clients: Arc>, + udp_clients: SharedArcMap, } impl Client { @@ -38,7 +44,7 @@ impl Client { Self { id, address, - last_seen: Instant::now(), + last_seen: AtomicCell::new(Instant::now()), } } @@ -53,13 +59,15 @@ impl Client { } /// Retourne l'instant de la dernière activité du client + /// Accès lock-free grâce à AtomicCell pub fn last_seen(&self) -> Instant { - self.last_seen + self.last_seen.load() } /// Met à jour l'heure de dernière activité du client à maintenant - pub fn update_last_seen(&mut self) { - self.last_seen = Instant::now(); + /// Opération lock-free grâce à AtomicCell + pub fn update_last_seen(&self) { + self.last_seen.store(Instant::now()); } } @@ -81,53 +89,96 @@ impl ClientManager { /// Crée un nouveau gestionnaire de clients vide pub fn new() -> Self { Self { - clients: Arc::new(DashMap::new()), + udp_clients: SharedArcMap::new(), + } + } + + /// Crée un nouveau gestionnaire de clients avec une capacité initiale + pub fn with_capacity(capacity: usize) -> Self { + Self { + udp_clients: SharedArcMap::with_capacity(capacity), } } /// Ajoute un client au gestionnaire - pub fn add(&self, client: Client) { - self.clients.insert(client.address(), client); + /// Retourne l'ancien client Arc s'il existait déjà + pub fn add(&self, client: Client) -> Option> { + self.udp_clients.insert(client.address(), client) } - /// Supprime un client du gestionnaire - pub fn remove(&self, client: Client) { - self.clients.remove(&client.address()); + /// Supprime un client du gestionnaire par son adresse + /// Retourne l'Arc du client supprimé + pub fn remove(&self, address: SocketAddr) -> Option> { + self.udp_clients.remove(&address) + } + + /// Supprime un client du gestionnaire par son instance + /// Retourne l'Arc du client supprimé + pub fn remove_client(&self, client: &Client) -> Option> { + self.udp_clients.remove(&client.address()) } /// Vérifie si un client existe pour une adresse donnée pub fn client_exists(&self, address: SocketAddr) -> bool { - self.clients.contains_key(&address) + self.udp_clients.contains_key(&address) } - /// Récupère une référence vers un client par son adresse - pub fn get_client_by_address(&self, address: SocketAddr) -> Option> { - self.clients.get(&address) + /// Récupère un Arc vers le client par son adresse + /// Très efficace - pas de clone du Client + pub fn get_client_by_address(&self, address: SocketAddr) -> Option> { + self.udp_clients.get(&address) + } + + pub fn get_uuid_by_address(&self, address: SocketAddr) -> Option { + self.udp_clients.get(&address).map(|client| client.id) } /// Récupère toutes les adresses des clients connectés - pub fn get_all_adresses(&self) -> Vec { - self.clients.iter().map(|entry| *entry.key()).collect() + pub fn get_all_addresses(&self) -> HashSet { + self.udp_clients.keys().collect() + } + + /// Retourne le nombre de clients connectés + pub fn len(&self) -> usize { + self.udp_clients.len() + } + + /// Vérifie si le gestionnaire est vide + pub fn is_empty(&self) -> bool { + self.udp_clients.is_empty() } /// Met à jour l'heure de dernière activité d'un client - pub fn update_client_last_seen(&self, address: SocketAddr) { - if let Some(mut client) = self.clients.get_mut(&address) { + /// Utilise la méthode modify de SharedArcMap pour une opération lock-free + pub fn update_client_last_seen(&self, address: SocketAddr) -> bool { + self.udp_clients.modify(&address, |client| { client.update_last_seen(); + }) + } + + /// Supprime les clients trop vieux + /// Utilise l'accès lock-free pour identifier les clients à supprimer + pub fn cleanup(&self, max_age: Duration) { + let now = Instant::now(); + let addresses_to_remove: Vec = self + .udp_clients + .read() + .iter() + .filter(|(_, client)| now - client.last_seen() >= max_age) + .map(|(addr, _)| *addr) + .collect(); + + for address in addresses_to_remove { + self.udp_clients.remove(&address); } } - /// Supprimer les clients trop vieux - pub fn cleanup(&self, max_age: Duration) { - let now = Instant::now(); - self.clients.retain(|_, client| now - client.last_seen() < max_age); - } - /// Modifie un client via une closure + /// Utilise la méthode modify optimisée de SharedArcMap /// /// # Arguments /// * `address` - L'adresse du client à modifier - /// * `f` - La closure qui recevra une référence mutable vers le client + /// * `f` - La closure qui recevra une référence vers le client /// /// # Returns /// `true` si le client a été trouvé et modifié, `false` sinon @@ -137,26 +188,77 @@ impl ClientManager { /// let client_manager = ClientManager::new(); /// let addr = "127.0.0.1:8080".parse().unwrap(); /// - /// // Mise à jour simple + /// // Mise à jour de last_seen (lock-free) /// client_manager.modify_client(addr, |client| { /// client.update_last_seen(); /// }); /// - /// // Modifications multiples - /// let success = client_manager.modify_client(addr, |client| { - /// client.update_last_seen(); - /// // autres modifications... + /// // Accès aux propriétés du client + /// let found = client_manager.modify_client(addr, |client| { + /// println!("Client ID: {}", client.id()); + /// println!("Dernière activité: {:?}", client.last_seen()); /// }); /// ``` pub fn modify_client(&self, address: SocketAddr, f: F) -> bool where - F: FnOnce(&mut Client), + F: FnOnce(&Client), { - if let Some(mut client) = self.clients.get_mut(&address) { - f(&mut *client); - true - } else { - false - } + self.udp_clients.modify(&address, f) + } + + /// Obtient une référence en lecture seule vers tous les clients + /// Accès lock-free ultra-rapide + pub fn read_all(&self) -> Arc>> { + self.udp_clients.read() + } + + /// Vide tous les clients + pub fn clear(&self) { + self.udp_clients.clear(); + } + + /// Itère sur tous les clients avec leurs Arc + /// Très efficace - pas de clone des clients + pub fn iter(&self) -> impl Iterator)> { + self.udp_clients.iter() + } + + /// Itère sur toutes les adresses des clients + pub fn addresses(&self) -> impl Iterator { + self.udp_clients.keys() + } + + /// Itère sur tous les Arc des clients + pub fn clients(&self) -> impl Iterator> { + self.udp_clients.values() + } + + /// Compte le nombre de clients actifs dans une durée donnée + /// Utilise l'accès lock-free pour une performance optimale + pub fn count_active_clients(&self, max_age: Duration) -> usize { + let now = Instant::now(); + self.udp_clients + .read() + .iter() + .filter(|(_, client)| now - client.last_seen() < max_age) + .count() + } + + /// Obtient les adresses des clients actifs + /// Très efficace grâce à l'accès lock-free + pub fn get_active_addresses(&self, max_age: Duration) -> Vec { + let now = Instant::now(); + self.udp_clients + .read() + .iter() + .filter(|(_, client)| now - client.last_seen() < max_age) + .map(|(addr, _)| *addr) + .collect() + } +} + +impl Default for ClientManager { + fn default() -> Self { + Self::new() } } \ No newline at end of file diff --git a/src/domain/event.rs b/src/domain/event.rs index 1b6e15f..1f0af95 100644 --- a/src/domain/event.rs +++ b/src/domain/event.rs @@ -1,4 +1,3 @@ -use tokio::sync::mpsc; use crate::network::protocol::{UDPMessage}; #[derive(Clone, Debug)] @@ -16,12 +15,12 @@ pub enum Event { #[derive(Clone)] pub struct EventBus { - pub sender: mpsc::Sender, + pub sender: kanal::AsyncSender, } impl EventBus { - pub fn new() -> (Self, mpsc::Receiver) { - let (sender, receiver) = mpsc::channel(10000); + pub fn new() -> (Self, kanal::AsyncReceiver) { + let (sender, receiver) = kanal::bounded_async::(4096); (Self { sender }, receiver) } @@ -33,7 +32,7 @@ impl EventBus { let _ = self.sender.try_send(event); } - pub fn clone_sender(&self) -> mpsc::Sender { + pub fn clone_sender(&self) -> kanal::AsyncSender { self.sender.clone() } } \ No newline at end of file diff --git a/src/runtime/dispatcher.rs b/src/runtime/dispatcher.rs index 1925777..2ee5766 100644 --- a/src/runtime/dispatcher.rs +++ b/src/runtime/dispatcher.rs @@ -1,32 +1,36 @@ +use std::sync::Arc; +use std::thread; use std::time::Duration; use tokio::sync::mpsc; use tokio::task::AbortHandle; -use crate::domain::client::ClientManager; +use crate::domain::client::{Client, ClientManager}; use crate::domain::event::{Event, EventBus}; -use crate::network::protocol::{UDPMessageType, UDPMessage}; +use crate::network::protocol::{UDPMessageType, UDPMessage, UdpBroadcastMessage, UDPMessageData}; use crate::network::udp::UdpServer; #[derive(Clone)] pub struct Dispatcher { - event_bus: EventBus, + event_bus: Arc, - udp_server: UdpServer, - client_manager: ClientManager + udp_server: Arc, + client_manager: Arc } impl Dispatcher { pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager) -> Self { Self { - event_bus, - udp_server, - client_manager, + event_bus: Arc::new(event_bus), + udp_server: Arc::new(udp_server), + client_manager: Arc::new(client_manager), } } - pub async fn start(&self, mut receiver: mpsc::Receiver) { + pub async fn start(&self, receiver: kanal::AsyncReceiver) { + println!("Dispatcher démarré sur le thread : {:?}", std::thread::current().id()); + let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await; - while let Some(event) = receiver.recv().await { + while let Ok(event) = receiver.recv().await { match event { Event::UdpIn(message) => { let _ = udp_in_sender.send(message).await; @@ -62,6 +66,7 @@ impl Dispatcher { pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender) { let (sender, mut consumer) = mpsc::channel::(1024); let udp_server = self.udp_server.clone(); + let client_manager = self.client_manager.clone(); let task = tokio::spawn(async move { while let Some(message) = consumer.recv().await { @@ -69,10 +74,22 @@ impl Dispatcher { match message.message_type() { UDPMessageType::Ping => { let response_message = UDPMessage::server_ping(message.address, message.get_message_id().unwrap()); - let _ = udp_server.send_udp_message(&response_message); + + if client_manager.client_exists(message.address) { + client_manager.update_client_last_seen(message.address); + }else { + client_manager.add(Client::new(message.address)); + } + let _ = udp_server.send_udp_message(&response_message).await; } UDPMessageType::Audio => { - // Traiter l'audio + if let UDPMessageData::ClientAudio { sequence, data } = &message.data { + let addresses = client_manager.get_all_addresses(); + let speaker_uuid = client_manager.get_uuid_by_address(message.address).unwrap(); + let response_message = UdpBroadcastMessage::server_audio(addresses, speaker_uuid, *sequence, data.clone()); + let _ = udp_server.broadcast_udp_message(&response_message).await; + } + } } } diff --git a/src/utils/byte_utils.rs b/src/utils/byte_utils.rs deleted file mode 100644 index 13d5ebe..0000000 --- a/src/utils/byte_utils.rs +++ /dev/null @@ -1,398 +0,0 @@ -use uuid::Uuid; - -/// Helpers pour la manipulation de bytes - idéal pour les protocoles binaires -/// -/// Cette structure permet de lire séquentiellement des données binaires -/// en maintenant une position de lecture interne. -pub struct ByteReader<'a> { - /// Référence vers les données à lire - data: &'a [u8], - /// Position actuelle dans le buffer de lecture - position: usize, -} - -impl<'a> ByteReader<'a> { - /// Crée un nouveau lecteur de bytes à partir d'un slice - /// - /// # Arguments - /// * `data` - Le slice de bytes à lire - /// - /// # Example - /// ```text - /// let data = &[0x01, 0x02, 0x03, 0x04]; - /// let reader = ByteReader::new(data); - /// ``` - pub fn new(data: &'a [u8]) -> Self { - Self { data, position: 0 } - } - - /// Retourne le nombre de bytes restants à lire - /// - /// Utilise `saturating_sub` pour éviter les débordements - /// si la position dépasse la taille des données - pub fn remaining(&self) -> usize { - self.data.len().saturating_sub(self.position) - } - - /// Vérifie si tous les bytes ont été lus - /// - /// # Returns - /// `true` si il n'y a plus de bytes à lire - pub fn is_empty(&self) -> bool { - self.remaining() == 0 - } - - /// Retourne la position actuelle de lecture - pub fn position(&self) -> usize { - self.position - } - - /// Déplace la position de lecture à l'index spécifié - /// - /// La position est automatiquement limitée à la taille des données - /// pour éviter les débordements - /// - /// # Arguments - /// * `position` - Nouvelle position de lecture - pub fn seek(&mut self, position: usize) { - self.position = position.min(self.data.len()); - } - - /// Lit un byte (u8) à la position actuelle - /// - /// # Returns - /// * `Ok(u8)` - La valeur lue si disponible - /// * `Err(&'static str)` - Si la fin du buffer est atteinte - pub fn read_u8(&mut self) -> Result { - if self.position < self.data.len() { - let value = self.data[self.position]; - self.position += 1; - Ok(value) - } else { - Err("Not enough data for u8") - } - } - - /// Lit un entier 16-bit en big-endian - /// - /// # Returns - /// * `Ok(u16)` - La valeur lue si 2 bytes sont disponibles - /// * `Err(&'static str)` - Si moins de 2 bytes sont disponibles - pub fn read_u16_be(&mut self) -> Result { - if self.remaining() >= 2 { - let value = u16::from_be_bytes([ - self.data[self.position], - self.data[self.position + 1], - ]); - self.position += 2; - Ok(value) - } else { - Err("Not enough data for u16") - } - } - - /// Lit un entier 32-bit en big-endian - /// - /// # Returns - /// * `Ok(u32)` - La valeur lue si 4 bytes sont disponibles - /// * `Err(&'static str)` - Si moins de 4 bytes sont disponibles - pub fn read_u32_be(&mut self) -> Result { - if self.remaining() >= 4 { - let value = u32::from_be_bytes([ - self.data[self.position], - self.data[self.position + 1], - self.data[self.position + 2], - self.data[self.position + 3], - ]); - self.position += 4; - Ok(value) - } else { - Err("Not enough data for u32") - } - } - - /// Lit un entier 64-bit en big-endian - /// - /// # Returns - /// * `Ok(u64)` - La valeur lue si 8 bytes sont disponibles - /// * `Err(&'static str)` - Si moins de 8 bytes sont disponibles - pub fn read_u64_be(&mut self) -> Result { - if self.remaining() >= 8 { - let value = u64::from_be_bytes([ - self.data[self.position], - self.data[self.position + 1], - self.data[self.position + 2], - self.data[self.position + 3], - self.data[self.position + 4], - self.data[self.position + 5], - self.data[self.position + 6], - self.data[self.position + 7], - ]); - self.position += 8; - Ok(value) - } else { - Err("Not enough data for u64") - } - } - - /// Lit un UUID (16 bytes) à la position actuelle - /// - /// Les UUIDs sont stockés sous forme de 16 bytes consécutifs. - /// Cette méthode lit ces 16 bytes et les convertit en UUID. - /// - /// # Returns - /// * `Ok(Uuid)` - L'UUID lu si 16 bytes sont disponibles - /// * `Err(&'static str)` - Si moins de 16 bytes sont disponibles - pub fn read_uuid(&mut self) -> Result { - if self.remaining() >= 16 { - let uuid_bytes = self.read_fixed_bytes::<16>()?; - Ok(Uuid::from_bytes(uuid_bytes)) - } else { - Err("Not enough data for UUID") - } - } - - - /// Lit une séquence de bytes de longueur spécifiée - /// - /// # Arguments - /// * `len` - Nombre de bytes à lire - /// - /// # Returns - /// * `Ok(&[u8])` - Slice des bytes lus si disponibles - /// * `Err(&'static str)` - Si pas assez de bytes disponibles - pub fn read_bytes(&mut self, len: usize) -> Result<&'a [u8], &'static str> { - if self.remaining() >= len { - let slice = &self.data[self.position..self.position + len]; - self.position += len; - Ok(slice) - } else { - Err("Not enough data for bytes") - } - } - - /// Lit un tableau de bytes de taille fixe définie à la compilation - /// - /// Utilise les generics const pour définir la taille du tableau - /// - /// # Returns - /// * `Ok([u8; N])` - Tableau de bytes lu si disponible - /// * `Err(&'static str)` - Si pas assez de bytes disponibles - pub fn read_fixed_bytes(&mut self) -> Result<[u8; N], &'static str> { - if self.remaining() >= N { - let mut array = [0u8; N]; - array.copy_from_slice(&self.data[self.position..self.position + N]); - self.position += N; - Ok(array) - } else { - Err("Not enough data for fixed bytes") - } - } - - /// Lit tous les bytes restants dans le buffer - /// - /// Après cet appel, le reader sera vide (position = taille totale) - /// - /// # Returns - /// Slice contenant tous les bytes restants - pub fn read_remaining(&mut self) -> &'a [u8] { - let slice = &self.data[self.position..]; - self.position = self.data.len(); - slice - } -} - -/// Structure pour construire séquentiellement des données binaires -/// -/// Contrairement à ByteReader, cette structure possède ses propres données -/// et permet d'écrire des valeurs de différents types. -pub struct ByteWriter { - /// Buffer interne pour stocker les données écrites - data: Vec, -} - -impl ByteWriter { - /// Crée un nouveau writer avec un Vec vide - pub fn new() -> Self { - Self { data: Vec::new() } - } - - /// Crée un nouveau writer avec une capacité pré-allouée - /// - /// Utile pour éviter les réallocations si la taille finale - /// est approximativement connue - /// - /// # Arguments - /// * `capacity` - Capacité initiale du buffer - pub fn with_capacity(capacity: usize) -> Self { - Self { - data: Vec::with_capacity(capacity), - } - } - - /// Écrit un byte (u8) dans le buffer - /// - /// # Arguments - /// * `value` - Valeur à écrire - pub fn write_u8(&mut self, value: u8) { - self.data.push(value); - } - - /// Écrit un entier 16-bit en big-endian - /// - /// # Arguments - /// * `value` - Valeur à écrire - pub fn write_u16_be(&mut self, value: u16) { - self.data.extend_from_slice(&value.to_be_bytes()); - } - - /// Écrit un entier 32-bit en big-endian - /// - /// # Arguments - /// * `value` - Valeur à écrire - pub fn write_u32_be(&mut self, value: u32) { - self.data.extend_from_slice(&value.to_be_bytes()); - } - - /// Écrit un entier 64-bit en big-endian - /// - /// # Arguments - /// * `value` - Valeur à écrire - pub fn write_u64_be(&mut self, value: u64) { - self.data.extend_from_slice(&value.to_be_bytes()); - } - - /// Écrit une séquence de bytes dans le buffer - /// - /// # Arguments - /// * `bytes` - Slice de bytes à écrire - pub fn write_bytes(&mut self, bytes: &[u8]) { - self.data.extend_from_slice(bytes); - } - - /// Écrit un tableau de bytes de taille fixe - /// - /// # Arguments - /// * `bytes` - Tableau de bytes à écrire - pub fn write_fixed_bytes(&mut self, bytes: [u8; N]) { - self.data.extend_from_slice(&bytes); - } - - /// Consomme le writer et retourne le Vec contenant les données - /// - /// # Returns - /// Vec contenant toutes les données écrites - pub fn into_vec(self) -> Vec { - self.data - } - - /// Retourne une référence vers les données sous forme de slice - /// - /// # Returns - /// Slice des données écrites - pub fn as_slice(&self) -> &[u8] { - &self.data - } - - /// Retourne la taille actuelle du buffer - pub fn len(&self) -> usize { - self.data.len() - } - - /// Vérifie si le buffer est vide - pub fn is_empty(&self) -> bool { - self.data.is_empty() - } -} - -/// Implémentation du trait Default pour ByteWriter -/// -/// Permet d'utiliser ByteWriter::default() comme équivalent de ByteWriter::new() -impl Default for ByteWriter { - fn default() -> Self { - Self::new() - } -} - -/// Fonctions utilitaires standalone pour la lecture directe sans état -/// -/// Ces fonctions permettent de lire des valeurs à des offsets spécifiques -/// sans avoir besoin de créer un ByteReader. - -/// Lit un byte à l'offset spécifié -/// -/// # Arguments -/// * `data` - Slice de données source -/// * `offset` - Position de lecture -/// -/// # Returns -/// * `Some(u8)` - Valeur lue si l'offset est valide -/// * `None` - Si l'offset dépasse la taille des données -pub fn read_u8_at(data: &[u8], offset: usize) -> Option { - data.get(offset).copied() -} - -/// Lit un entier 16-bit big-endian à l'offset spécifié -/// -/// # Arguments -/// * `data` - Slice de données source -/// * `offset` - Position de lecture -/// -/// # Returns -/// * `Some(u16)` - Valeur lue si 2 bytes sont disponibles à l'offset -/// * `None` - Si pas assez de bytes disponibles -pub fn read_u16_be_at(data: &[u8], offset: usize) -> Option { - if data.len() >= offset + 2 { - Some(u16::from_be_bytes([data[offset], data[offset + 1]])) - } else { - None - } -} - -/// Lit un entier 32-bit big-endian à l'offset spécifié -/// -/// # Arguments -/// * `data` - Slice de données source -/// * `offset` - Position de lecture -/// -/// # Returns -/// * `Some(u32)` - Valeur lue si 4 bytes sont disponibles à l'offset -/// * `None` - Si pas assez de bytes disponibles -pub fn read_u32_be_at(data: &[u8], offset: usize) -> Option { - if data.len() >= offset + 4 { - Some(u32::from_be_bytes([ - data[offset], - data[offset + 1], - data[offset + 2], - data[offset + 3], - ])) - } else { - None - } -} - -/// Lit un entier 64-bit big-endian à l'offset spécifié -/// -/// # Arguments -/// * `data` - Slice de données source -/// * `offset` - Position de lecture -/// -/// # Returns -/// * `Some(u64)` - Valeur lue si 8 bytes sont disponibles à l'offset -/// * `None` - Si pas assez de bytes disponibles -pub fn read_u64_be_at(data: &[u8], offset: usize) -> Option { - if data.len() >= offset + 8 { - Some(u64::from_be_bytes([ - data[offset], - data[offset + 1], - data[offset + 2], - data[offset + 3], - data[offset + 4], - data[offset + 5], - data[offset + 6], - data[offset + 7], - ])) - } else { - None - } -} \ No newline at end of file diff --git a/src/utils/mod.rs b/src/utils/mod.rs index a238d3f..cf747ee 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1 +1 @@ -pub mod byte_utils; \ No newline at end of file +pub mod shared_store; \ No newline at end of file diff --git a/src/utils/shared_store.rs b/src/utils/shared_store.rs new file mode 100644 index 0000000..190b403 --- /dev/null +++ b/src/utils/shared_store.rs @@ -0,0 +1,1114 @@ +// high read performance collections thread safe +// !!!! ATTENTION !!!! +// Ce fichier est vraiment fait pour la haute performance en lecture, dès qu'on fait une modif, ça réécris la collection entière +// Mais ça veut aussi dire que chaque élément stocker doit penser pour être thread safe +// (en soit, ce module est fait pour sécuriser la collection en elle même) + +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; +use std::sync::Arc; +use arc_swap::ArcSwap; + +/// SharedVec - Vec thread-safe optimisé pour les lectures fréquentes +/// +/// Exemples d'usage : +/// ```ignore +/// // Gestion de la liste des clients connectés +/// let clients = SharedVec::new(); +/// clients.push(client_socket); +/// clients.push(another_client); +/// +/// // Lecture ultra-rapide depuis n'importe quel thread +/// let client_count = clients.len(); +/// let first_client = clients.get(0); +/// +/// // Broadcast à tous les clients +/// for client in clients.iter() { +/// send_message(client); +/// } +/// ``` +#[derive(Debug)] +pub struct SharedVec { + inner: Arc>> +} + +/// SharedHashSet - HashSet thread-safe optimisé pour les lectures fréquentes +/// +/// Exemples d'usage : +/// ```ignore +/// // Gestion des utilisateurs connectés (pas de doublons) +/// let online_users = SharedHashSet::new(); +/// online_users.insert(user_id); +/// +/// // Vérification ultra-rapide +/// if online_users.contains(&user_id) { +/// println!("Utilisateur en ligne !"); +/// } +/// +/// // Opérations sur les ensembles +/// let banned_users = SharedHashSet::new(); +/// let can_speak = online_users.difference(&banned_users.to_hashset()); +/// ``` +pub struct SharedHashSet { + inner: Arc>> +} + +/// SharedMap - HashMap thread-safe optimisé pour les lectures fréquentes +/// +/// Exemples d'usage : +/// ```ignore +/// // Cache de sessions utilisateurs +/// let sessions = SharedMap::new(); +/// sessions.insert(session_id, user_data); +/// +/// // Lecture ultra-rapide +/// if let Some(user) = sessions.get(&session_id) { +/// println!("Session trouvée pour {}", user.name); +/// } +/// +/// // Configuration globale +/// let config = SharedMap::new(); +/// config.insert("max_users".to_string(), 100); +/// ``` +pub struct SharedMap { + inner: Arc>> +} + +// ===== COLLECTIONS ARC (HAUTE PERFORMANCE) ===== + +/// SharedArcVec - Vec thread-safe avec Arc automatique pour objets complexes +/// +/// Exemples d'usage : +/// ```ignore +/// // Gestion des clients connectés (pas de clone massif) +/// let clients: SharedArcVec = SharedArcVec::new(); +/// clients.push(Client::new(address)); +/// +/// // Accès ultra-rapide - Arc partagé +/// if let Some(client) = clients.get(0) { +/// client.update_last_seen(); // Modification directe ! +/// } +/// ``` +#[derive(Debug)] +pub struct SharedArcVec { + inner: Arc>>> +} + +/// SharedArcHashSet - HashSet thread-safe avec Arc automatique +/// +/// Exemples d'usage : +/// ```ignore +/// // Gestion des utilisateurs connectés (objets complexes) +/// let online_users: SharedArcHashSet = SharedArcHashSet::new(); +/// online_users.insert(User::new(user_id)); +/// +/// // Recherche ultra-rapide avec modification possible +/// if let Some(user) = online_users.get(&user_key) { +/// user.update_activity(); // Modification directe ! +/// } +/// ``` +pub struct SharedArcHashSet { + inner: Arc>>> +} + +/// SharedArcMap - HashMap thread-safe avec Arc automatique sur les valeurs +/// +/// Exemples d'usage : +/// ```ignore +/// // Cache de sessions utilisateurs (objets complexes) +/// let sessions: SharedArcMap = SharedArcMap::new(); +/// sessions.insert(session_id, UserSession::new(user_data)); +/// +/// // Accès ultra-rapide avec modification possible +/// if let Some(session) = sessions.get(&session_id) { +/// session.update_last_activity(); // Modification directe ! +/// } +/// ``` +pub struct SharedArcMap { + inner: Arc>>> +} + + +// ===== IMPLÉMENTATIONS COMMUNES ===== + +impl SharedVec { + /// Crée un nouveau SharedVec vide + /// + /// Exemple : `let message_queue = SharedVec::new();` + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(Vec::new()))) + } + } + + /// Crée un nouveau SharedVec avec une capacité initiale + /// CRUCIAL pour éviter les réallocations lors des writes + /// + /// Exemple : `let clients = SharedVec::with_capacity(1000);` + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(Vec::with_capacity(capacity)))) + } + } + + /// Crée un nouveau SharedVec à partir d'un Vec existant + /// + /// Exemple : `let shared = SharedVec::from_vec(vec![1, 2, 3]);` + pub fn from_vec(vec: Vec) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(vec))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + /// + /// Exemple : `let snapshot = clients.read(); // Arc>` + #[inline] + pub fn read(&self) -> Arc> { + self.inner.load_full() + } + + /// Count optimisé pour les lectures fréquentes + /// + /// Exemple : `println!("Clients connectés : {}", clients.len());` + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + /// Accès direct par index - ultra-rapide + /// + /// Exemple : `if let Some(first) = clients.get(0) { ... }` + #[inline] + pub fn get(&self, index: usize) -> Option + where + T: Clone, + { + self.inner.load().get(index).cloned() + } + + /// Accès unsafe ultra-rapide (si vous êtes sûr de l'index) + /// + /// Exemple : `let first = unsafe { clients.get_unchecked(0) };` + #[inline] + pub unsafe fn get_unchecked(&self, index: usize) -> T + where + T: Clone, + { + self.inner.load().get_unchecked(index).clone() + } + + /// Append - Optimisé pour minimiser les clones + /// + /// Exemple : `clients.push(new_client_socket);` + pub fn push(&self, item: T) + where + T: Clone, + { + let current = self.inner.load_full(); + let mut new_vec = Vec::with_capacity(current.len() + 1); + new_vec.clone_from(¤t); + new_vec.push(item); + self.inner.store(Arc::new(new_vec)); + } + + /// Append collection - Un seul remplacement atomique + /// + /// Exemple : `clients.extend(vec![client1, client2, client3]);` + pub fn extend(&self, items: I) + where + T: Clone, + I: IntoIterator, + { + let items: Vec = items.into_iter().collect(); + if items.is_empty() { + return; + } + + let current = self.inner.load_full(); + let mut new_vec = Vec::with_capacity(current.len() + items.len()); + new_vec.clone_from(¤t); + new_vec.extend(items); + self.inner.store(Arc::new(new_vec)); + } + + /// Remplacement complet - très efficace + /// + /// Exemple : `clients.replace(new_client_list);` + pub fn replace(&self, new_vec: Vec) { + self.inner.store(Arc::new(new_vec)); + } + + /// Clear optimisé + /// + /// Exemple : `clients.clear(); // Vide la liste` + pub fn clear(&self) { + self.inner.store(Arc::new(Vec::new())); + } + + /// Iteration lock-free + /// + /// Exemple : `for client in clients.iter() { send_ping(client); }` + pub fn iter(&self) -> impl Iterator + '_ + where + T: Clone, + { + let vec_ref = self.inner.load_full(); + (0..vec_ref.len()).map(move |i| vec_ref[i].clone()) + } +} + +// Clone gratuit (juste Arc::clone) +impl Clone for SharedVec { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +impl Default for SharedVec { + fn default() -> Self { + Self::new() + } +} + +impl SharedHashSet +where + T: Hash + Eq + Clone, +{ + /// Crée un nouveau SharedHashSet vide + /// + /// Exemple : `let online_users = SharedHashSet::new();` + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashSet::new()))) + } + } + + /// Crée un nouveau SharedHashSet avec une capacité initiale + /// + /// Exemple : `let banned_users = SharedHashSet::with_capacity(100);` + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashSet::with_capacity(capacity)))) + } + } + + /// Crée un nouveau SharedHashSet à partir d'un HashSet existant + /// + /// Exemple : `let shared = SharedHashSet::from_hashset(existing_set);` + pub fn from_hashset(set: HashSet) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(set))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + /// + /// Exemple : `let snapshot = online_users.read(); // Arc>` + #[inline] + pub fn read(&self) -> Arc> { + self.inner.load_full() + } + + /// Contains optimisé pour les lectures fréquentes - LE PLUS IMPORTANT + /// + /// Exemple : `if online_users.contains(&user_id) { allow_message(); }` + #[inline] + pub fn contains(&self, value: &T) -> bool { + self.inner.load().contains(value) + } + + /// Count optimisé pour les lectures fréquentes + /// + /// Exemple : `println!("Utilisateurs en ligne : {}", online_users.len());` + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + /// Intersection simple - retourne un nouveau HashSet + /// + /// Exemple : `let common = channel_a_users.intersection(&channel_b_users);` + pub fn intersection(&self, other: &HashSet) -> HashSet { + let current = self.inner.load_full(); + current.intersection(other).cloned().collect() + } + + /// Union simple - retourne un nouveau HashSet + /// + /// Exemple : `let all_users = online_users.union(&offline_users);` + pub fn union(&self, other: &HashSet) -> HashSet { + let current = self.inner.load_full(); + current.union(other).cloned().collect() + } + + /// Différence simple - retourne un nouveau HashSet + /// + /// Exemple : `let can_speak = online_users.difference(&muted_users);` + pub fn difference(&self, other: &HashSet) -> HashSet { + let current = self.inner.load_full(); + current.difference(other).cloned().collect() + } + + /// Différence symétrique - éléments dans l'un OU l'autre, mais pas les deux + /// + /// Exemple : `let unique_to_each = set1.symmetric_difference(&set2);` + pub fn symmetric_difference(&self, other: &HashSet) -> HashSet { + let current = self.inner.load_full(); + current.symmetric_difference(other).cloned().collect() + } + + /// Vérifie si c'est un sous-ensemble + /// + /// Exemple : `if admins.is_subset(&all_users) { ... }` + pub fn is_subset(&self, other: &HashSet) -> bool { + let current = self.inner.load_full(); + current.is_subset(other) + } + + /// Vérifie si c'est un sur-ensemble + /// + /// Exemple : `if all_users.is_superset(&admins) { ... }` + pub fn is_superset(&self, other: &HashSet) -> bool { + let current = self.inner.load_full(); + current.is_superset(other) + } + + /// Vérifie si les deux ensembles sont disjoints (aucun élément commun) + /// + /// Exemple : `if online_users.is_disjoint(&banned_users) { ... }` + pub fn is_disjoint(&self, other: &HashSet) -> bool { + let current = self.inner.load_full(); + current.is_disjoint(other) + } + + // API CALLBACK - Pour optimisation avancée + /// Intersection avec callback - ultra-efficace pour opérations ponctuelles + /// + /// Exemple : `let count = users.with_intersection(&other, |result| result.len());` + pub fn with_intersection(&self, other: &HashSet, f: F) -> R + where + F: FnOnce(&HashSet) -> R, + { + let current = self.inner.load_full(); + let result: HashSet = current.intersection(other).cloned().collect(); + f(&result) + } + + /// Union avec callback + /// + /// Exemple : `users.with_union(&other, |result| broadcast_to_all(result));` + pub fn with_union(&self, other: &HashSet, f: F) -> R + where + F: FnOnce(&HashSet) -> R, + { + let current = self.inner.load_full(); + let result: HashSet = current.union(other).cloned().collect(); + f(&result) + } + + /// Différence avec callback + /// + /// Exemple : `users.with_difference(&banned, |can_speak| notify(can_speak));` + pub fn with_difference(&self, other: &HashSet, f: F) -> R + where + F: FnOnce(&HashSet) -> R, + { + let current = self.inner.load_full(); + let result: HashSet = current.difference(other).cloned().collect(); + f(&result) + } + + /// Insert - Optimisé pour minimiser les clones + /// + /// Exemple : `online_users.insert(user_id); // true si nouveau` + pub fn insert(&self, item: T) -> bool { + let current = self.inner.load_full(); + + // Optimisation : si déjà présent, pas de clone + if current.contains(&item) { + return false; + } + + let mut new_set = current.as_ref().clone(); + let was_new = new_set.insert(item); + self.inner.store(Arc::new(new_set)); + was_new + } + + /// Remove - Optimisé + /// + /// Exemple : `online_users.remove(&user_id); // true si était présent` + pub fn remove(&self, item: &T) -> bool { + let current = self.inner.load_full(); + + // Optimisation : si pas présent, pas de clone + if !current.contains(item) { + return false; + } + + let mut new_set = current.as_ref().clone(); + let was_present = new_set.remove(item); + self.inner.store(Arc::new(new_set)); + was_present + } + + /// Extend - Un seul remplacement atomique + /// + /// Exemple : `online_users.extend(vec![user1, user2, user3]);` + pub fn extend(&self, items: I) + where + I: IntoIterator, + { + let items: Vec = items.into_iter().collect(); + if items.is_empty() { + return; + } + + let current = self.inner.load_full(); + let mut new_set = current.as_ref().clone(); + new_set.extend(items); + self.inner.store(Arc::new(new_set)); + } + + /// Remplacement complet - très efficace + /// + /// Exemple : `online_users.replace(new_user_set);` + pub fn replace(&self, new_set: HashSet) { + self.inner.store(Arc::new(new_set)); + } + + /// Clear optimisé + /// + /// Exemple : `online_users.clear(); // Vide l'ensemble` + pub fn clear(&self) { + self.inner.store(Arc::new(HashSet::new())); + } + + /// Iteration lock-free + /// + /// Exemple : `for user in online_users.iter() { send_notification(user); }` + pub fn iter(&self) -> impl Iterator + '_ { + let set_ref = self.inner.load_full(); + set_ref.iter().cloned().collect::>().into_iter() + } + + /// Retourne une copie du HashSet (pour compatibility) + /// + /// Exemple : `let snapshot = online_users.to_hashset();` + pub fn to_hashset(&self) -> HashSet { + self.inner.load().as_ref().clone() + } + + /// Operations batch optimisées + /// + /// Exemple : `let added = users.insert_many(vec![user1, user2]); // Retourne le nombre ajouté` + pub fn insert_many(&self, items: I) -> usize + where + I: IntoIterator, + { + let items: Vec = items.into_iter().collect(); + if items.is_empty() { + return 0; + } + + let current = self.inner.load_full(); + let mut new_set = current.as_ref().clone(); + let original_len = new_set.len(); + + for item in items { + new_set.insert(item); + } + + let inserted_count = new_set.len() - original_len; + self.inner.store(Arc::new(new_set)); + inserted_count + } + + /// Remove many - batch optimisé + /// + /// Exemple : `let removed = users.remove_many(vec![user1, user2]); // Retourne le nombre supprimé` + pub fn remove_many(&self, items: I) -> usize + where + I: IntoIterator, + { + let items: Vec = items.into_iter().collect(); + if items.is_empty() { + return 0; + } + + let current = self.inner.load_full(); + let mut new_set = current.as_ref().clone(); + let original_len = new_set.len(); + + for item in items { + new_set.remove(&item); + } + + let removed_count = original_len - new_set.len(); + self.inner.store(Arc::new(new_set)); + removed_count + } +} + +// Clone gratuit (juste Arc::clone) +impl Clone for SharedHashSet { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +impl Default for SharedHashSet +where + T: Hash + Eq + Clone, +{ + fn default() -> Self { + Self::new() + } +} + +impl SharedMap +where + K: Hash + Eq + Clone, + V: Clone, +{ + /// Crée un nouveau SharedMap vide + /// + /// Exemple : `let sessions = SharedMap::new();` + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))) + } + } + + /// Crée un nouveau SharedMap avec une capacité initiale + /// + /// Exemple : `let cache = SharedMap::with_capacity(1000);` + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashMap::with_capacity(capacity)))) + } + } + + /// Crée un nouveau SharedMap à partir d'un HashMap existant + /// + /// Exemple : `let shared = SharedMap::from_hashmap(existing_map);` + pub fn from_hashmap(map: HashMap) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(map))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + /// + /// Exemple : `let snapshot = sessions.read(); // Arc>` + #[inline] + pub fn read(&self) -> Arc> { + self.inner.load_full() + } + + /// Get optimisé pour les lectures fréquentes + /// + /// Exemple : `if let Some(user) = sessions.get(&session_id) { ... }` + #[inline] + pub fn get(&self, key: &K) -> Option { + self.inner.load().get(key).cloned() + } + + /// Contains key optimisé pour les lectures fréquentes + /// + /// Exemple : `if sessions.contains_key(&session_id) { ... }` + #[inline] + pub fn contains_key(&self, key: &K) -> bool { + self.inner.load().contains_key(key) + } + + /// Count optimisé pour les lectures fréquentes + /// + /// Exemple : `println!("Sessions actives : {}", sessions.len());` + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + /// Insert - Optimisé pour minimiser les clones + /// + /// Exemple : `sessions.insert(session_id, user_data); // Retourne l'ancienne valeur` + pub fn insert(&self, key: K, value: V) -> Option { + let current = self.inner.load_full(); + let mut new_map = current.as_ref().clone(); + let old_value = new_map.insert(key, value); + self.inner.store(Arc::new(new_map)); + old_value + } + + /// Remove - Optimisé + /// + /// Exemple : `sessions.remove(&session_id); // Retourne l'ancienne valeur` + pub fn remove(&self, key: &K) -> Option { + let current = self.inner.load_full(); + + // Optimisation : si pas présent, pas de clone + if !current.contains_key(key) { + return None; + } + + let mut new_map = current.as_ref().clone(); + let old_value = new_map.remove(key); + self.inner.store(Arc::new(new_map)); + old_value + } + + /// Clear optimisé + /// + /// Exemple : `sessions.clear(); // Vide la map` + pub fn clear(&self) { + self.inner.store(Arc::new(HashMap::new())); + } + + /// Retourne une copie du HashMap (pour compatibility) + /// + /// Exemple : `let snapshot = sessions.to_hashmap();` + pub fn to_hashmap(&self) -> HashMap { + self.inner.load().as_ref().clone() + } + + /// Iteration lock-free des clés + /// + /// Exemple : `for session_id in sessions.keys() { cleanup(session_id); }` + pub fn keys(&self) -> impl Iterator + '_ { + let map_ref = self.inner.load_full(); + map_ref.keys().cloned().collect::>().into_iter() + } + + /// Iteration lock-free des valeurs + /// + /// Exemple : `for user_data in sessions.values() { send_ping(user_data); }` + pub fn values(&self) -> impl Iterator + '_ { + let map_ref = self.inner.load_full(); + map_ref.values().cloned().collect::>().into_iter() + } +} + +// Clone gratuit (juste Arc::clone) +impl Clone for SharedMap { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +impl Default for SharedMap +where + K: Hash + Eq + Clone, + V: Clone, +{ + fn default() -> Self { + Self::new() + } +} + +// ===== IMPLÉMENTATIONS ARC ===== + +impl SharedArcVec { + /// Crée un nouveau SharedArcVec vide + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(Vec::new()))) + } + } + + /// Crée un nouveau SharedArcVec avec une capacité initiale + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(Vec::with_capacity(capacity)))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + #[inline] + pub fn read(&self) -> Arc>> { + self.inner.load_full() + } + + /// Accès direct par index - retourne Arc + #[inline] + pub fn get(&self, index: usize) -> Option> { + self.inner.load().get(index).cloned() + } + + /// Append avec Arc automatique + pub fn push(&self, item: T) { + let arc_item = Arc::new(item); + let current = self.inner.load_full(); + let mut new_vec = Vec::with_capacity(current.len() + 1); + new_vec.clone_from(¤t); + new_vec.push(arc_item); + self.inner.store(Arc::new(new_vec)); + } + + /// Append d'un Arc existant (évite double-wrapping) + pub fn push_arc(&self, item: Arc) { + let current = self.inner.load_full(); + let mut new_vec = Vec::with_capacity(current.len() + 1); + new_vec.clone_from(¤t); + new_vec.push(item); + self.inner.store(Arc::new(new_vec)); + } + + /// Extend avec Arc automatique + pub fn extend(&self, items: I) + where + I: IntoIterator, + { + let arc_items: Vec> = items.into_iter().map(Arc::new).collect(); + if arc_items.is_empty() { + return; + } + + let current = self.inner.load_full(); + let mut new_vec = Vec::with_capacity(current.len() + arc_items.len()); + new_vec.clone_from(¤t); + new_vec.extend(arc_items); + self.inner.store(Arc::new(new_vec)); + } + + /// Modification d'un élément par index + pub fn modify(&self, index: usize, f: F) -> bool + where + F: FnOnce(&T), + { + if let Some(arc_item) = self.get(index) { + f(&arc_item); + true + } else { + false + } + } + + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + pub fn clear(&self) { + self.inner.store(Arc::new(Vec::new())); + } + + /// Iteration retournant Arc + pub fn iter(&self) -> impl Iterator> + '_ { + let vec_ref = self.inner.load_full(); + (0..vec_ref.len()).map(move |i| vec_ref[i].clone()) + } +} + +impl SharedArcHashSet +where + T: Hash + Eq, +{ + /// Crée un nouveau SharedArcHashSet vide + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashSet::new()))) + } + } + + /// Crée un nouveau SharedArcHashSet avec une capacité initiale + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashSet::with_capacity(capacity)))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + #[inline] + pub fn read(&self) -> Arc>> { + self.inner.load_full() + } + + /// Contains avec recherche par référence + #[inline] + pub fn contains(&self, value: &T) -> bool { + self.inner.load().iter().any(|arc_item| arc_item.as_ref() == value) + } + + /// Get retourne Arc si trouvé + pub fn get(&self, value: &T) -> Option> { + self.inner.load().iter() + .find(|arc_item| arc_item.as_ref() == value) + .cloned() + } + + /// Insert avec Arc automatique + pub fn insert(&self, item: T) -> bool { + let current = self.inner.load_full(); + + // Vérifier si déjà présent + if current.iter().any(|arc_item| arc_item.as_ref() == &item) { + return false; + } + + let mut new_set = current.as_ref().clone(); + let was_new = new_set.insert(Arc::new(item)); + self.inner.store(Arc::new(new_set)); + was_new + } + + /// Insert d'un Arc existant + pub fn insert_arc(&self, item: Arc) -> bool { + let current = self.inner.load_full(); + + if current.contains(&item) { + return false; + } + + let mut new_set = current.as_ref().clone(); + let was_new = new_set.insert(item); + self.inner.store(Arc::new(new_set)); + was_new + } + + /// Remove par référence + pub fn remove(&self, item: &T) -> bool { + let current = self.inner.load_full(); + + // Trouver l'Arc correspondant + let arc_to_remove = current.iter() + .find(|arc_item| arc_item.as_ref() == item) + .cloned(); + + if let Some(arc_item) = arc_to_remove { + let mut new_set = current.as_ref().clone(); + let was_present = new_set.remove(&arc_item); + self.inner.store(Arc::new(new_set)); + was_present + } else { + false + } + } + + /// Modification d'un élément + pub fn modify(&self, key: &T, f: F) -> bool + where + F: FnOnce(&T), + { + if let Some(arc_item) = self.get(key) { + f(&arc_item); + true + } else { + false + } + } + + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + pub fn clear(&self) { + self.inner.store(Arc::new(HashSet::new())); + } + + /// Iteration retournant Arc + pub fn iter(&self) -> impl Iterator> + '_ { + let set_ref = self.inner.load_full(); + set_ref.iter().cloned().collect::>().into_iter() + } +} + +impl SharedArcMap +where + K: Hash + Eq + Clone, +{ + /// Crée un nouveau SharedArcMap vide + pub fn new() -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))) + } + } + + /// Crée un nouveau SharedArcMap avec une capacité initiale + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(ArcSwap::new(Arc::new(HashMap::with_capacity(capacity)))) + } + } + + /// LECTURE ULTRA-RAPIDE - Accès direct lock-free + #[inline] + pub fn read(&self) -> Arc>> { + self.inner.load_full() + } + + /// Get retourne Arc + #[inline] + pub fn get(&self, key: &K) -> Option> { + self.inner.load().get(key).cloned() + } + + /// Contains key + #[inline] + pub fn contains_key(&self, key: &K) -> bool { + self.inner.load().contains_key(key) + } + + /// Insert avec Arc automatique + pub fn insert(&self, key: K, value: V) -> Option> { + let arc_value = Arc::new(value); + let current = self.inner.load_full(); + let mut new_map = current.as_ref().clone(); + let old_value = new_map.insert(key, arc_value); + self.inner.store(Arc::new(new_map)); + old_value + } + + /// Insert d'un Arc existant + pub fn insert_arc(&self, key: K, value: Arc) -> Option> { + let current = self.inner.load_full(); + let mut new_map = current.as_ref().clone(); + let old_value = new_map.insert(key, value); + self.inner.store(Arc::new(new_map)); + old_value + } + + /// Remove + pub fn remove(&self, key: &K) -> Option> { + let current = self.inner.load_full(); + + if !current.contains_key(key) { + return None; + } + + let mut new_map = current.as_ref().clone(); + let old_value = new_map.remove(key); + self.inner.store(Arc::new(new_map)); + old_value + } + + /// Modification d'une valeur + pub fn modify(&self, key: &K, f: F) -> bool + where + F: FnOnce(&V), + { + if let Some(arc_value) = self.get(key) { + f(&arc_value); + true + } else { + false + } + } + + #[inline] + pub fn len(&self) -> usize { + self.inner.load().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.inner.load().is_empty() + } + + pub fn clear(&self) { + self.inner.store(Arc::new(HashMap::new())); + } + + /// Iteration des clés + pub fn keys(&self) -> impl Iterator + '_ { + let map_ref = self.inner.load_full(); + map_ref.keys().cloned().collect::>().into_iter() + } + + /// Iteration des valeurs Arc + pub fn values(&self) -> impl Iterator> + '_ { + let map_ref = self.inner.load_full(); + map_ref.values().cloned().collect::>().into_iter() + } + + /// Iteration des paires (K, Arc) + pub fn iter(&self) -> impl Iterator)> + '_ { + let map_ref = self.inner.load_full(); + map_ref.iter().map(|(k, v)| (k.clone(), v.clone())).collect::>().into_iter() + } +} + +// ===== IMPLÉMENTATIONS CLONE ===== + +impl Clone for SharedArcVec { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +impl Clone for SharedArcHashSet { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +impl Clone for SharedArcMap { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner) + } + } +} + +// ===== IMPLÉMENTATIONS DEFAULT ===== + +impl Default for SharedArcVec { + fn default() -> Self { + Self::new() + } +} + +impl Default for SharedArcHashSet +where + T: Hash + Eq, +{ + fn default() -> Self { + Self::new() + } +} + +impl Default for SharedArcMap +where + K: Hash + Eq + Clone, +{ + fn default() -> Self { + Self::new() + } +}