This commit is contained in:
2025-07-19 03:45:57 +02:00
parent 51118fee63
commit 0e9c2b08d6
10 changed files with 1487 additions and 462 deletions

161
.idea/workspace.xml generated Normal file
View File

@@ -0,0 +1,161 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="ALL" />
</component>
<component name="CargoProjects">
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml">
<package file="$PROJECT_DIR$">
<enabledFeature name="default" />
</package>
</cargoProject>
</component>
<component name="ChangeListManager">
<list default="true" id="ca698286-778f-4335-97c8-da35a666c986" name="Changes" comment="init">
<change afterPath="$PROJECT_DIR$/src/utils/shared_store.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.lock" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.lock" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.toml" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.toml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/app.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/app.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/client.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/client.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/event.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/event.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/runtime/dispatcher.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/runtime/dispatcher.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/utils/byte_utils.rs" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/utils/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/utils/mod.rs" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="ExecutionTargetManager" SELECTED_TARGET="RsBuildProfile:dev" />
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Rust File" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MacroExpansionManager">
<option name="directoryName" value="oBePpDlK" />
</component>
<component name="ProjectColorInfo">{
&quot;associatedIndex&quot;: 5
}</component>
<component name="ProjectId" id="2zaZ93S6zEp6mJe7Xq5WfvIMexv" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;Cargo.Run.executor&quot;: &quot;Run&quot;,
&quot;ModuleVcsDetector.initialDetectionPerformed&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
&quot;RunOnceActivity.rust.reset.selective.auto.import&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;master&quot;,
&quot;ignore.virus.scanning.warn.message&quot;: &quot;true&quot;,
&quot;junie.onboarding.icon.badge.shown&quot;: &quot;true&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/Dev/ox_speak_server/src/network&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
&quot;org.rust.cargo.project.model.PROJECT_DISCOVERY&quot;: &quot;true&quot;,
&quot;org.rust.cargo.project.model.impl.CargoExternalSystemProjectAware.subscribe.first.balloon&quot;: &quot;&quot;,
&quot;org.rust.first.attach.projects&quot;: &quot;true&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;terminal&quot;,
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
}
}</component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\Dev\ox_speak_server\src\network" />
</key>
</component>
<component name="RunManager">
<configuration name="Run" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="run --package ox_speak_server --bin ox_speak_server" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>
<component name="RustProjectSettings">
<option name="toolchainHomeDirectory" value="$USER_HOME$/.cargo/bin" />
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="ca698286-778f-4335-97c8-da35a666c986" name="Changes" comment="" />
<created>1751970990022</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1751970990022</updated>
<workItem from="1751970991653" duration="7327000" />
<workItem from="1752016714585" duration="1095000" />
<workItem from="1752076174877" duration="11015000" />
<workItem from="1752189013977" duration="4750000" />
<workItem from="1752224469499" duration="14591000" />
<workItem from="1752305152715" duration="19879000" />
<workItem from="1752391851097" duration="12061000" />
<workItem from="1752446764168" duration="8166000" />
<workItem from="1752484190578" duration="17709000" />
<workItem from="1752591656862" duration="429000" />
<workItem from="1752593250094" duration="11969000" />
<workItem from="1752694022400" duration="6212000" />
<workItem from="1752741840195" duration="2946000" />
<workItem from="1752833798325" duration="5366000" />
</task>
<task id="LOCAL-00001" summary="init">
<option name="closed" value="true" />
<created>1752591904243</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1752591904243</updated>
</task>
<task id="LOCAL-00002" summary="init">
<option name="closed" value="true" />
<created>1752591989113</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1752591989113</updated>
</task>
<option name="localTasksCounter" value="3" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="init" />
<option name="LAST_COMMIT_MESSAGE" value="init" />
</component>
</project>

25
Cargo.lock generated
View File

@@ -17,6 +17,12 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.5.0" version = "1.5.0"
@@ -102,6 +108,12 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.3.3" version = "0.3.3"
@@ -159,6 +171,16 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "kanal"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574"
dependencies = [
"futures-core",
"lock_api",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.174" version = "0.2.174"
@@ -226,9 +248,12 @@ checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
name = "ox_speak_server" name = "ox_speak_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"arc-swap",
"bytes", "bytes",
"crossbeam-utils",
"dashmap", "dashmap",
"event-listener", "event-listener",
"kanal",
"parking_lot", "parking_lot",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -22,4 +22,7 @@ strum = {version = "0.27", features = ["derive"] }
uuid = {version = "1.17", features = ["v4", "serde"] } uuid = {version = "1.17", features = ["v4", "serde"] }
event-listener = "5.4" event-listener = "5.4"
dashmap = "6.1" dashmap = "6.1"
bytes = "1.10" bytes = "1.10"
arc-swap = "1.7"
crossbeam-utils = "0.8"
kanal = "0.1"

View File

@@ -10,7 +10,7 @@ pub struct App {
// Communication inter-components // Communication inter-components
event_bus: EventBus, event_bus: EventBus,
dispatcher: Dispatcher, dispatcher: Dispatcher,
event_rx: Option<mpsc::Receiver<Event>>, event_rx: kanal::AsyncReceiver<Event>,
// Network // Network
udp_server: UdpServer, udp_server: UdpServer,
@@ -32,20 +32,22 @@ impl App {
Self { Self {
event_bus, event_bus,
dispatcher, dispatcher,
event_rx: Some(event_rx), event_rx,
udp_server, udp_server,
client_manager client_manager
} }
} }
pub async fn start(&mut self) { pub async fn start(&mut self) {
if let Some(event_rx) = self.event_rx.take() { for i in 0..4 {
let dispatcher = self.dispatcher.clone(); let dispatcher = self.dispatcher.clone();
let event_rx = self.event_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
dispatcher.start(event_rx).await; dispatcher.start(event_rx).await;
}); });
} }
let _ = self.udp_server.start().await; let _ = self.udp_server.start().await;
let _ = self.tick_tasks().await; let _ = self.tick_tasks().await;
println!("App started"); println!("App started");

View File

@@ -1,34 +1,40 @@
//! Gestion des clients pour les connexions UDP //! Gestion des clients pour les connexions UDP
//! //!
//! Ce module fournit les structures et méthodes pour gérer les clients //! Ce module fournit les structures et méthodes pour gérer les clients
//! connectés au serveur UDP, incluant leur tracking et leurs modifications. //! connectés au serveur UDP, incluant leur tracking et leurs modifications.
use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use dashmap::DashMap;
use tokio::time::Instant; use tokio::time::Instant;
use uuid::Uuid; use uuid::Uuid;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::time::Duration; use std::time::Duration;
use crossbeam_utils::atomic::AtomicCell;
use crate::utils::shared_store::SharedArcMap;
/// Représente un client connecté au serveur UDP /// Représente un client connecté au serveur UDP
/// ///
/// Chaque client est identifié par un UUID unique et contient /// Chaque client est identifié par un UUID unique et contient
/// son adresse réseau ainsi que l'heure de sa dernière activité. /// son adresse réseau ainsi que l'heure de sa dernière activité.
/// Le `last_seen` utilise AtomicCell pour des mises à jour lock-free.
#[derive(Debug)] #[derive(Debug)]
pub struct Client { pub struct Client {
id: Uuid, id: Uuid,
address: SocketAddr, address: SocketAddr,
last_seen: Instant, last_seen: AtomicCell<Instant>,
} }
/// Gestionnaire threadsafe pour les clients connectés /// Gestionnaire threadsafe pour les clients connectés
/// ///
/// Utilise `DashMap` pour permettre un accès concurrent sécurisé /// Utilise `SharedArcMap` pour permettre un accès concurrent sécurisé
/// aux clients depuis plusieurs threads. /// aux clients depuis plusieurs threads avec des performances optimisées
/// pour les lectures fréquentes. Les clients sont stockés dans des Arc
/// pour éviter les clones coûteux.
#[derive(Clone)] #[derive(Clone)]
pub struct ClientManager { pub struct ClientManager {
clients: Arc<DashMap<SocketAddr, Client>>, udp_clients: SharedArcMap<SocketAddr, Client>,
} }
impl Client { impl Client {
@@ -38,7 +44,7 @@ impl Client {
Self { Self {
id, id,
address, address,
last_seen: Instant::now(), last_seen: AtomicCell::new(Instant::now()),
} }
} }
@@ -53,13 +59,15 @@ impl Client {
} }
/// Retourne l'instant de la dernière activité du client /// Retourne l'instant de la dernière activité du client
/// Accès lock-free grâce à AtomicCell
pub fn last_seen(&self) -> Instant { pub fn last_seen(&self) -> Instant {
self.last_seen self.last_seen.load()
} }
/// Met à jour l'heure de dernière activité du client à maintenant /// Met à jour l'heure de dernière activité du client à maintenant
pub fn update_last_seen(&mut self) { /// Opération lock-free grâce à AtomicCell
self.last_seen = Instant::now(); pub fn update_last_seen(&self) {
self.last_seen.store(Instant::now());
} }
} }
@@ -81,53 +89,96 @@ impl ClientManager {
/// Crée un nouveau gestionnaire de clients vide /// Crée un nouveau gestionnaire de clients vide
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
clients: Arc::new(DashMap::new()), udp_clients: SharedArcMap::new(),
}
}
/// Crée un nouveau gestionnaire de clients avec une capacité initiale
pub fn with_capacity(capacity: usize) -> Self {
Self {
udp_clients: SharedArcMap::with_capacity(capacity),
} }
} }
/// Ajoute un client au gestionnaire /// Ajoute un client au gestionnaire
pub fn add(&self, client: Client) { /// Retourne l'ancien client Arc s'il existait déjà
self.clients.insert(client.address(), client); pub fn add(&self, client: Client) -> Option<Arc<Client>> {
self.udp_clients.insert(client.address(), client)
} }
/// Supprime un client du gestionnaire /// Supprime un client du gestionnaire par son adresse
pub fn remove(&self, client: Client) { /// Retourne l'Arc du client supprimé
self.clients.remove(&client.address()); pub fn remove(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.remove(&address)
}
/// Supprime un client du gestionnaire par son instance
/// Retourne l'Arc du client supprimé
pub fn remove_client(&self, client: &Client) -> Option<Arc<Client>> {
self.udp_clients.remove(&client.address())
} }
/// Vérifie si un client existe pour une adresse donnée /// Vérifie si un client existe pour une adresse donnée
pub fn client_exists(&self, address: SocketAddr) -> bool { pub fn client_exists(&self, address: SocketAddr) -> bool {
self.clients.contains_key(&address) self.udp_clients.contains_key(&address)
} }
/// Récupère une référence vers un client par son adresse /// Récupère un Arc vers le client par son adresse
pub fn get_client_by_address(&self, address: SocketAddr) -> Option<dashmap::mapref::one::Ref<SocketAddr, Client>> { /// Très efficace - pas de clone du Client
self.clients.get(&address) pub fn get_client_by_address(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.get(&address)
}
pub fn get_uuid_by_address(&self, address: SocketAddr) -> Option<Uuid> {
self.udp_clients.get(&address).map(|client| client.id)
} }
/// Récupère toutes les adresses des clients connectés /// Récupère toutes les adresses des clients connectés
pub fn get_all_adresses(&self) -> Vec<SocketAddr> { pub fn get_all_addresses(&self) -> HashSet<SocketAddr> {
self.clients.iter().map(|entry| *entry.key()).collect() self.udp_clients.keys().collect()
}
/// Retourne le nombre de clients connectés
pub fn len(&self) -> usize {
self.udp_clients.len()
}
/// Vérifie si le gestionnaire est vide
pub fn is_empty(&self) -> bool {
self.udp_clients.is_empty()
} }
/// Met à jour l'heure de dernière activité d'un client /// Met à jour l'heure de dernière activité d'un client
pub fn update_client_last_seen(&self, address: SocketAddr) { /// Utilise la méthode modify de SharedArcMap pour une opération lock-free
if let Some(mut client) = self.clients.get_mut(&address) { pub fn update_client_last_seen(&self, address: SocketAddr) -> bool {
self.udp_clients.modify(&address, |client| {
client.update_last_seen(); client.update_last_seen();
})
}
/// Supprime les clients trop vieux
/// Utilise l'accès lock-free pour identifier les clients à supprimer
pub fn cleanup(&self, max_age: Duration) {
let now = Instant::now();
let addresses_to_remove: Vec<SocketAddr> = self
.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() >= max_age)
.map(|(addr, _)| *addr)
.collect();
for address in addresses_to_remove {
self.udp_clients.remove(&address);
} }
} }
/// Supprimer les clients trop vieux
pub fn cleanup(&self, max_age: Duration) {
let now = Instant::now();
self.clients.retain(|_, client| now - client.last_seen() < max_age);
}
/// Modifie un client via une closure /// Modifie un client via une closure
/// Utilise la méthode modify optimisée de SharedArcMap
/// ///
/// # Arguments /// # Arguments
/// * `address` - L'adresse du client à modifier /// * `address` - L'adresse du client à modifier
/// * `f` - La closure qui recevra une référence mutable vers le client /// * `f` - La closure qui recevra une référence vers le client
/// ///
/// # Returns /// # Returns
/// `true` si le client a été trouvé et modifié, `false` sinon /// `true` si le client a été trouvé et modifié, `false` sinon
@@ -137,26 +188,77 @@ impl ClientManager {
/// let client_manager = ClientManager::new(); /// let client_manager = ClientManager::new();
/// let addr = "127.0.0.1:8080".parse().unwrap(); /// let addr = "127.0.0.1:8080".parse().unwrap();
/// ///
/// // Mise à jour simple /// // Mise à jour de last_seen (lock-free)
/// client_manager.modify_client(addr, |client| { /// client_manager.modify_client(addr, |client| {
/// client.update_last_seen(); /// client.update_last_seen();
/// }); /// });
/// ///
/// // Modifications multiples /// // Accès aux propriétés du client
/// let success = client_manager.modify_client(addr, |client| { /// let found = client_manager.modify_client(addr, |client| {
/// client.update_last_seen(); /// println!("Client ID: {}", client.id());
/// // autres modifications... /// println!("Dernière activité: {:?}", client.last_seen());
/// }); /// });
/// ``` /// ```
pub fn modify_client<F>(&self, address: SocketAddr, f: F) -> bool pub fn modify_client<F>(&self, address: SocketAddr, f: F) -> bool
where where
F: FnOnce(&mut Client), F: FnOnce(&Client),
{ {
if let Some(mut client) = self.clients.get_mut(&address) { self.udp_clients.modify(&address, f)
f(&mut *client); }
true
} else { /// Obtient une référence en lecture seule vers tous les clients
false /// Accès lock-free ultra-rapide
} pub fn read_all(&self) -> Arc<std::collections::HashMap<SocketAddr, Arc<Client>>> {
self.udp_clients.read()
}
/// Vide tous les clients
pub fn clear(&self) {
self.udp_clients.clear();
}
/// Itère sur tous les clients avec leurs Arc
/// Très efficace - pas de clone des clients
pub fn iter(&self) -> impl Iterator<Item = (SocketAddr, Arc<Client>)> {
self.udp_clients.iter()
}
/// Itère sur toutes les adresses des clients
pub fn addresses(&self) -> impl Iterator<Item = SocketAddr> {
self.udp_clients.keys()
}
/// Itère sur tous les Arc des clients
pub fn clients(&self) -> impl Iterator<Item = Arc<Client>> {
self.udp_clients.values()
}
/// Compte le nombre de clients actifs dans une durée donnée
/// Utilise l'accès lock-free pour une performance optimale
pub fn count_active_clients(&self, max_age: Duration) -> usize {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.count()
}
/// Obtient les adresses des clients actifs
/// Très efficace grâce à l'accès lock-free
pub fn get_active_addresses(&self, max_age: Duration) -> Vec<SocketAddr> {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.map(|(addr, _)| *addr)
.collect()
}
}
impl Default for ClientManager {
fn default() -> Self {
Self::new()
} }
} }

View File

@@ -1,4 +1,3 @@
use tokio::sync::mpsc;
use crate::network::protocol::{UDPMessage}; use crate::network::protocol::{UDPMessage};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -16,12 +15,12 @@ pub enum Event {
#[derive(Clone)] #[derive(Clone)]
pub struct EventBus { pub struct EventBus {
pub sender: mpsc::Sender<Event>, pub sender: kanal::AsyncSender<Event>,
} }
impl EventBus { impl EventBus {
pub fn new() -> (Self, mpsc::Receiver<Event>) { pub fn new() -> (Self, kanal::AsyncReceiver<Event>) {
let (sender, receiver) = mpsc::channel(10000); let (sender, receiver) = kanal::bounded_async::<Event>(4096);
(Self { sender }, receiver) (Self { sender }, receiver)
} }
@@ -33,7 +32,7 @@ impl EventBus {
let _ = self.sender.try_send(event); let _ = self.sender.try_send(event);
} }
pub fn clone_sender(&self) -> mpsc::Sender<Event> { pub fn clone_sender(&self) -> kanal::AsyncSender<Event> {
self.sender.clone() self.sender.clone()
} }
} }

View File

@@ -1,32 +1,36 @@
use std::sync::Arc;
use std::thread;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::AbortHandle; use tokio::task::AbortHandle;
use crate::domain::client::ClientManager; use crate::domain::client::{Client, ClientManager};
use crate::domain::event::{Event, EventBus}; use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UDPMessageType, UDPMessage}; use crate::network::protocol::{UDPMessageType, UDPMessage, UdpBroadcastMessage, UDPMessageData};
use crate::network::udp::UdpServer; use crate::network::udp::UdpServer;
#[derive(Clone)] #[derive(Clone)]
pub struct Dispatcher { pub struct Dispatcher {
event_bus: EventBus, event_bus: Arc<EventBus>,
udp_server: UdpServer, udp_server: Arc<UdpServer>,
client_manager: ClientManager client_manager: Arc<ClientManager>
} }
impl Dispatcher { impl Dispatcher {
pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager) -> Self { pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager) -> Self {
Self { Self {
event_bus, event_bus: Arc::new(event_bus),
udp_server, udp_server: Arc::new(udp_server),
client_manager, client_manager: Arc::new(client_manager),
} }
} }
pub async fn start(&self, mut receiver: mpsc::Receiver<Event>) { pub async fn start(&self, receiver: kanal::AsyncReceiver<Event>) {
println!("Dispatcher démarré sur le thread : {:?}", std::thread::current().id());
let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await; let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await;
while let Some(event) = receiver.recv().await { while let Ok(event) = receiver.recv().await {
match event { match event {
Event::UdpIn(message) => { Event::UdpIn(message) => {
let _ = udp_in_sender.send(message).await; let _ = udp_in_sender.send(message).await;
@@ -62,6 +66,7 @@ impl Dispatcher {
pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender<UDPMessage>) { pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender<UDPMessage>) {
let (sender, mut consumer) = mpsc::channel::<UDPMessage>(1024); let (sender, mut consumer) = mpsc::channel::<UDPMessage>(1024);
let udp_server = self.udp_server.clone(); let udp_server = self.udp_server.clone();
let client_manager = self.client_manager.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
while let Some(message) = consumer.recv().await { while let Some(message) = consumer.recv().await {
@@ -69,10 +74,22 @@ impl Dispatcher {
match message.message_type() { match message.message_type() {
UDPMessageType::Ping => { UDPMessageType::Ping => {
let response_message = UDPMessage::server_ping(message.address, message.get_message_id().unwrap()); let response_message = UDPMessage::server_ping(message.address, message.get_message_id().unwrap());
let _ = udp_server.send_udp_message(&response_message);
if client_manager.client_exists(message.address) {
client_manager.update_client_last_seen(message.address);
}else {
client_manager.add(Client::new(message.address));
}
let _ = udp_server.send_udp_message(&response_message).await;
} }
UDPMessageType::Audio => { UDPMessageType::Audio => {
// Traiter l'audio if let UDPMessageData::ClientAudio { sequence, data } = &message.data {
let addresses = client_manager.get_all_addresses();
let speaker_uuid = client_manager.get_uuid_by_address(message.address).unwrap();
let response_message = UdpBroadcastMessage::server_audio(addresses, speaker_uuid, *sequence, data.clone());
let _ = udp_server.broadcast_udp_message(&response_message).await;
}
} }
} }
} }

View File

@@ -1,398 +0,0 @@
use uuid::Uuid;
/// Helpers pour la manipulation de bytes - idéal pour les protocoles binaires
///
/// Cette structure permet de lire séquentiellement des données binaires
/// en maintenant une position de lecture interne.
pub struct ByteReader<'a> {
/// Référence vers les données à lire
data: &'a [u8],
/// Position actuelle dans le buffer de lecture
position: usize,
}
impl<'a> ByteReader<'a> {
/// Crée un nouveau lecteur de bytes à partir d'un slice
///
/// # Arguments
/// * `data` - Le slice de bytes à lire
///
/// # Example
/// ```text
/// let data = &[0x01, 0x02, 0x03, 0x04];
/// let reader = ByteReader::new(data);
/// ```
pub fn new(data: &'a [u8]) -> Self {
Self { data, position: 0 }
}
/// Retourne le nombre de bytes restants à lire
///
/// Utilise `saturating_sub` pour éviter les débordements
/// si la position dépasse la taille des données
pub fn remaining(&self) -> usize {
self.data.len().saturating_sub(self.position)
}
/// Vérifie si tous les bytes ont été lus
///
/// # Returns
/// `true` si il n'y a plus de bytes à lire
pub fn is_empty(&self) -> bool {
self.remaining() == 0
}
/// Retourne la position actuelle de lecture
pub fn position(&self) -> usize {
self.position
}
/// Déplace la position de lecture à l'index spécifié
///
/// La position est automatiquement limitée à la taille des données
/// pour éviter les débordements
///
/// # Arguments
/// * `position` - Nouvelle position de lecture
pub fn seek(&mut self, position: usize) {
self.position = position.min(self.data.len());
}
/// Lit un byte (u8) à la position actuelle
///
/// # Returns
/// * `Ok(u8)` - La valeur lue si disponible
/// * `Err(&'static str)` - Si la fin du buffer est atteinte
pub fn read_u8(&mut self) -> Result<u8, &'static str> {
if self.position < self.data.len() {
let value = self.data[self.position];
self.position += 1;
Ok(value)
} else {
Err("Not enough data for u8")
}
}
/// Lit un entier 16-bit en big-endian
///
/// # Returns
/// * `Ok(u16)` - La valeur lue si 2 bytes sont disponibles
/// * `Err(&'static str)` - Si moins de 2 bytes sont disponibles
pub fn read_u16_be(&mut self) -> Result<u16, &'static str> {
if self.remaining() >= 2 {
let value = u16::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
]);
self.position += 2;
Ok(value)
} else {
Err("Not enough data for u16")
}
}
/// Lit un entier 32-bit en big-endian
///
/// # Returns
/// * `Ok(u32)` - La valeur lue si 4 bytes sont disponibles
/// * `Err(&'static str)` - Si moins de 4 bytes sont disponibles
pub fn read_u32_be(&mut self) -> Result<u32, &'static str> {
if self.remaining() >= 4 {
let value = u32::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
self.data[self.position + 2],
self.data[self.position + 3],
]);
self.position += 4;
Ok(value)
} else {
Err("Not enough data for u32")
}
}
/// Lit un entier 64-bit en big-endian
///
/// # Returns
/// * `Ok(u64)` - La valeur lue si 8 bytes sont disponibles
/// * `Err(&'static str)` - Si moins de 8 bytes sont disponibles
pub fn read_u64_be(&mut self) -> Result<u64, &'static str> {
if self.remaining() >= 8 {
let value = u64::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
self.data[self.position + 2],
self.data[self.position + 3],
self.data[self.position + 4],
self.data[self.position + 5],
self.data[self.position + 6],
self.data[self.position + 7],
]);
self.position += 8;
Ok(value)
} else {
Err("Not enough data for u64")
}
}
/// Lit un UUID (16 bytes) à la position actuelle
///
/// Les UUIDs sont stockés sous forme de 16 bytes consécutifs.
/// Cette méthode lit ces 16 bytes et les convertit en UUID.
///
/// # Returns
/// * `Ok(Uuid)` - L'UUID lu si 16 bytes sont disponibles
/// * `Err(&'static str)` - Si moins de 16 bytes sont disponibles
pub fn read_uuid(&mut self) -> Result<Uuid, &'static str> {
if self.remaining() >= 16 {
let uuid_bytes = self.read_fixed_bytes::<16>()?;
Ok(Uuid::from_bytes(uuid_bytes))
} else {
Err("Not enough data for UUID")
}
}
/// Lit une séquence de bytes de longueur spécifiée
///
/// # Arguments
/// * `len` - Nombre de bytes à lire
///
/// # Returns
/// * `Ok(&[u8])` - Slice des bytes lus si disponibles
/// * `Err(&'static str)` - Si pas assez de bytes disponibles
pub fn read_bytes(&mut self, len: usize) -> Result<&'a [u8], &'static str> {
if self.remaining() >= len {
let slice = &self.data[self.position..self.position + len];
self.position += len;
Ok(slice)
} else {
Err("Not enough data for bytes")
}
}
/// Lit un tableau de bytes de taille fixe définie à la compilation
///
/// Utilise les generics const pour définir la taille du tableau
///
/// # Returns
/// * `Ok([u8; N])` - Tableau de bytes lu si disponible
/// * `Err(&'static str)` - Si pas assez de bytes disponibles
pub fn read_fixed_bytes<const N: usize>(&mut self) -> Result<[u8; N], &'static str> {
if self.remaining() >= N {
let mut array = [0u8; N];
array.copy_from_slice(&self.data[self.position..self.position + N]);
self.position += N;
Ok(array)
} else {
Err("Not enough data for fixed bytes")
}
}
/// Lit tous les bytes restants dans le buffer
///
/// Après cet appel, le reader sera vide (position = taille totale)
///
/// # Returns
/// Slice contenant tous les bytes restants
pub fn read_remaining(&mut self) -> &'a [u8] {
let slice = &self.data[self.position..];
self.position = self.data.len();
slice
}
}
/// Structure pour construire séquentiellement des données binaires
///
/// Contrairement à ByteReader, cette structure possède ses propres données
/// et permet d'écrire des valeurs de différents types.
pub struct ByteWriter {
/// Buffer interne pour stocker les données écrites
data: Vec<u8>,
}
impl ByteWriter {
/// Crée un nouveau writer avec un Vec vide
pub fn new() -> Self {
Self { data: Vec::new() }
}
/// Crée un nouveau writer avec une capacité pré-allouée
///
/// Utile pour éviter les réallocations si la taille finale
/// est approximativement connue
///
/// # Arguments
/// * `capacity` - Capacité initiale du buffer
pub fn with_capacity(capacity: usize) -> Self {
Self {
data: Vec::with_capacity(capacity),
}
}
/// Écrit un byte (u8) dans le buffer
///
/// # Arguments
/// * `value` - Valeur à écrire
pub fn write_u8(&mut self, value: u8) {
self.data.push(value);
}
/// Écrit un entier 16-bit en big-endian
///
/// # Arguments
/// * `value` - Valeur à écrire
pub fn write_u16_be(&mut self, value: u16) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Écrit un entier 32-bit en big-endian
///
/// # Arguments
/// * `value` - Valeur à écrire
pub fn write_u32_be(&mut self, value: u32) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Écrit un entier 64-bit en big-endian
///
/// # Arguments
/// * `value` - Valeur à écrire
pub fn write_u64_be(&mut self, value: u64) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Écrit une séquence de bytes dans le buffer
///
/// # Arguments
/// * `bytes` - Slice de bytes à écrire
pub fn write_bytes(&mut self, bytes: &[u8]) {
self.data.extend_from_slice(bytes);
}
/// Écrit un tableau de bytes de taille fixe
///
/// # Arguments
/// * `bytes` - Tableau de bytes à écrire
pub fn write_fixed_bytes<const N: usize>(&mut self, bytes: [u8; N]) {
self.data.extend_from_slice(&bytes);
}
/// Consomme le writer et retourne le Vec contenant les données
///
/// # Returns
/// Vec<u8> contenant toutes les données écrites
pub fn into_vec(self) -> Vec<u8> {
self.data
}
/// Retourne une référence vers les données sous forme de slice
///
/// # Returns
/// Slice des données écrites
pub fn as_slice(&self) -> &[u8] {
&self.data
}
/// Retourne la taille actuelle du buffer
pub fn len(&self) -> usize {
self.data.len()
}
/// Vérifie si le buffer est vide
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
/// Implémentation du trait Default pour ByteWriter
///
/// Permet d'utiliser ByteWriter::default() comme équivalent de ByteWriter::new()
impl Default for ByteWriter {
fn default() -> Self {
Self::new()
}
}
/// Fonctions utilitaires standalone pour la lecture directe sans état
///
/// Ces fonctions permettent de lire des valeurs à des offsets spécifiques
/// sans avoir besoin de créer un ByteReader.
/// Lit un byte à l'offset spécifié
///
/// # Arguments
/// * `data` - Slice de données source
/// * `offset` - Position de lecture
///
/// # Returns
/// * `Some(u8)` - Valeur lue si l'offset est valide
/// * `None` - Si l'offset dépasse la taille des données
pub fn read_u8_at(data: &[u8], offset: usize) -> Option<u8> {
data.get(offset).copied()
}
/// Lit un entier 16-bit big-endian à l'offset spécifié
///
/// # Arguments
/// * `data` - Slice de données source
/// * `offset` - Position de lecture
///
/// # Returns
/// * `Some(u16)` - Valeur lue si 2 bytes sont disponibles à l'offset
/// * `None` - Si pas assez de bytes disponibles
pub fn read_u16_be_at(data: &[u8], offset: usize) -> Option<u16> {
if data.len() >= offset + 2 {
Some(u16::from_be_bytes([data[offset], data[offset + 1]]))
} else {
None
}
}
/// Lit un entier 32-bit big-endian à l'offset spécifié
///
/// # Arguments
/// * `data` - Slice de données source
/// * `offset` - Position de lecture
///
/// # Returns
/// * `Some(u32)` - Valeur lue si 4 bytes sont disponibles à l'offset
/// * `None` - Si pas assez de bytes disponibles
pub fn read_u32_be_at(data: &[u8], offset: usize) -> Option<u32> {
if data.len() >= offset + 4 {
Some(u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]))
} else {
None
}
}
/// Lit un entier 64-bit big-endian à l'offset spécifié
///
/// # Arguments
/// * `data` - Slice de données source
/// * `offset` - Position de lecture
///
/// # Returns
/// * `Some(u64)` - Valeur lue si 8 bytes sont disponibles à l'offset
/// * `None` - Si pas assez de bytes disponibles
pub fn read_u64_be_at(data: &[u8], offset: usize) -> Option<u64> {
if data.len() >= offset + 8 {
Some(u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]))
} else {
None
}
}

View File

@@ -1 +1 @@
pub mod byte_utils; pub mod shared_store;

1114
src/utils/shared_store.rs Normal file

File diff suppressed because it is too large Load Diff