Compare commits

...

3 Commits

Author SHA1 Message Date
62dc6deb79 init 2025-07-29 20:20:02 +02:00
9b8461314f init 2025-07-20 04:17:36 +02:00
0e9c2b08d6 init 2025-07-19 03:45:57 +02:00
41 changed files with 4320 additions and 496 deletions

2
.gitignore vendored
View File

@@ -1 +1,3 @@
/target
.env
db.sqlite

213
.idea/workspace.xml generated Normal file
View File

@@ -0,0 +1,213 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AnalysisUIOptions">
<option name="ANALYZE_INJECTED_CODE" value="false" />
<option name="SCOPE_TYPE" value="3" />
</component>
<component name="AutoImportSettings">
<option name="autoReloadType" value="ALL" />
</component>
<component name="CargoProjects">
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml">
<package file="$PROJECT_DIR$">
<enabledFeature name="default" />
</package>
</cargoProject>
</component>
<component name="ChangeListManager">
<list default="true" id="ca698286-778f-4335-97c8-da35a666c986" name="Changes" comment="init">
<change afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/app/conf.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/domain/models.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/channel.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/master.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/message.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/network/http_routes/websocket.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/channel.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/link_sub_server_user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/message.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/sub_server.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/models/user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/channel_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/link_sub_server_user_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/message_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/sub_server_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/repositories/user_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/store/store_service.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.gitignore" beforeDir="false" afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.lock" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.lock" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.toml" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.toml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/app.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/app.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/lib.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/lib.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/main.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/runtime/dispatcher.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/runtime/dispatcher.rs" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="ExecutionTargetManager" SELECTED_TARGET="RsBuildProfile:dev" />
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Rust File" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MacroExpansionManager">
<option name="directoryName" value="oBePpDlK" />
</component>
<component name="ProjectColorInfo">{
&quot;associatedIndex&quot;: 5
}</component>
<component name="ProjectId" id="2zaZ93S6zEp6mJe7Xq5WfvIMexv" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;Cargo.Run.executor&quot;: &quot;Run&quot;,
&quot;ModuleVcsDetector.initialDetectionPerformed&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
&quot;RunOnceActivity.rust.reset.selective.auto.import&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;master&quot;,
&quot;ignore.virus.scanning.warn.message&quot;: &quot;true&quot;,
&quot;junie.onboarding.icon.badge.shown&quot;: &quot;true&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/Dev/ox_speak_server/src/network&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
&quot;org.rust.cargo.project.model.PROJECT_DISCOVERY&quot;: &quot;true&quot;,
&quot;org.rust.cargo.project.model.impl.CargoExternalSystemProjectAware.subscribe.first.balloon&quot;: &quot;&quot;,
&quot;org.rust.first.attach.projects&quot;: &quot;true&quot;,
&quot;run.code.analysis.last.selected.profile&quot;: &quot;pProject Default&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;terminal&quot;,
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
}
}</component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\Dev\ox_speak_server\src\network" />
</key>
<key name="MoveFile.RECENT_KEYS">
<recent name="D:\Dev\ox_speak_server\src\store" />
<recent name="D:\Dev\ox_speak_server\src\store\repositories" />
</key>
</component>
<component name="RunManager">
<configuration name="Run" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="command" value="run --package ox_speak_server --bin ox_speak_server" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="false" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>
<component name="RustProjectSettings">
<option name="toolchainHomeDirectory" value="$USER_HOME$/.cargo/bin" />
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="ca698286-778f-4335-97c8-da35a666c986" name="Changes" comment="" />
<created>1751970990022</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1751970990022</updated>
<workItem from="1751970991653" duration="7327000" />
<workItem from="1752016714585" duration="1095000" />
<workItem from="1752076174877" duration="11015000" />
<workItem from="1752189013977" duration="4750000" />
<workItem from="1752224469499" duration="14591000" />
<workItem from="1752305152715" duration="19879000" />
<workItem from="1752391851097" duration="12061000" />
<workItem from="1752446764168" duration="8166000" />
<workItem from="1752484190578" duration="17709000" />
<workItem from="1752591656862" duration="429000" />
<workItem from="1752593250094" duration="11969000" />
<workItem from="1752694022400" duration="6212000" />
<workItem from="1752741840195" duration="2946000" />
<workItem from="1752833798325" duration="5366000" />
<workItem from="1752917416027" duration="1192000" />
<workItem from="1752931843330" duration="2938000" />
<workItem from="1752997629708" duration="9070000" />
<workItem from="1753107912894" duration="595000" />
<workItem from="1753225761416" duration="1356000" />
<workItem from="1753282037526" duration="5207000" />
<workItem from="1753397680991" duration="1782000" />
<workItem from="1753399490773" duration="3189000" />
<workItem from="1753436756029" duration="16895000" />
<workItem from="1753521176318" duration="17811000" />
<workItem from="1753601912332" duration="5843000" />
<workItem from="1753718175508" duration="8774000" />
<workItem from="1753800817354" duration="3570000" />
<workItem from="1753804571241" duration="59000" />
<workItem from="1753804642657" duration="236000" />
<workItem from="1753804898179" duration="625000" />
<workItem from="1753805533139" duration="2909000" />
</task>
<task id="LOCAL-00001" summary="init">
<option name="closed" value="true" />
<created>1752591904243</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1752591904243</updated>
</task>
<task id="LOCAL-00002" summary="init">
<option name="closed" value="true" />
<created>1752591989113</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1752591989113</updated>
</task>
<option name="localTasksCounter" value="3" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="Vcs.Log.Tabs.Properties">
<option name="TAB_STATES">
<map>
<entry key="MAIN">
<value>
<State />
</value>
</entry>
</map>
</option>
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="init" />
<option name="LAST_COMMIT_MESSAGE" value="init" />
</component>
</project>

1840
Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -23,3 +23,10 @@ uuid = {version = "1.17", features = ["v4", "serde"] }
event-listener = "5.4"
dashmap = "6.1"
bytes = "1.10"
arc-swap = "1.7"
crossbeam-utils = "0.8"
kanal = "0.1"
axum = { version = "0.8", features = ["ws", "default"] }
chrono = {version = "0.4", features = ["serde"]}
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "chrono", "uuid", "any", "sqlite", "postgres", "mysql" ] }
dotenvy = "0.15"

4
README.md Normal file
View File

@@ -0,0 +1,4 @@
To create an sqlx migration file:
```shell
sqlx migrate add --source src/store/migrations migration_name
```
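The migrations under `src/store/migrations` are embedded at compile time and applied on startup; the relevant call in `store_service.rs`:

```rust
// In StoreService::new: embed the migrations directory and run pending migrations.
sqlx::migrate!("./src/store/migrations").run(&db).await?;
```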

View File

@@ -3,50 +3,64 @@ use tokio::sync::mpsc;
use tokio::time::interval;
use crate::domain::client::ClientManager;
use crate::domain::event::{Event, EventBus};
use crate::network::http::HttpServer;
use crate::network::udp::UdpServer;
use crate::runtime::dispatcher::Dispatcher;
use crate::store::store_service::StoreService;
pub struct App {
// Inter-component communication
event_bus: EventBus,
dispatcher: Dispatcher,
event_rx: Option<mpsc::Receiver<Event>>,
event_rx: kanal::AsyncReceiver<Event>,
// Network
udp_server: UdpServer,
http_server: HttpServer,
// Clients
client_manager: ClientManager,
// store
store: StoreService,
}
impl App {
pub async fn new() -> Self {
let (event_bus, event_rx) = EventBus::new();
let store = StoreService::new("./db.sqlite").await.unwrap();
let udp_server = UdpServer::new(event_bus.clone(), "0.0.0.0:5000").await;
let http_server = HttpServer::new(event_bus.clone(), store.clone());
let client_manager = ClientManager::new();
let dispatcher = Dispatcher::new(event_bus.clone(), udp_server.clone(), client_manager.clone()).await;
let dispatcher = Dispatcher::new(event_bus.clone(), udp_server.clone(), client_manager.clone(), store.clone()).await;
Self {
event_bus,
dispatcher,
event_rx: Some(event_rx),
event_rx,
udp_server,
client_manager
http_server,
client_manager,
store
}
}
pub async fn start(&mut self) {
if let Some(event_rx) = self.event_rx.take() {
for i in 0..4 {
let dispatcher = self.dispatcher.clone();
let event_rx = self.event_rx.clone();
tokio::spawn(async move {
dispatcher.start(event_rx).await;
});
}
let _ = self.udp_server.start().await;
let _ = self.http_server.start("0.0.0.0:5000").await;
let _ = self.tick_tasks().await;
println!("App started");
}

28
src/app/conf.rs Normal file
View File

@@ -0,0 +1,28 @@
use std::env;
use std::path::Path;
pub fn load_env() {
// Print the current working directory
match env::current_dir() {
Ok(path) => {
println!("Working directory: {}", path.display())
}
Err(e) => {
eprintln!("Failed to get the working directory: {}", e)
}
}
// Check whether a .env file exists
let env_path = Path::new(".env");
if env_path.exists() {
println!(".env found");
// Load the .env file
match dotenvy::from_path(env_path) {
Ok(_) => println!("✅ .env file loaded successfully"),
Err(e) => eprintln!("❌ Failed to load .env: {}", e),
}
} else {
println!("⚠️ The .env file does not exist");
}
}
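Nothing in `load_env` reads specific variables; consumers go through `std::env` afterwards. A hypothetical accessor (the `DATABASE_PATH` name is an assumption, not defined anywhere in this diff):

```rust
// Hypothetical: resolve the database path after load_env() has run.
// DATABASE_PATH is an assumed variable name; the diff hardcodes "./db.sqlite".
fn database_path() -> String {
    std::env::var("DATABASE_PATH").unwrap_or_else(|_| "./db.sqlite".to_string())
}
```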

View File

@@ -1 +1,2 @@
pub mod app;
pub mod conf;

View File

@@ -1,34 +1,40 @@
//! Client management for UDP connections
//!
//! This module provides the structures and methods to manage clients
//! connected to the UDP server, including their tracking and updates.
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use dashmap::DashMap;
use tokio::time::Instant;
use uuid::Uuid;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use crossbeam_utils::atomic::AtomicCell;
use crate::utils::shared_store::SharedArcMap;
/// Represents a client connected to the UDP server
///
/// Each client is identified by a unique UUID and holds
/// its network address along with its last activity time.
/// `last_seen` uses AtomicCell for lock-free updates.
#[derive(Debug)]
pub struct Client {
id: Uuid,
address: SocketAddr,
last_seen: Instant,
last_seen: AtomicCell<Instant>,
}
/// Thread-safe manager for connected clients
///
/// Uses `DashMap` to allow safe concurrent access
/// to clients from multiple threads.
/// Uses `SharedArcMap` to allow safe concurrent access to clients
/// from multiple threads, with performance optimized for
/// frequent reads. Clients are stored in `Arc`s
/// to avoid costly clones.
#[derive(Clone)]
pub struct ClientManager {
clients: Arc<DashMap<SocketAddr, Client>>,
udp_clients: SharedArcMap<SocketAddr, Client>,
}
impl Client {
@@ -38,7 +44,7 @@ impl Client {
Self {
id,
address,
last_seen: Instant::now(),
last_seen: AtomicCell::new(Instant::now()),
}
}
@@ -53,13 +59,15 @@ impl Client {
}
/// Returns the instant of the client's last activity
/// Lock-free access thanks to AtomicCell
pub fn last_seen(&self) -> Instant {
self.last_seen
self.last_seen.load()
}
/// Updates the client's last activity time to now
pub fn update_last_seen(&mut self) {
self.last_seen = Instant::now();
/// Lock-free operation thanks to AtomicCell
pub fn update_last_seen(&self) {
self.last_seen.store(Instant::now());
}
}
@@ -81,53 +89,96 @@ impl ClientManager {
/// Creates a new, empty client manager
pub fn new() -> Self {
Self {
clients: Arc::new(DashMap::new()),
udp_clients: SharedArcMap::new(),
}
}
/// Creates a new client manager with an initial capacity
pub fn with_capacity(capacity: usize) -> Self {
Self {
udp_clients: SharedArcMap::with_capacity(capacity),
}
}
/// Adds a client to the manager
pub fn add(&self, client: Client) {
self.clients.insert(client.address(), client);
/// Returns the previous client Arc if one already existed
pub fn add(&self, client: Client) -> Option<Arc<Client>> {
self.udp_clients.insert(client.address(), client)
}
/// Removes a client from the manager
pub fn remove(&self, client: Client) {
self.clients.remove(&client.address());
/// Removes a client from the manager by its address
/// Returns the Arc of the removed client
pub fn remove(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.remove(&address)
}
/// Removes a client from the manager by its instance
/// Returns the Arc of the removed client
pub fn remove_client(&self, client: &Client) -> Option<Arc<Client>> {
self.udp_clients.remove(&client.address())
}
/// Checks whether a client exists for a given address
pub fn client_exists(&self, address: SocketAddr) -> bool {
self.clients.contains_key(&address)
self.udp_clients.contains_key(&address)
}
/// Gets a reference to a client by its address
pub fn get_client_by_address(&self, address: SocketAddr) -> Option<dashmap::mapref::one::Ref<SocketAddr, Client>> {
self.clients.get(&address)
/// Gets an Arc to the client by its address
/// Very cheap - the Client itself is never cloned
pub fn get_client_by_address(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.get(&address)
}
pub fn get_uuid_by_address(&self, address: SocketAddr) -> Option<Uuid> {
self.udp_clients.get(&address).map(|client| client.id)
}
/// Gets the addresses of all connected clients
pub fn get_all_adresses(&self) -> Vec<SocketAddr> {
self.clients.iter().map(|entry| *entry.key()).collect()
pub fn get_all_addresses(&self) -> HashSet<SocketAddr> {
self.udp_clients.keys().collect()
}
/// Returns the number of connected clients
pub fn len(&self) -> usize {
self.udp_clients.len()
}
/// Checks whether the manager is empty
pub fn is_empty(&self) -> bool {
self.udp_clients.is_empty()
}
/// Updates a client's last activity time
pub fn update_client_last_seen(&self, address: SocketAddr) {
if let Some(mut client) = self.clients.get_mut(&address) {
/// Uses SharedArcMap's modify method for a lock-free operation
pub fn update_client_last_seen(&self, address: SocketAddr) -> bool {
self.udp_clients.modify(&address, |client| {
client.update_last_seen();
})
}
/// Removes clients that are too old
/// Uses lock-free access to identify the clients to remove
pub fn cleanup(&self, max_age: Duration) {
let now = Instant::now();
let addresses_to_remove: Vec<SocketAddr> = self
.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() >= max_age)
.map(|(addr, _)| *addr)
.collect();
for address in addresses_to_remove {
self.udp_clients.remove(&address);
}
}
/// Remove clients that are too old
pub fn cleanup(&self, max_age: Duration) {
let now = Instant::now();
self.clients.retain(|_, client| now - client.last_seen() < max_age);
}
/// Modifies a client through a closure
/// Uses SharedArcMap's optimized modify method
///
/// # Arguments
/// * `address` - The address of the client to modify
/// * `f` - The closure that receives a mutable reference to the client
/// * `f` - The closure that receives a reference to the client
///
/// # Returns
/// `true` if the client was found and modified, `false` otherwise
@@ -137,26 +188,77 @@ impl ClientManager {
/// let client_manager = ClientManager::new();
/// let addr = "127.0.0.1:8080".parse().unwrap();
///
/// // Simple update
/// // Updating last_seen (lock-free)
/// client_manager.modify_client(addr, |client| {
/// client.update_last_seen();
/// });
///
/// // Multiple modifications
/// let success = client_manager.modify_client(addr, |client| {
/// client.update_last_seen();
/// // other modifications...
/// // Accessing the client's properties
/// let found = client_manager.modify_client(addr, |client| {
/// println!("Client ID: {}", client.id());
/// println!("Last activity: {:?}", client.last_seen());
/// });
/// ```
pub fn modify_client<F>(&self, address: SocketAddr, f: F) -> bool
where
F: FnOnce(&mut Client),
F: FnOnce(&Client),
{
if let Some(mut client) = self.clients.get_mut(&address) {
f(&mut *client);
true
} else {
false
}
self.udp_clients.modify(&address, f)
}
/// Gets a read-only view of all clients
/// Ultra-fast lock-free access
pub fn read_all(&self) -> Arc<std::collections::HashMap<SocketAddr, Arc<Client>>> {
self.udp_clients.read()
}
/// Clears all clients
pub fn clear(&self) {
self.udp_clients.clear();
}
/// Iterates over all clients with their Arcs
/// Very cheap - the clients are never cloned
pub fn iter(&self) -> impl Iterator<Item = (SocketAddr, Arc<Client>)> {
self.udp_clients.iter()
}
/// Iterates over all client addresses
pub fn addresses(&self) -> impl Iterator<Item = SocketAddr> {
self.udp_clients.keys()
}
/// Iterates over all client Arcs
pub fn clients(&self) -> impl Iterator<Item = Arc<Client>> {
self.udp_clients.values()
}
/// Counts the clients active within a given duration
/// Uses lock-free access for optimal performance
pub fn count_active_clients(&self, max_age: Duration) -> usize {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.count()
}
/// Gets the addresses of active clients
/// Very cheap thanks to lock-free access
pub fn get_active_addresses(&self, max_age: Duration) -> Vec<SocketAddr> {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.map(|(addr, _)| *addr)
.collect()
}
}
impl Default for ClientManager {
fn default() -> Self {
Self::new()
}
}
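The core change above is swapping `last_seen: Instant` for `AtomicCell<Instant>`, which turns `update_last_seen(&mut self)` into `update_last_seen(&self)` so timestamps can be refreshed through shared `Arc<Client>` handles. A minimal standalone sketch of that pattern, assuming only the crossbeam-utils dependency already in Cargo.toml:

```rust
use std::time::{Duration, Instant};
use crossbeam_utils::atomic::AtomicCell;

struct Heartbeat {
    last_seen: AtomicCell<Instant>,
}

impl Heartbeat {
    fn new() -> Self {
        Self { last_seen: AtomicCell::new(Instant::now()) }
    }

    // &self is enough: AtomicCell provides interior mutability,
    // so an Arc<Heartbeat> shared across tasks can still be refreshed.
    fn touch(&self) {
        self.last_seen.store(Instant::now());
    }

    fn is_stale(&self, max_age: Duration) -> bool {
        self.last_seen.load().elapsed() >= max_age
    }
}

fn main() {
    let hb = Heartbeat::new();
    hb.touch();
    assert!(!hb.is_stale(Duration::from_secs(60)));
}
```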

View File

@@ -1,4 +1,3 @@
use tokio::sync::mpsc;
use crate::network::protocol::{UDPMessage};
#[derive(Clone, Debug)]
@@ -16,12 +15,12 @@ pub enum Event {
#[derive(Clone)]
pub struct EventBus {
pub sender: mpsc::Sender<Event>,
pub sender: kanal::AsyncSender<Event>,
}
impl EventBus {
pub fn new() -> (Self, mpsc::Receiver<Event>) {
let (sender, receiver) = mpsc::channel(10000);
pub fn new() -> (Self, kanal::AsyncReceiver<Event>) {
let (sender, receiver) = kanal::bounded_async::<Event>(4096);
(Self { sender }, receiver)
}
@@ -33,7 +32,7 @@ impl EventBus {
let _ = self.sender.try_send(event);
}
pub fn clone_sender(&self) -> mpsc::Sender<Event> {
pub fn clone_sender(&self) -> kanal::AsyncSender<Event> {
self.sender.clone()
}
}
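Switching from `tokio::sync::mpsc` to kanal is what makes the multi-worker loop in `App::start` possible: `kanal::AsyncReceiver` is cloneable (the channel is MPMC), while an `mpsc::Receiver` had to be stored as `Option` and `take`n by a single consumer. A compilable sketch of the fan-out pattern, assuming tokio and kanal as dependencies:

```rust
#[derive(Debug)]
enum Event {
    Ping,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = kanal::bounded_async::<Event>(4096);

    // Clone the receiver once per worker; each event is delivered
    // to exactly one of the competing workers.
    for worker_id in 0..4 {
        let rx = rx.clone();
        tokio::spawn(async move {
            while let Ok(event) = rx.recv().await {
                println!("worker {worker_id} handling {event:?}");
            }
        });
    }

    let _ = tx.send(Event::Ping).await;
    // Give the workers a moment to drain the channel before exiting.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```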

View File

@@ -1,3 +1,4 @@
pub mod event;
pub mod user;
pub mod client;
pub mod models;

65
src/domain/models.rs Normal file
View File

@@ -0,0 +1,65 @@
use std::sync::Arc;
use parking_lot::Mutex;
use uuid::Uuid;
use chrono::{DateTime, Utc};
/////////////////////////////////////
//////////// Main models ///////////
////////////////////////////////////
struct SubServer {
// A (master) server can have several sub-servers
// (no need to model the master itself; its configuration will come from a .env)
id: Uuid,
name: String,
password: String, // decide later whether to hash it; most likely unnecessary.
created_at: DateTime<Utc>,
}
struct Channel {
id: Uuid,
channel_type: ChannelType, // the type will not be editable, like Discord
sub_server_id: Uuid, // FK to sub_server
name: String,
created_at: DateTime<Utc>,
}
struct User {
id: Uuid,
name: String, // possibly unnecessary, since the relation's display_name will be used.
pub_key: String, // identification will use a public/private key pair, like TeamSpeak
}
struct Message {
id: Uuid,
channel_id: Uuid,
user_id: Uuid,
content: String,
created_at: DateTime<Utc>,
}
/////////////////////////////////////
////////// n-n relations ///////////
////////////////////////////////////
struct SubServerUser {
sub_server: SubServer,
user: User,
display_name: String,
}
/////////////////////////////////////
//////// Enum Type (choice) ////////
////////////////////////////////////
enum ChannelType {
Text,
Voice,
}
/// Centralized store for all data models
#[derive(Clone)]
pub struct DataStore {
sub_servers: Arc<Mutex<SubServer>>
}

View File

@@ -4,3 +4,4 @@ pub mod domain;
pub mod network;
pub mod runtime;
pub mod utils;
pub mod store;

View File

@@ -1,8 +1,12 @@
use tokio::signal;
use ox_speak_server_lib::app::app::App;
use ox_speak_server_lib::app::conf::load_env;
#[tokio::main]
async fn main() {
// Load the .env file
load_env();
let mut app = App::new().await;
app.start().await;

70
src/network/http.rs Normal file
View File

@@ -0,0 +1,70 @@
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
extract::{ws::WebSocket, ws::WebSocketUpgrade, State},
response::Response,
Json, Router,
routing::{get, post}
};
use tokio::net::TcpListener;
use crate::domain::event::EventBus;
use crate::network::http_routes::master;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct HttpState {
pub event_bus: EventBus,
pub store: StoreService
}
#[derive(Clone)]
pub struct HttpServer {
state: HttpState
}
impl HttpServer {
pub fn new(event_bus: EventBus, store: StoreService) -> Self {
Self {
state: HttpState {
event_bus,
store
},
}
}
pub async fn start(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
let router = self.create_router();
let listener = TcpListener::bind(addr).await?;
println!("HTTP/WebSocket server listening on addr {}", listener.local_addr()?);
tokio::spawn(async move {
let _ = axum::serve(listener, router).await;
});
Ok(())
}
fn create_router(&self) -> Router {
let api_route = Router::new()
.nest("/master", master::create_router());
Router::new()
.nest("/api", api_route)
// .route("/ws", get(Self::ws))
// .route("/stats/", get(Self::stats))
.with_state(self.state.clone())
}
}
/// Routes
impl HttpServer {
async fn stats(State(state): State<Arc<HttpState>>) -> Json<String> {
todo!("a faire")
}
async fn ws(ws: WebSocketUpgrade, State(state): State<Arc<HttpState>>) -> Response {
ws.on_upgrade(|socket| async move {
todo!("a faire")
})
}
}

View File

View File

@@ -0,0 +1,18 @@
use std::collections::{HashMap};
use std::hash::Hash;
use axum::{
extract::{Path, State},
response::Json,
routing::{get, post, put, delete},
Router,
};
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
.route("/auth/", post(join_master_server))
}
pub async fn join_master_server(State(state): State<HttpState>) -> Json<HashMap<String, String>> {
todo!("join master server")
}

View File

View File

@@ -0,0 +1,5 @@
pub mod user;
pub mod channel;
pub mod message;
pub mod websocket;
pub mod master;

View File

View File

View File

@@ -1,2 +1,4 @@
pub mod protocol;
pub mod udp;
pub mod http;
pub mod http_routes;

View File

@@ -1,32 +1,40 @@
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::AbortHandle;
use crate::domain::client::ClientManager;
use crate::domain::client::{Client, ClientManager};
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UDPMessageType, UDPMessage};
use crate::network::protocol::{UDPMessageType, UDPMessage, UdpBroadcastMessage, UDPMessageData};
use crate::network::udp::UdpServer;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct Dispatcher {
event_bus: EventBus,
event_bus: Arc<EventBus>,
udp_server: UdpServer,
client_manager: ClientManager
udp_server: Arc<UdpServer>,
client_manager: Arc<ClientManager>,
store: StoreService,
}
impl Dispatcher {
pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager) -> Self {
pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager, store: StoreService) -> Self {
Self {
event_bus,
udp_server,
client_manager,
event_bus: Arc::new(event_bus),
udp_server: Arc::new(udp_server),
client_manager: Arc::new(client_manager),
store,
}
}
pub async fn start(&self, mut receiver: mpsc::Receiver<Event>) {
pub async fn start(&self, receiver: kanal::AsyncReceiver<Event>) {
println!("Dispatcher démarré sur le thread : {:?}", std::thread::current().id());
let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await;
while let Some(event) = receiver.recv().await {
while let Ok(event) = receiver.recv().await {
match event {
Event::UdpIn(message) => {
let _ = udp_in_sender.send(message).await;
@@ -62,17 +70,36 @@ impl Dispatcher {
pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender<UDPMessage>) {
let (sender, mut consumer) = mpsc::channel::<UDPMessage>(1024);
let udp_server = self.udp_server.clone();
let client_manager = self.client_manager.clone();
let task = tokio::spawn(async move {
while let Some(message) = consumer.recv().await {
// Handle the message directly, without double parsing
match message.message_type() {
UDPMessageType::Ping => {
println!("ping from {:?}", message);
let response_message = UDPMessage::server_ping(message.address, message.get_message_id().unwrap());
let _ = udp_server.send_udp_message(&response_message);
if client_manager.client_exists(message.address) {
client_manager.update_client_last_seen(message.address);
}else {
client_manager.add(Client::new(message.address));
println!("new client: {:?}", message.address);
}
let _ = udp_server.send_udp_message(&response_message).await;
}
UDPMessageType::Audio => {
// Handle the audio
if let UDPMessageData::ClientAudio { sequence, data } = &message.data {
let addresses = client_manager.get_all_addresses();
if let Some(speaker_uuid) = client_manager.get_uuid_by_address(message.address) {
let response_message = UdpBroadcastMessage::server_audio(addresses, speaker_uuid, *sequence, data.clone());
let _ = udp_server.broadcast_udp_message(&response_message).await;
} else {
// Optionally handle the case where the UUID is not found
// println!("UUID not found for address: {:?}", message.address);
}
}
}
}
}

View File

@@ -0,0 +1,70 @@
-- Add migration script here
-- Create the user table
CREATE TABLE user
(
id TEXT PRIMARY KEY NOT NULL,
username TEXT NOT NULL UNIQUE,
email TEXT,
avatar_url TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Create the sub_server table
CREATE TABLE sub_server
(
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
description TEXT,
owner_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (owner_id) REFERENCES user (id)
);
-- Create the channel table
CREATE TABLE channel
(
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
description TEXT,
channel_type TEXT NOT NULL,
sub_server_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (sub_server_id) REFERENCES sub_server (id)
);
-- Create the message table
CREATE TABLE message
(
id TEXT PRIMARY KEY NOT NULL,
content TEXT NOT NULL,
author_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES user (id),
FOREIGN KEY (channel_id) REFERENCES channel (id)
);
-- Join table linking users and sub_servers (N-N relation)
CREATE TABLE sub_server_user
(
id TEXT PRIMARY KEY NOT NULL,
sub_server_id TEXT NOT NULL,
user_id TEXT NOT NULL,
joined_at TEXT NOT NULL,
role TEXT,
FOREIGN KEY (sub_server_id) REFERENCES sub_server (id),
FOREIGN KEY (user_id) REFERENCES user (id),
UNIQUE (sub_server_id, user_id)
);
-- Indexes to improve query performance
CREATE INDEX idx_channel_sub_server ON channel (sub_server_id);
CREATE INDEX idx_message_channel ON message (channel_id);
CREATE INDEX idx_message_author ON message (author_id);
CREATE INDEX idx_sub_server_user_sub_server ON sub_server_user (sub_server_id);
CREATE INDEX idx_sub_server_user_user ON sub_server_user (user_id);
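To illustrate how the join table is meant to be queried (a hypothetical helper, not part of this diff), listing the members of a sub_server goes through `sub_server_user` and is served by `idx_sub_server_user_sub_server`:

```rust
use sqlx::SqlitePool;

// Hypothetical helper: returns the user ids that joined a given sub_server.
async fn member_ids(db: &SqlitePool, sub_server_id: &str) -> Result<Vec<String>, sqlx::Error> {
    let rows: Vec<(String,)> = sqlx::query_as(
        "SELECT u.id
         FROM user u
         JOIN sub_server_user ssu ON ssu.user_id = u.id
         WHERE ssu.sub_server_id = ?",
    )
    .bind(sub_server_id)
    .fetch_all(db)
    .await?;
    Ok(rows.into_iter().map(|(id,)| id).collect())
}
```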

3
src/store/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod models;
pub mod repositories;
pub mod store_service;

View File

@@ -0,0 +1,32 @@
// use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
// use uuid::Uuid;
use sqlx::types::{Uuid, chrono::{DateTime, Utc}};
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct Channel {
pub id: Uuid,
pub sub_server_id: Uuid,
pub channel_type: ChannelType,
pub name: String,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, sqlx::Type, Serialize, Deserialize)]
#[repr(i32)]
pub enum ChannelType {
Text = 0,
Voice = 1,
}
impl Channel {
pub fn new(sub_server_id: Uuid, channel_type: ChannelType, name: String) -> Self {
Self {
id: Uuid::new_v4(),
sub_server_id,
channel_type,
name,
created_at: Utc::now(),
}
}
}

View File

@@ -0,0 +1,32 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct LinkSubServerUser {
pub id: Option<u64>,
pub sub_server_id: Uuid,
pub user_id: Uuid,
pub display_username: String,
pub joined_at: DateTime<Utc>,
pub last_seen_at: Option<DateTime<Utc>>,
pub is_admin: bool,
}
impl LinkSubServerUser {
pub fn new(
sub_server_id: Uuid,
user_id: Uuid,
display_username: String
) -> Self {
Self {
id: None,
sub_server_id,
user_id,
display_username,
joined_at: Utc::now(),
last_seen_at: None,
is_admin: false,
}
}
}

View File

@@ -0,0 +1,24 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct Message {
pub id: Uuid,
pub channel_id: Uuid,
pub user_id: Uuid,
pub content: String,
pub created_at: DateTime<Utc>,
}
impl Message {
pub fn new(channel_id: Uuid, user_id: Uuid, content: String) -> Self {
Self {
id: Uuid::new_v4(),
channel_id,
user_id,
content,
created_at: Utc::now(),
}
}
}

11
src/store/models/mod.rs Normal file
View File

@@ -0,0 +1,11 @@
pub mod sub_server;
pub mod channel;
pub mod user;
pub mod message;
pub mod link_sub_server_user;
pub use sub_server::*;
pub use channel::*;
pub use user::*;
pub use message::*;
pub use link_sub_server_user::*;

View File

@@ -0,0 +1,24 @@
// The application can have several sub-servers
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct SubServer {
pub id: Uuid,
pub name: String,
pub password: String, // decide later whether to hash it; most likely unnecessary.
pub created_at: DateTime<Utc>,
}
impl SubServer {
pub fn new(name: String, password: String) -> Self {
Self {
id: Uuid::new_v4(),
name,
password,
created_at: Utc::now(),
}
}
}

13
src/store/models/user.rs Normal file
View File

@@ -0,0 +1,13 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct User {
pub id: Uuid,
pub username: String,
pub pub_key: String, // identification will use a public/private key pair, like TeamSpeak
pub created_at: DateTime<Utc>,
}

View File

@@ -0,0 +1,100 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::channel::Channel;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct ChannelRepository {
store: StoreService,
}
impl ChannelRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl ChannelRepository {
// getters
pub async fn all(&self) -> Vec<Arc<Channel>> {
self.store.channels.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<Channel>> {
self.store.channels.get(&id)
}
}
// pub id: Uuid,
// pub sub_server_id: Uuid,
// pub channel_type: ChannelType,
// pub name: String,
// pub created_at: DateTime<Utc>,
impl ChannelRepository {
// writers
pub async fn create(&self, channel: Channel) -> Result<Arc<Channel>, sqlx::Error> {
sqlx::query(
"INSERT INTO channel (id, sub_server_id, channel_type, name, created_at) VALUES (?, ?, ?, ?, ?)"
)
.bind(&channel.id)
.bind(&channel.sub_server_id)
.bind(&channel.channel_type)
.bind(&channel.name)
.bind(&channel.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_channel = Arc::new(channel.clone());
self.store.channels.insert_arc(channel.id, arc_channel.clone());
Ok(arc_channel)
}
pub async fn save(&self, channel: &Channel) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE channel SET name = ? WHERE id = ?"
)
.bind(&channel.name)
.bind(&channel.id)
.execute(&self.store.db)
.await?;
// Update the cache
self.store.channels.insert(channel.id, channel.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM sub_server WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.store.channels.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
// Initialize the cache from the DB
pub async fn load_all_from_db(&self) -> Result<(), sqlx::Error> {
let channels: Vec<Channel> = sqlx::query_as("SELECT * FROM channel")
.fetch_all(&self.store.db)
.await?;
for channel in channels {
self.store.channels.insert(channel.id, channel);
}
Ok(())
}
}

View File

@@ -0,0 +1,67 @@
use std::sync::Arc;
use uuid::Uuid;
use chrono::{DateTime, Utc};
use sqlx::SqlitePool;
use crate::store::models::link_sub_server_user::LinkSubServerUser;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcVec;
#[derive(Clone)]
pub struct LinkSubServerUserRepository {
store: StoreService
}
impl LinkSubServerUserRepository {
pub fn new(store: StoreService) -> Self {
Self { store }
}
}
impl LinkSubServerUserRepository {
// getters
/// Get a specific relation
pub async fn get_relation(&self, sub_server_id: Uuid, user_id: Uuid) -> Option<LinkSubServerUser> {
self.store.sub_server_users.iter()
.find(|relation| {
relation.sub_server_id == sub_server_id && relation.user_id == user_id
})
.map(|arc| (*arc).clone())
}
pub async fn exists(&self, sub_server_id: Uuid, user_id: Uuid) -> bool {
self.get_relation(sub_server_id, user_id).await.is_some()
}
}
impl LinkSubServerUserRepository {
// writers
/// Create a new relation
pub async fn create(&self, through: LinkSubServerUser) -> Result<LinkSubServerUser, sqlx::Error> {
// Check that the relation does not already exist
if self.exists(through.sub_server_id, through.user_id).await {
return Err(sqlx::Error::RowNotFound); // Or a custom error
}
// Insert into the database
sqlx::query(
"INSERT INTO sub_server_user
(sub_server_id, user_id, display_name, joined_at, last_seen, is_admin)
VALUES (?, ?, ?, ?, ?, ?)"
)
.bind(&through.sub_server_id)
.bind(&through.user_id)
.bind(&through.display_username)
.bind(&through.joined_at)
.bind(&through.last_seen_at)
.bind(&through.is_admin)
.execute(&self.store.db)
.await?;
// Add to the cache
self.store.sub_server_users.push(through.clone());
Ok(through)
}
}

View File

@@ -0,0 +1,70 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::message::Message;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct MessageRepository {
store: StoreService
}
impl MessageRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl MessageRepository {
// getters
}
impl MessageRepository {
// writers
pub async fn create(&self, message: Message) -> Result<Arc<Message>, sqlx::Error> {
sqlx::query(
"INSERT INTO message (id, channel_id, user_id, content, created_at) VALUES (?, ?, ?, ?, ?)"
)
.bind(&message.id)
.bind(&message.channel_id)
.bind(&message.user_id)
.bind(&message.content)
.bind(&message.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_message = Arc::new(message.clone());
Ok(arc_message)
}
pub async fn save(&self, message: &Message) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE message SET content = ? WHERE id = ?"
)
.bind(&message.content)
.bind(&message.id)
.execute(&self.store.db)
.await?;
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM message WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
Ok(true)
} else {
Ok(false)
}
}
}

View File

@@ -0,0 +1,11 @@
mod sub_server_repository;
mod channel_repository;
mod user_repository;
mod message_repository;
mod link_sub_server_user_repository;
pub use sub_server_repository::*;
pub use channel_repository::*;
pub use user_repository::*;
pub use message_repository::*;
pub use link_sub_server_user_repository::*;

View File

@@ -0,0 +1,95 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::sub_server::SubServer;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct SubServerRepository {
store: StoreService
}
impl SubServerRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl SubServerRepository {
// getters
pub async fn all(&self) -> Vec<Arc<SubServer>> {
self.store.sub_servers.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<SubServer>> {
self.store.sub_servers.get(&id)
}
}
impl SubServerRepository {
// writers
pub async fn create(&self, sub_server: SubServer) -> Result<Arc<SubServer>, sqlx::Error> {
sqlx::query(
"INSERT INTO sub_server (id, name, password, created_at) VALUES (?, ?, ?, ?)"
)
.bind(&sub_server.id)
.bind(&sub_server.name)
.bind(&sub_server.password)
.bind(&sub_server.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_server = Arc::new(sub_server.clone());
self.store.sub_servers.insert_arc(sub_server.id, arc_server.clone());
Ok(arc_server)
}
pub async fn save(&self, sub_server: &SubServer) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE sub_server SET name = ?, password = ? WHERE id = ?"
)
.bind(&sub_server.name)
.bind(&sub_server.password)
.bind(&sub_server.id)
.execute(&self.store.db)
.await?;
// Update the cache
self.store.sub_servers.insert(sub_server.id, sub_server.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM sub_server WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.store.sub_servers.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
// Initialize the cache from the DB
pub async fn load_all_from_db(&self) -> Result<(), sqlx::Error> {
let servers: Vec<SubServer> = sqlx::query_as("SELECT * FROM sub_server")
.fetch_all(&self.store.db)
.await?;
for server in servers {
self.store.sub_servers.insert(server.id, server);
}
Ok(())
}
}
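A usage sketch for the write path above, assuming an already-connected `StoreService` named `store` (hypothetical caller code, e.g. inside an HTTP handler):

```rust
// Persist a new sub_server, cache the Arc, and get it back.
let repo = SubServerRepository::new(store.clone());
let sub_server = SubServer::new("general".to_string(), "secret".to_string());
let created = repo.create(sub_server).await?;
println!("created sub_server {} ({})", created.name, created.id);
```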

View File

@@ -0,0 +1,96 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::user::User;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct UserRepository {
db: SqlitePool,
cache: SharedArcMap<Uuid, User>
}
impl UserRepository {
pub fn new(db: SqlitePool, cache: SharedArcMap<Uuid, User>) -> Self {
Self {
db,
cache
}
}
}
impl UserRepository {
// getters
pub async fn all(&self) -> Vec<Arc<User>> {
self.cache.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<User>> {
self.cache.get(&id)
}
}
impl UserRepository {
// writers
pub async fn create(&self, user: User) -> Result<Arc<User>, sqlx::Error> {
sqlx::query(
"INSERT INTO user (id, username, pub_key, created_at) VALUES (?, ?, ?, ?)"
)
.bind(&user.id)
.bind(&user.username)
.bind(&user.pub_key)
.bind(&user.created_at)
.execute(&self.db)
.await?;
// add to the cache
let arc_user = Arc::new(user.clone());
self.cache.insert_arc(user.id, arc_user.clone());
Ok(arc_user)
}
pub async fn save(&self, user: &User) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE user SET username = ?, pub_key = ? WHERE id = ?"
)
.bind(&user.username)
.bind(&user.pub_key)
.bind(&user.id)
.execute(&self.db)
.await?;
// Update the cache
self.cache.insert(user.id, user.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM user WHERE id = ?")
.bind(&id)
.execute(&self.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.cache.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
// Initialize the cache from the DB
pub async fn load_all_from_db(&self) -> Result<(), sqlx::Error> {
let users: Vec<User> = sqlx::query_as("SELECT * FROM user")
.fetch_all(&self.db)
.await?;
for user in users {
self.cache.insert(user.id, user);
}
Ok(())
}
}

111
src/store/store_service.rs Normal file
View File

@@ -0,0 +1,111 @@
use std::sync::Arc;
use uuid::Uuid;
use sqlx::{AnyPool, SqlitePool};
use crate::store::models::{SubServer, Channel, User, Message, ChannelType, LinkSubServerUser};
use crate::store::repositories::{
SubServerRepository, ChannelRepository, UserRepository,
MessageRepository, LinkSubServerUserRepository
};
use crate::utils::shared_store::{SharedArcMap, SharedArcVec};
#[derive(Debug, Clone)]
pub enum StoreEvent {
SubServerCreated(Arc<SubServer>),
SubServerUpdated(Arc<SubServer>),
ChannelCreated(Arc<Channel>),
ChannelUpdated(Arc<Channel>),
MessageSent(Message),
UserJoinedSubServer { sub_server_id: Uuid, user_id: Uuid },
}
#[derive(Clone)]
pub struct StoreService {
// Database
pub db: SqlitePool,
// ✅ Centralized in-memory caches
pub users: SharedArcMap<Uuid, User>,
pub sub_servers: SharedArcMap<Uuid, SubServer>,
pub channels: SharedArcMap<Uuid, Channel>,
pub sub_server_users: SharedArcVec<LinkSubServerUser>,
}
impl StoreService {
pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
let connection_url = Self::normalize_database_url(database_url);
println!("🔍 Tentative de connexion à: {}", connection_url);
let db = SqlitePool::connect(&connection_url).await?;
sqlx::migrate!("./src/store/migrations").run(&db).await?;
let service = Self {
db,
users: SharedArcMap::new(),
sub_servers: SharedArcMap::new(),
channels: SharedArcMap::new(),
sub_server_users: SharedArcVec::new(),
};
// Load everything into memory at startup
let _ = service.load_all_caches().await;
Ok(service)
}
async fn load_all_caches(&self) -> Result<(), sqlx::Error> {
// Users
let users: Vec<User> = sqlx::query_as("SELECT * FROM user")
.fetch_all(&self.db)
.await?;
for user in users {
self.users.insert(user.id, user);
}
// SubServers
let sub_servers: Vec<SubServer> = sqlx::query_as("SELECT * FROM sub_server")
.fetch_all(&self.db)
.await?;
for sub_server in sub_servers {
self.sub_servers.insert(sub_server.id, sub_server);
}
// Channels
let channels: Vec<Channel> = sqlx::query_as("SELECT * FROM channel")
.fetch_all(&self.db)
.await?;
for channel in channels {
self.channels.insert(channel.id, channel);
}
// N-N relations
let relations: Vec<LinkSubServerUser> = sqlx::query_as("SELECT * FROM sub_server_user")
.fetch_all(&self.db)
.await?;
for relation in relations {
self.sub_server_users.push(relation);
}
Ok(())
}
}
impl StoreService {
// ===== HELPERS =====
/// ✅ Normalizes the URL to support different databases
fn normalize_database_url(database_url: &str) -> String {
// If it's already a full URL, return it as-is
if database_url.contains("://") {
return database_url.to_string();
}
// Otherwise, assume SQLite (current behavior)
if database_url.starts_with("sqlite:") {
database_url.to_string()
} else {
format!("sqlite:{}?mode=rwc", database_url)
}
}
}
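A unit-test sketch pinning down the three normalization cases (placed in the same module, since the helper is private):

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalizes_database_urls() {
        // Bare path: wrapped for SQLite with read-write-create mode.
        assert_eq!(
            StoreService::normalize_database_url("./db.sqlite"),
            "sqlite:./db.sqlite?mode=rwc"
        );
        // Already sqlite-prefixed: returned unchanged.
        assert_eq!(
            StoreService::normalize_database_url("sqlite:db.sqlite"),
            "sqlite:db.sqlite"
        );
        // Full URL: returned unchanged, whatever the backend.
        assert_eq!(
            StoreService::normalize_database_url("postgres://user@localhost/ox"),
            "postgres://user@localhost/ox"
        );
    }
}
```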

View File

@@ -1,398 +0,0 @@
use uuid::Uuid;
/// Helpers for byte manipulation - ideal for binary protocols
///
/// This structure allows reading binary data sequentially
/// while maintaining an internal read position.
pub struct ByteReader<'a> {
/// Reference to the data being read
data: &'a [u8],
/// Current position in the read buffer
position: usize,
}
impl<'a> ByteReader<'a> {
/// Creates a new byte reader from a slice
///
/// # Arguments
/// * `data` - The byte slice to read
///
/// # Example
/// ```text
/// let data = &[0x01, 0x02, 0x03, 0x04];
/// let reader = ByteReader::new(data);
/// ```
pub fn new(data: &'a [u8]) -> Self {
Self { data, position: 0 }
}
/// Returns the number of bytes left to read
///
/// Uses `saturating_sub` to avoid overflows
/// if the position exceeds the data length
pub fn remaining(&self) -> usize {
self.data.len().saturating_sub(self.position)
}
/// Checks whether all bytes have been read
///
/// # Returns
/// `true` if there are no more bytes to read
pub fn is_empty(&self) -> bool {
self.remaining() == 0
}
/// Returns the current read position
pub fn position(&self) -> usize {
self.position
}
/// Moves the read position to the given index
///
/// The position is automatically clamped to the data length
/// to avoid overruns
///
/// # Arguments
/// * `position` - The new read position
pub fn seek(&mut self, position: usize) {
self.position = position.min(self.data.len());
}
/// Reads a byte (u8) at the current position
///
/// # Returns
/// * `Ok(u8)` - The value read, if available
/// * `Err(&'static str)` - If the end of the buffer is reached
pub fn read_u8(&mut self) -> Result<u8, &'static str> {
if self.position < self.data.len() {
let value = self.data[self.position];
self.position += 1;
Ok(value)
} else {
Err("Not enough data for u8")
}
}
/// Reads a 16-bit integer in big-endian
///
/// # Returns
/// * `Ok(u16)` - The value read, if 2 bytes are available
/// * `Err(&'static str)` - If fewer than 2 bytes are available
pub fn read_u16_be(&mut self) -> Result<u16, &'static str> {
if self.remaining() >= 2 {
let value = u16::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
]);
self.position += 2;
Ok(value)
} else {
Err("Not enough data for u16")
}
}
/// Reads a 32-bit integer in big-endian
///
/// # Returns
/// * `Ok(u32)` - The value read, if 4 bytes are available
/// * `Err(&'static str)` - If fewer than 4 bytes are available
pub fn read_u32_be(&mut self) -> Result<u32, &'static str> {
if self.remaining() >= 4 {
let value = u32::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
self.data[self.position + 2],
self.data[self.position + 3],
]);
self.position += 4;
Ok(value)
} else {
Err("Not enough data for u32")
}
}
/// Reads a 64-bit integer in big-endian
///
/// # Returns
/// * `Ok(u64)` - The value read, if 8 bytes are available
/// * `Err(&'static str)` - If fewer than 8 bytes are available
pub fn read_u64_be(&mut self) -> Result<u64, &'static str> {
if self.remaining() >= 8 {
let value = u64::from_be_bytes([
self.data[self.position],
self.data[self.position + 1],
self.data[self.position + 2],
self.data[self.position + 3],
self.data[self.position + 4],
self.data[self.position + 5],
self.data[self.position + 6],
self.data[self.position + 7],
]);
self.position += 8;
Ok(value)
} else {
Err("Not enough data for u64")
}
}
/// Reads a UUID (16 bytes) at the current position
///
/// UUIDs are stored as 16 consecutive bytes.
/// This method reads those 16 bytes and converts them to a UUID.
///
/// # Returns
/// * `Ok(Uuid)` - The UUID read, if 16 bytes are available
/// * `Err(&'static str)` - If fewer than 16 bytes are available
pub fn read_uuid(&mut self) -> Result<Uuid, &'static str> {
if self.remaining() >= 16 {
let uuid_bytes = self.read_fixed_bytes::<16>()?;
Ok(Uuid::from_bytes(uuid_bytes))
} else {
Err("Not enough data for UUID")
}
}
/// Reads a byte sequence of the given length
///
/// # Arguments
/// * `len` - Number of bytes to read
///
/// # Returns
/// * `Ok(&[u8])` - Slice of the bytes read, if available
/// * `Err(&'static str)` - If not enough bytes are available
pub fn read_bytes(&mut self, len: usize) -> Result<&'a [u8], &'static str> {
if self.remaining() >= len {
let slice = &self.data[self.position..self.position + len];
self.position += len;
Ok(slice)
} else {
Err("Not enough data for bytes")
}
}
/// Reads a fixed-size byte array whose size is known at compile time
///
/// Uses const generics to define the array size
///
/// # Returns
/// * `Ok([u8; N])` - The byte array read, if available
/// * `Err(&'static str)` - If not enough bytes are available
pub fn read_fixed_bytes<const N: usize>(&mut self) -> Result<[u8; N], &'static str> {
if self.remaining() >= N {
let mut array = [0u8; N];
array.copy_from_slice(&self.data[self.position..self.position + N]);
self.position += N;
Ok(array)
} else {
Err("Not enough data for fixed bytes")
}
}
/// Reads all remaining bytes in the buffer
///
/// After this call the reader is empty (position = total length)
///
/// # Returns
/// Slice containing all remaining bytes
pub fn read_remaining(&mut self) -> &'a [u8] {
let slice = &self.data[self.position..];
self.position = self.data.len();
slice
}
}
/// Structure for sequentially building binary data
///
/// Unlike ByteReader, this structure owns its data
/// and allows writing values of different types.
pub struct ByteWriter {
/// Internal buffer storing the written data
data: Vec<u8>,
}
impl ByteWriter {
/// Creates a new writer with an empty Vec
pub fn new() -> Self {
Self { data: Vec::new() }
}
/// Creates a new writer with pre-allocated capacity
///
/// Useful to avoid reallocations when the final size
/// is approximately known
///
/// # Arguments
/// * `capacity` - Initial buffer capacity
pub fn with_capacity(capacity: usize) -> Self {
Self {
data: Vec::with_capacity(capacity),
}
}
/// Writes a byte (u8) into the buffer
///
/// # Arguments
/// * `value` - Value to write
pub fn write_u8(&mut self, value: u8) {
self.data.push(value);
}
/// Writes a 16-bit integer in big-endian
///
/// # Arguments
/// * `value` - Value to write
pub fn write_u16_be(&mut self, value: u16) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Writes a 32-bit integer in big-endian
///
/// # Arguments
/// * `value` - Value to write
pub fn write_u32_be(&mut self, value: u32) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Writes a 64-bit integer in big-endian
///
/// # Arguments
/// * `value` - Value to write
pub fn write_u64_be(&mut self, value: u64) {
self.data.extend_from_slice(&value.to_be_bytes());
}
/// Writes a byte sequence into the buffer
///
/// # Arguments
/// * `bytes` - Byte slice to write
pub fn write_bytes(&mut self, bytes: &[u8]) {
self.data.extend_from_slice(bytes);
}
/// Writes a fixed-size byte array
///
/// # Arguments
/// * `bytes` - Byte array to write
pub fn write_fixed_bytes<const N: usize>(&mut self, bytes: [u8; N]) {
self.data.extend_from_slice(&bytes);
}
/// Consumes the writer and returns the Vec containing the data
///
/// # Returns
/// Vec<u8> containing all written data
pub fn into_vec(self) -> Vec<u8> {
self.data
}
/// Returns a reference to the data as a slice
///
/// # Returns
/// Slice of the written data
pub fn as_slice(&self) -> &[u8] {
&self.data
}
/// Returns the current buffer size
pub fn len(&self) -> usize {
self.data.len()
}
/// Checks whether the buffer is empty
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
/// Default trait implementation for ByteWriter
///
/// Allows using ByteWriter::default() as an equivalent of ByteWriter::new()
impl Default for ByteWriter {
fn default() -> Self {
Self::new()
}
}
/// Standalone utility functions for direct, stateless reads
///
/// These functions read values at specific offsets
/// without needing to create a ByteReader.
/// Reads a byte at the given offset
///
/// # Arguments
/// * `data` - Source data slice
/// * `offset` - Read position
///
/// # Returns
/// * `Some(u8)` - The value read, if the offset is valid
/// * `None` - If the offset exceeds the data length
pub fn read_u8_at(data: &[u8], offset: usize) -> Option<u8> {
data.get(offset).copied()
}
/// Reads a big-endian 16-bit integer at the given offset
///
/// # Arguments
/// * `data` - Source data slice
/// * `offset` - Read position
///
/// # Returns
/// * `Some(u16)` - The value read, if 2 bytes are available at the offset
/// * `None` - If not enough bytes are available
pub fn read_u16_be_at(data: &[u8], offset: usize) -> Option<u16> {
if data.len() >= offset + 2 {
Some(u16::from_be_bytes([data[offset], data[offset + 1]]))
} else {
None
}
}
/// Reads a big-endian 32-bit integer at the given offset
///
/// # Arguments
/// * `data` - Source data slice
/// * `offset` - Read position
///
/// # Returns
/// * `Some(u32)` - The value read, if 4 bytes are available at the offset
/// * `None` - If not enough bytes are available
pub fn read_u32_be_at(data: &[u8], offset: usize) -> Option<u32> {
if data.len() >= offset + 4 {
Some(u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]))
} else {
None
}
}
/// Reads a big-endian 64-bit integer at the given offset
///
/// # Arguments
/// * `data` - Source data slice
/// * `offset` - Read position
///
/// # Returns
/// * `Some(u64)` - The value read, if 8 bytes are available at the offset
/// * `None` - If not enough bytes are available
pub fn read_u64_be_at(data: &[u8], offset: usize) -> Option<u64> {
if data.len() >= offset + 8 {
Some(u64::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
data[offset + 4],
data[offset + 5],
data[offset + 6],
data[offset + 7],
]))
} else {
None
}
}

View File

@@ -1 +1 @@
pub mod byte_utils;
pub mod shared_store;

1114
src/utils/shared_store.rs Normal file

File diff suppressed because it is too large