Compare commits

...

3 Commits

SHA1 Message Date
a6626571fa init 2025-10-05 21:47:49 +02:00
04c2750f0b init 2025-09-28 18:38:53 +02:00
b8c797a2fc init 2025-09-28 18:38:48 +02:00
79 changed files with 1540 additions and 2692 deletions

124
.idea/workspace.xml generated
View File

@@ -6,62 +6,33 @@
<component name="CargoProjects">
<cargoProject FILE="$PROJECT_DIR$/Cargo.toml">
<package file="$PROJECT_DIR$">
<enabledFeature name="default" />
<feature name="default" enabled="true" />
</package>
</cargoProject>
</component>
<component name="ChangeListManager">
<list default="true" id="b2b598c9-ef0b-4cbc-8852-cfbc8ce3920e" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/dataSources.local.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/dataSources.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/dataSources/26059583-0fdb-4f6f-ad11-10388e9658c2.xml" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/dataSources/26059583-0fdb-4f6f-ad11-10388e9658c2/storage_v2/_src_/schema/main.uQUzAA.meta" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/modules.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/modules.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/ox_speak_server.iml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/ox_speak_server.iml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/vcs.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<list default="true" id="b2b598c9-ef0b-4cbc-8852-cfbc8ce3920e" name="Changes" comment="init">
<change afterPath="$PROJECT_DIR$/src/db/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/_channel_user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/_server_user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/attachment.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/category.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/channel.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/message.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/mod.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/server.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/models/user.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/repositories/_server_user_repository.rs" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/db/repositories/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.lock" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.lock" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Cargo.toml" beforeDir="false" afterPath="$PROJECT_DIR$/Cargo.toml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/README.md" beforeDir="false" afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/app.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/app.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/conf.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/conf.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/app/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/app/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/client.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/client.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/event.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/event.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/models.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/models.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/domain/user.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/domain/user.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/lib.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/lib.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/channel.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/channel.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/master.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/master.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/message.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/message.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/sub_server.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/sub_server.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/user.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/user.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http_routes/websocket.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http_routes/websocket.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/http/server.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/http/server.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/protocol.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/protocol.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/udp.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/udp.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/network/udp_back.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/network/udp_back.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/runtime/dispatcher.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/runtime/dispatcher.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/migrations/001_init.sqlite.sql" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/migrations/001_init.sqlite.sql" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/channel.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/channel.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/link_sub_server_user.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/link_sub_server_user.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/message.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/message.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/sub_server.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/sub_server.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/models/user.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/models/user.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/channel_repository.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/channel_repository.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/link_sub_server_user_repository.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/link_sub_server_user_repository.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/message_repository.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/message_repository.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/mod.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/sub_server_repository.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/sub_server_repository.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/repositories/user_repository.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/repositories/user_repository.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/session/client.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/session/client.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/store_service.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/store/store_service.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/utils/shared_store.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/utils/shared_store.rs" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/store/mod.rs" beforeDir="false" afterPath="$PROJECT_DIR$/src/db/repositories/_channel_user_repository.rs" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -69,7 +40,19 @@
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="ExecutionTargetManager" SELECTED_TARGET="RsBuildProfile:dev" />
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Rust File" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_BRANCH_BY_REPOSITORY">
<map>
<entry key="$PROJECT_DIR$" value="master" />
</map>
</option>
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="MacroExpansionManager">
@@ -91,7 +74,7 @@
&quot;RunOnceActivity.TerminalTabsStorage.copyFrom.TerminalArrangementManager.252&quot;: &quot;true&quot;,
&quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
&quot;RunOnceActivity.rust.reset.selective.auto.import&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;master&quot;,
&quot;git-widget-placeholder&quot;: &quot;v2&quot;,
&quot;ignore.virus.scanning.warn.message&quot;: &quot;true&quot;,
&quot;junie.onboarding.icon.badge.shown&quot;: &quot;true&quot;,
&quot;last_opened_file_path&quot;: &quot;//wsl.localhost/Debian/home/Nell/linux_dev/ox_speak_server&quot;,
@@ -108,6 +91,14 @@
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
}
}</component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="\\wsl.localhost\Debian\home\Nell\linux_dev\ox_speak_server\src" />
<recent name="\\wsl.localhost\Debian\home\Nell\linux_dev\ox_speak_server\src\store\db" />
<recent name="\\wsl.localhost\Debian\home\Nell\linux_dev\ox_speak_server\src\app\http" />
<recent name="\\wsl.localhost\Debian\home\Nell\linux_dev\ox_speak_server\src\network" />
</key>
</component>
<component name="RunManager" selected="Cargo.Run ox_speak_server">
<configuration name="Run ox_speak_server" type="CargoCommandRunConfiguration" factoryName="Cargo Command">
<option name="buildProfileId" value="dev" />
@@ -157,12 +148,51 @@
<updated>1755963473950</updated>
<workItem from="1755963475439" duration="2760000" />
<workItem from="1756139974994" duration="110000" />
<workItem from="1756217915613" duration="12208000" />
<workItem from="1758356408901" duration="789000" />
<workItem from="1758380522178" duration="7468000" />
<workItem from="1758439568539" duration="4787000" />
<workItem from="1758951672825" duration="25708000" />
<workItem from="1759042700460" duration="16518000" />
<workItem from="1759562385229" duration="218000" />
<workItem from="1759562619209" duration="13323000" />
<workItem from="1759649037947" duration="3578000" />
<workItem from="1759657715670" duration="10824000" />
</task>
<task id="LOCAL-00001" summary="init">
<option name="closed" value="true" />
<created>1756218076891</created>
<option name="number" value="00001" />
<option name="presentableId" value="LOCAL-00001" />
<option name="project" value="LOCAL" />
<updated>1756218076891</updated>
</task>
<task id="LOCAL-00002" summary="init">
<option name="closed" value="true" />
<created>1759077528343</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1759077528343</updated>
</task>
<task id="LOCAL-00003" summary="init">
<option name="closed" value="true" />
<created>1759077533456</created>
<option name="number" value="00003" />
<option name="presentableId" value="LOCAL-00003" />
<option name="project" value="LOCAL" />
<updated>1759077533456</updated>
</task>
<option name="localTasksCounter" value="4" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="init" />
<option name="LAST_COMMIT_MESSAGE" value="init" />
</component>
<component name="XSLT-Support.FileAssociations.UIState">
<expand />
<select />

679
Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -14,19 +14,21 @@ crate-type = ["staticlib", "cdylib", "rlib"]
debug = true
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
parking_lot = "0.12"
log = "0.4"
env_logger = "0.11"
tokio = { version = "1.47", features = ["full"] }
strum = {version = "0.27", features = ["derive"] }
uuid = {version = "1.18", features = ["v4", "serde"] }
event-listener = "5.4"
dashmap = "6.1"
bytes = "1.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
arc-swap = "1.7"
crossbeam-utils = "0.8"
kanal = "0.1"
axum = { version = "0.8", features = ["ws", "default"] }
chrono = {version = "0.4", features = ["serde"]}
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "chrono", "uuid", "any", "sqlite", "postgres", "mysql" ] }
dotenvy = "0.15"
envy = "0.4"
socket2 = "0.6"
parking_lot = "0.12"
axum = "0.8"
tower = "0.5"
tower-http = "0.6"
hyper = "1.7"
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "sqlite", "postgres", "mysql", "uuid", "chrono", "migrate"] }
uuid = { version = "1.18", features = ["v4", "v7", "serde"] }
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -1,6 +1,15 @@
The idea is to have a separation with "Qt"-style management: roughly, "independent" components that each know their own state and expose a set of methods that "App" uses to interact with them.
## Modules
- App: overall execution, high-level manipulation of the other components, interconnection between them
- Network:
  - Http: API
  - WebSocket: a WebSocket server with its clients (for high-level management)
  - UDP: a UDP server with its clients (for high-level management)

To create an sqlx migration file:
```shell
sqlx migrate add --source src/store/migrations migration_name
```
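For reference, a minimal sketch (not part of this commit) of applying the migrations created this way at startup, using sqlx's `migrate!` macro and assuming the `src/store/migrations` directory used above:

```rust
use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the SQLite database (mode=rwc creates the file if it is missing).
    let pool = SqlitePoolOptions::new()
        .max_connections(5)
        .connect("sqlite://db.sqlite?mode=rwc")
        .await?;

    // The path is resolved at compile time relative to CARGO_MANIFEST_DIR;
    // the SQL files are embedded in the binary and applied in order.
    sqlx::migrate!("src/store/migrations").run(&pool).await?;
    Ok(())
}
```

This requires the `migrate` feature of sqlx, which the new Cargo.toml above enables.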
# Example

View File

@@ -1,80 +1,61 @@
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;
use crate::domain::client::ClientManager;
use crate::domain::event::{Event, EventBus};
use crate::network::http::HttpServer;
use crate::network::udp::UdpServer;
use crate::runtime::dispatcher::Dispatcher;
use crate::store::store_service::StoreService;
use std::io;
use std::sync::Arc;
use std::net::SocketAddr;
use crate::app::http::router::configure_routes;
use crate::network::http::server::HttpServer;
use crate::network::udp::server::UdpServer;
use crate::utils::config::Config;
use crate::utils::logger::ContextLogger;
pub struct App {
// Communication inter-components
event_bus: EventBus,
dispatcher: Dispatcher,
event_rx: kanal::AsyncReceiver<Event>,
config: Config,
context: Arc<Context>,
// Network
udp_server: UdpServer,
http_server: HttpServer,
// Clients
client_manager: ClientManager,
// store
store: StoreService,
logger: ContextLogger,
}
impl App {
pub async fn new() -> Self {
let (event_bus, event_rx) = EventBus::new();
pub async fn new(config: Config) -> io::Result<Self> {
let logger = ContextLogger::new("APP");
let store = StoreService::new("./db.sqlite").await.unwrap();
// initialise context
let udp_server = match UdpServer::new("127.0.0.1:8080") {
Ok(udp_server) => udp_server,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Failed to create UDP server: {}", e)))
}
};
let udp_server = UdpServer::new(event_bus.clone(), "0.0.0.0:5000").await;
let http_server = HttpServer::new(event_bus.clone(), store.clone());
let client_manager = ClientManager::new();
let http_server = match HttpServer::new("127.0.0.1:8080") {
Ok(http_server) => http_server,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, format!("Failed to create HTTP server: {}", e)))
}
};
let dispatcher = Dispatcher::new(event_bus.clone(), udp_server.clone(), client_manager.clone(), store.clone()).await;
Self {
event_bus,
dispatcher,
event_rx,
let context = Context {
udp_server,
http_server,
client_manager,
store
};
Ok(Self {
config,
context: Arc::new(context),
logger
})
}
pub async fn run(self) -> io::Result<()> {
self.logger.info("Starting application");
self.context.udp_server.run().await?;
self.context.http_server.run().await?;
Ok(())
}
}
pub async fn start(&mut self) {
for i in 0..4 {
let dispatcher = self.dispatcher.clone();
let event_rx = self.event_rx.clone();
tokio::spawn(async move {
dispatcher.start(event_rx).await;
});
}
let _ = self.udp_server.start().await;
let _ = self.http_server.start("0.0.0.0:5000").await;
let _ = self.tick_tasks().await;
println!("App started");
}
async fn tick_tasks(&self) {
let event_bus = self.event_bus.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(1));
loop {
// println!("Tick");
interval.tick().await;
let _ = event_bus.emit(Event::TickSeconds).await;
}
});
}
struct Context {
udp_server: UdpServer,
http_server: HttpServer
}

View File

@@ -1,28 +0,0 @@
use std::env;
use std::path::Path;
pub fn load_env() {
// display le répertoire de travail
match env::current_dir() {
Ok(path) => {
println!("Répertoire de travail: {}", path.display())
}
Err(e) => {
eprintln!("Erreur pour obtenir le répertoire: {}", e)
}
}
// Vérifier si le .env existe
let env_path = Path::new(".env");
if env_path.exists() {
println!(".env trouvé");
// Charger le .env
match dotenvy::from_path(env_path) {
Ok(_) => println!("✅ Fichier .env chargé avec succès"),
Err(e) => eprintln!("❌ Erreur lors du chargement du .env: {}", e),
}
} else {
println!("⚠️ Le fichier .env n'existe pas");
}
}

28
src/app/http/api/core.rs Normal file
View File

@@ -0,0 +1,28 @@
use axum::http::StatusCode;
use axum::{Json, Router};
use axum::routing::get;
use serde_json::json;
pub fn routes() -> Router {
Router::new()
.route("/", get(root))
.route("/health", get(health))
.route("/status", get(status))
}
async fn root() -> &'static str {
"OX Speak Server - HTTP API"
}
async fn health() -> StatusCode {
StatusCode::OK
}
async fn status() -> Json<serde_json::Value> {
Json(json!({
"status": "ok",
"service": "ox-speak-server",
"version": env!("CARGO_PKG_VERSION"),
"workers": tokio::runtime::Handle::current().metrics().num_workers()
}))
}

View File

@@ -0,0 +1,6 @@
use axum::Router;
pub fn routes() -> Router {
Router::new()
.nest("/message", Router::new())
}

3
src/app/http/api/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod core;
pub mod message;
pub mod server;

3
src/app/http/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod router;
pub mod api;

13
src/app/http/router.rs Normal file
View File

@@ -0,0 +1,13 @@
use axum::Router;
use crate::app::http::api;
use crate::app::http::api::core;
pub fn configure_routes() -> Router {
let api_router = Router::new();
Router::new()
.merge(core::routes())
.nest("/api", api_router )
}
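For orientation, a hypothetical sketch (not part of this commit) of wiring `configure_routes()` into the new `HttpServer` shown later in this diff, using the crate paths declared in `lib.rs` and the `mod` files above:

```rust
use ox_speak_server_lib::app::http::router::configure_routes;
use ox_speak_server_lib::network::http::server::HttpServer;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // HttpServer::new parses the bind address; run() serves the axum Router.
    let server = HttpServer::new("127.0.0.1:8080")?;
    server.run(configure_routes()).await
}
```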

View File

@@ -1,2 +1,2 @@
pub mod app;
pub mod conf;
pub mod http;

57
src/db/context.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use crate::db::{AttachmentRepository, CategoryRepository, ChannelRepository, ChannelUser, ChannelUserRepository, MessageRepository, ServerRepository, ServerUser, ServerUserRepository, UserRepository};
use crate::utils::logger::ContextLogger;
struct Repositories {
server_repository: ServerRepository,
category_repository: CategoryRepository,
channel_repository: ChannelRepository,
user_repository: UserRepository,
message_repository: MessageRepository,
attachment_repository: AttachmentRepository,
server_user: ServerUserRepository,
channel_user: ChannelUserRepository
}
pub struct DbContext {
pool: Arc<SqlitePool>,
repositories: Arc<Repositories>,
// logger
logger: ContextLogger,
}
impl DbContext {
pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
let logger = ContextLogger::new("DB");
logger.info(&format!("Creating DB context on {}", database_url));
let pool = SqlitePool::connect(database_url).await?;
logger.info("DB context created");
let pool = Arc::new(pool);
let repositories = Arc::new(Repositories {
server_repository: ServerRepository::new(pool.clone()),
category_repository: CategoryRepository::new(pool.clone()),
channel_repository: ChannelRepository::new(pool.clone()),
user_repository: UserRepository::new(pool.clone()),
message_repository: MessageRepository::new(pool.clone()),
attachment_repository: AttachmentRepository::new(pool.clone()),
server_user: ServerUserRepository::new(pool.clone()),
channel_user: ChannelUserRepository::new(pool.clone())
});
Ok(Self {
pool,
repositories,
logger,
})
}
pub fn repositories(&self) -> Arc<Repositories> {
self.repositories.clone()
}
}

6
src/db/mod.rs Normal file
View File

@@ -0,0 +1,6 @@
pub mod models;
pub mod repositories;
mod context;
pub use models::*;
pub use repositories::*;

View File

@@ -0,0 +1,29 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct ChannelUser {
pub id: Uuid,
pub channel_id: Uuid,
pub user_id: Uuid,
pub username: Option<String>,
pub joined_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_read_at: DateTime<Utc>,
}
impl ChannelUser {
pub fn new(channel_id: Uuid, user_id: Uuid) -> Self {
Self {
id: Uuid::now_v7(),
channel_id,
user_id,
username: None,
joined_at: Utc::now(),
updated_at: Utc::now(),
last_read_at: Utc::now(),
}
}
}

View File

@@ -0,0 +1,28 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct ServerUser {
pub id: Uuid,
pub server_id: Uuid,
pub user_id: Uuid,
#[sqlx(default)]
pub username: Option<String>,
pub joined_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl ServerUser {
pub fn new(server_id: Uuid, user_id: Uuid) -> Self {
Self {
id: Uuid::now_v7(),
server_id,
user_id,
username: None,
joined_at: Utc::now(),
updated_at: Utc::now(),
}
}
}

View File

@@ -0,0 +1,27 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Attachment {
pub id: Uuid,
pub message_id: Uuid,
pub filename: String,
pub file_size: i64,
pub mime_type: String,
pub created_at: DateTime<Utc>,
}
impl Attachment {
pub fn new(message_id: Uuid, filename: String, file_size: i64, mime_type: String) -> Self {
Self {
id: Uuid::new_v4(),
message_id,
filename,
file_size,
mime_type,
created_at: Utc::now(),
}
}
}

25
src/db/models/category.rs Normal file
View File

@@ -0,0 +1,25 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Category {
pub id: Uuid,
pub server_id: Uuid,
pub name: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl Category {
pub fn new(server_id: Uuid, name: String) -> Self {
Self {
id: Uuid::now_v7(),
server_id,
name,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
}

56
src/db/models/channel.rs Normal file
View File

@@ -0,0 +1,56 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct Channel {
pub id: Uuid, // Blob(16) sqlite
#[sqlx(default)]
pub server_id: Option<Uuid>,
#[sqlx(default)]
pub category_id: Option<Uuid>,
#[sqlx(default)]
pub position: i32,
#[sqlx(rename = "type")]
pub channel_type: ChannelType,
pub name: Option<String>, // Not necessary for DMs
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text")]
#[serde(rename_all = "snake_case")]
pub enum ChannelType {
Text,
Voice,
Dm
}
impl Channel {
pub fn new_server_channel(server_id: Uuid, name: String, channel_type: ChannelType) -> Self {
Self {
id: Uuid::now_v7(),
server_id: Some(server_id),
category_id: None,
position: 0,
channel_type,
name: Some(name),
created_at: Utc::now(),
updated_at: Utc::now()
}
}
pub fn new_dm_channel() -> Self {
Self {
id: Uuid::now_v7(),
server_id: None,
category_id: None,
channel_type: ChannelType::Dm,
name: None,
position: 0,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
}

41
src/db/models/message.rs Normal file
View File

@@ -0,0 +1,41 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Message {
pub id: Uuid,
pub channel_id: Uuid,
pub user_id: Uuid,
pub content: String,
pub created_at: DateTime<Utc>,
pub edited_at: DateTime<Utc>,
pub reply_to_id: Option<Uuid>,
}
impl Message {
pub fn new(channel_id: Uuid, user_id: Uuid, content: String) -> Self {
Self {
id: Uuid::now_v7(),
channel_id,
user_id,
content,
created_at: Utc::now(),
edited_at: Utc::now(),
reply_to_id: None,
}
}
pub fn with_reply(channel_id: Uuid, user_id: Uuid, content: String, reply_to_id: Uuid) -> Self {
Self {
id: Uuid::now_v7(),
channel_id,
user_id,
content,
created_at: Utc::now(),
edited_at: Utc::now(),
reply_to_id: Some(reply_to_id),
}
}
}

17
src/db/models/mod.rs Normal file
View File

@@ -0,0 +1,17 @@
mod user;
mod server;
mod _server_user;
mod channel;
mod category;
mod _channel_user;
mod message;
mod attachment;
pub use user::*;
pub use server::*;
pub use channel::*;
pub use category::*;
pub use message::*;
pub use attachment::*;
pub use _server_user::*;
pub use _channel_user::*;

25
src/db/models/server.rs Normal file
View File

@@ -0,0 +1,25 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, FromRow, Serialize, Deserialize)]
pub struct Server {
pub id: Uuid, // Blob(16) sqlite
pub username: String,
pub password: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl Server {
pub fn new(username: String, password: Option<String>) -> Self {
Self {
id: Uuid::now_v7(),
username,
password,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
}

25
src/db/models/user.rs Normal file
View File

@@ -0,0 +1,25 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, FromRow, Serialize, Deserialize)]
pub struct User {
pub id: Uuid, // Blob(16) sqlite
pub username: String,
pub pub_key: String, // TEXT
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl User {
pub fn new(username: String, pub_key: String) -> Self {
Self {
id: Uuid::now_v7(),
username,
pub_key,
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct ChannelUserRepository {
pool: Arc<SqlitePool>
}
impl ChannelUserRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct ServerUserRepository {
pool: Arc<SqlitePool>
}
impl ServerUserRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct AttachmentRepository {
pool: Arc<SqlitePool>
}
impl AttachmentRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct CategoryRepository{
pool: Arc<SqlitePool>
}
impl CategoryRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,15 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct ChannelRepository {
pool: Arc<SqlitePool>
}
impl ChannelRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,15 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct MessageRepository{
pool: Arc<SqlitePool>
}
impl MessageRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -0,0 +1,17 @@
mod _channel_user_repository;
mod _server_user_repository;
mod user_repository;
mod server_repository;
mod channel_repository;
mod category_repository;
mod message_repository;
mod attachment_repository;
pub use user_repository::*;
pub use server_repository::*;
pub use channel_repository::*;
pub use category_repository::*;
pub use message_repository::*;
pub use attachment_repository::*;
pub use _server_user_repository::*;
pub use _channel_user_repository::*;

View File

@@ -0,0 +1,15 @@
use std::sync::Arc;
use sqlx::SqlitePool;
pub struct ServerRepository{
pool: Arc<SqlitePool>
}
impl ServerRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self{
pool
}
}
}

View File

@@ -0,0 +1,14 @@
use std::sync::Arc;
use sqlx::{SqlitePool};
pub struct UserRepository {
pool: Arc<SqlitePool>
}
impl UserRepository {
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self {
pool
}
}
}

View File

@@ -1,264 +0,0 @@
//! Gestion des clients pour les connexions UDP
//!
//! Ce module fournit les structures et méthodes pour gérer les clients
//! connectés au serveur UDP, incluant leur tracking et leurs modifications.
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::time::Instant;
use uuid::Uuid;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use crossbeam_utils::atomic::AtomicCell;
use crate::utils::shared_store::SharedArcMap;
/// Représente un client connecté au serveur UDP
///
/// Chaque client est identifié par un UUID unique et contient
/// son adresse réseau ainsi que l'heure de sa dernière activité.
/// Le `last_seen` utilise AtomicCell pour des mises à jour lock-free.
#[derive(Debug)]
pub struct Client {
id: Uuid,
address: SocketAddr,
last_seen: AtomicCell<Instant>,
}
/// Gestionnaire threadsafe pour les clients connectés
///
/// Utilise `SharedArcMap` pour permettre un accès concurrent sécurisé
/// aux clients depuis plusieurs threads avec des performances optimisées
/// pour les lectures fréquentes. Les clients sont stockés dans des Arc
/// pour éviter les clones coûteux.
#[derive(Clone)]
pub struct ClientManager {
udp_clients: SharedArcMap<SocketAddr, Client>,
}
impl Client {
/// Crée un nouveau client avec un UUID généré automatiquement
pub fn new(address: SocketAddr) -> Self {
let id = Uuid::new_v4();
Self {
id,
address,
last_seen: AtomicCell::new(Instant::now()),
}
}
/// Retourne le UUID unique du client
pub fn id(&self) -> Uuid {
self.id
}
/// Retourne l'adresse socket du client
pub fn address(&self) -> SocketAddr {
self.address
}
/// Retourne l'instant de la dernière activité du client
/// Accès lock-free grâce à AtomicCell
pub fn last_seen(&self) -> Instant {
self.last_seen.load()
}
/// Met à jour l'heure de dernière activité du client à maintenant
/// Opération lock-free grâce à AtomicCell
pub fn update_last_seen(&self) {
self.last_seen.store(Instant::now());
}
}
impl Hash for Client {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Client {}
impl ClientManager {
/// Crée un nouveau gestionnaire de clients vide
pub fn new() -> Self {
Self {
udp_clients: SharedArcMap::new(),
}
}
/// Crée un nouveau gestionnaire de clients avec une capacité initiale
pub fn with_capacity(capacity: usize) -> Self {
Self {
udp_clients: SharedArcMap::with_capacity(capacity),
}
}
/// Ajoute un client au gestionnaire
/// Retourne l'ancien client Arc s'il existait déjà
pub fn add(&self, client: Client) -> Option<Arc<Client>> {
self.udp_clients.insert(client.address(), client)
}
/// Supprime un client du gestionnaire par son adresse
/// Retourne l'Arc du client supprimé
pub fn remove(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.remove(&address)
}
/// Supprime un client du gestionnaire par son instance
/// Retourne l'Arc du client supprimé
pub fn remove_client(&self, client: &Client) -> Option<Arc<Client>> {
self.udp_clients.remove(&client.address())
}
/// Vérifie si un client existe pour une adresse donnée
pub fn client_exists(&self, address: SocketAddr) -> bool {
self.udp_clients.contains_key(&address)
}
/// Récupère un Arc vers le client par son adresse
/// Très efficace - pas de clone du Client
pub fn get_client_by_address(&self, address: SocketAddr) -> Option<Arc<Client>> {
self.udp_clients.get(&address)
}
pub fn get_uuid_by_address(&self, address: SocketAddr) -> Option<Uuid> {
self.udp_clients.get(&address).map(|client| client.id)
}
/// Récupère toutes les adresses des clients connectés
pub fn get_all_addresses(&self) -> HashSet<SocketAddr> {
self.udp_clients.keys().collect()
}
/// Retourne le nombre de clients connectés
pub fn len(&self) -> usize {
self.udp_clients.len()
}
/// Vérifie si le gestionnaire est vide
pub fn is_empty(&self) -> bool {
self.udp_clients.is_empty()
}
/// Met à jour l'heure de dernière activité d'un client
/// Utilise la méthode modify de SharedArcMap pour une opération lock-free
pub fn update_client_last_seen(&self, address: SocketAddr) -> bool {
self.udp_clients.modify(&address, |client| {
client.update_last_seen();
})
}
/// Supprime les clients trop vieux
/// Utilise l'accès lock-free pour identifier les clients à supprimer
pub fn cleanup(&self, max_age: Duration) {
let now = Instant::now();
let addresses_to_remove: Vec<SocketAddr> = self
.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() >= max_age)
.map(|(addr, _)| *addr)
.collect();
for address in addresses_to_remove {
self.udp_clients.remove(&address);
}
}
/// Modifie un client via une closure
/// Utilise la méthode modify optimisée de SharedArcMap
///
/// # Arguments
/// * `address` - L'adresse du client à modifier
/// * `f` - La closure qui recevra une référence vers le client
///
/// # Returns
/// `true` si le client a été trouvé et modifié, `false` sinon
///
/// # Examples
/// ```ignore
/// let client_manager = ClientManager::new();
/// let addr = "127.0.0.1:8080".parse().unwrap();
///
/// // Mise à jour de last_seen (lock-free)
/// client_manager.modify_client(addr, |client| {
/// client.update_last_seen();
/// });
///
/// // Accès aux propriétés du client
/// let found = client_manager.modify_client(addr, |client| {
/// println!("Client ID: {}", client.id());
/// println!("Dernière activité: {:?}", client.last_seen());
/// });
/// ```
pub fn modify_client<F>(&self, address: SocketAddr, f: F) -> bool
where
F: FnOnce(&Client),
{
self.udp_clients.modify(&address, f)
}
/// Obtient une référence en lecture seule vers tous les clients
/// Accès lock-free ultra-rapide
pub fn read_all(&self) -> Arc<std::collections::HashMap<SocketAddr, Arc<Client>>> {
self.udp_clients.read()
}
/// Vide tous les clients
pub fn clear(&self) {
self.udp_clients.clear();
}
/// Itère sur tous les clients avec leurs Arc
/// Très efficace - pas de clone des clients
pub fn iter(&self) -> impl Iterator<Item = (SocketAddr, Arc<Client>)> {
self.udp_clients.iter()
}
/// Itère sur toutes les adresses des clients
pub fn addresses(&self) -> impl Iterator<Item = SocketAddr> {
self.udp_clients.keys()
}
/// Itère sur tous les Arc des clients
pub fn clients(&self) -> impl Iterator<Item = Arc<Client>> {
self.udp_clients.values()
}
/// Compte le nombre de clients actifs dans une durée donnée
/// Utilise l'accès lock-free pour une performance optimale
pub fn count_active_clients(&self, max_age: Duration) -> usize {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.count()
}
/// Obtient les adresses des clients actifs
/// Très efficace grâce à l'accès lock-free
pub fn get_active_addresses(&self, max_age: Duration) -> Vec<SocketAddr> {
let now = Instant::now();
self.udp_clients
.read()
.iter()
.filter(|(_, client)| now - client.last_seen() < max_age)
.map(|(addr, _)| *addr)
.collect()
}
}
impl Default for ClientManager {
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,38 +0,0 @@
use crate::network::protocol::{UDPMessage};
#[derive(Clone, Debug)]
pub enum Event {
AppStarted,
AppStopped,
UdpStarted,
UdpStopped,
UdpIn(UDPMessage),
UdpOut(UDPMessage),
TickSeconds
}
#[derive(Clone)]
pub struct EventBus {
pub sender: kanal::AsyncSender<Event>,
}
impl EventBus {
pub fn new() -> (Self, kanal::AsyncReceiver<Event>) {
let (sender, receiver) = kanal::bounded_async::<Event>(4096);
(Self { sender }, receiver)
}
pub async fn emit(&self, event: Event) {
let _ = self.sender.send(event).await;
}
pub fn emit_sync(&self, event: Event) {
let _ = self.sender.try_send(event);
}
pub fn clone_sender(&self) -> kanal::AsyncSender<Event> {
self.sender.clone()
}
}

View File

@@ -1,4 +0,0 @@
pub mod event;
pub mod user;
pub mod client;
pub mod models;

View File

@@ -1,65 +0,0 @@
use std::sync::Arc;
use parking_lot::Mutex;
use uuid::Uuid;
use chrono::{DateTime, Utc};
/////////////////////////////////////
//////////// Main models ///////////
////////////////////////////////////
struct SubServer {
// Un serveur (master) peu avoir plusieurs subserver
// (le master, pas besoin de le représenter, sa configuration sera depuis un .env)
id: Uuid,
name: String,
password: String, // voir si on le hash, mais sera certainement pas nécessaire.
created_at: DateTime<Utc>,
}
struct Channel {
id: Uuid,
channel_type: ChannelType, // le type ne pourra pas être edit, comme discord
sub_server_id: Uuid, // fk subserver
name: String,
created_at: DateTime<Utc>,
}
struct User {
id: Uuid,
name: String, // je sais pas si il est nécessaire étant donné qu'on utilisera display_name de la relation.
pub_key: String, // l'identification se fera par clé public/privé, comme teamspeak
}
struct Message {
id: Uuid,
channel_id: Uuid,
user_id: Uuid,
content: String,
created_at: DateTime<Utc>,
}
/////////////////////////////////////
////////// n-n relations ///////////
////////////////////////////////////
struct SubServerUser {
sub_server: SubServer,
user: User,
display_name: String,
}
/////////////////////////////////////
//////// Enum Type (choice) ////////
////////////////////////////////////
enum ChannelType {
Text,
Voice,
}
/// Store centralisé pour tous les modèles de données
#[derive(Clone)]
pub struct DataStore {
sub_servers: Arc<Mutex<SubServer>>
}

View File

@@ -1,47 +0,0 @@
use std::net::SocketAddr;
use std::sync::Arc;
use dashmap::DashMap;
use uuid::Uuid;
pub struct User {
id: Uuid,
udp_addr: Option<SocketAddr>,
}
#[derive(Clone)]
pub struct UserManager {
users: Arc<DashMap<Uuid, User>>,
}
impl User {
pub fn new(id: Uuid) -> Self {
Self {
id,
udp_addr: None,
}
}
pub fn default() -> Self {
Self::new(Uuid::new_v4())
}
pub fn set_udp_addr(&mut self, udp_addr: SocketAddr) {
self.udp_addr = Some(udp_addr);
}
}
impl UserManager {
pub fn new() -> Self {
Self {
users: Arc::new(DashMap::new())
}
}
pub fn add_user(&self, user: User) {
self.users.insert(user.id, user);
}
pub fn delete_user(&self, user: User) {
self.users.remove(&user.id);
}
}

View File

@@ -1,7 +1,5 @@
pub mod app;
pub mod core;
pub mod domain;
pub mod network;
pub mod runtime;
pub mod utils;
pub mod store;
pub mod network;
pub mod app;
pub mod db;

View File

@@ -1,14 +1,29 @@
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::signal;
use ox_speak_server_lib::utils::config::Config;
use ox_speak_server_lib::app::app::App;
use ox_speak_server_lib::app::conf::load_env;
use ox_speak_server_lib::utils::logger::init_logger;
#[tokio::main]
async fn main() {
// Load the .env file
load_env();
let mut app = App::new().await;
app.start().await;
init_logger("debug");
// Load the .env file
let config = match Config::load() {
Ok(config) => config,
Err(err) => {
eprintln!("Failed to load configuration: {}", err);
return;
}
};
// Initialize the logger
// init_logger(&config.log_level);
let app = App::new(config).await;
// app.start().await;
// Wait for the Ctrl+C signal
match signal::ctrl_c().await {
@@ -19,5 +34,4 @@ async fn main() {
eprintln!("Erreur lors de l'écoute du signal: {}", err);
}
}
}

View File

@@ -1,89 +0,0 @@
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{extract::{ws::WebSocket, ws::WebSocketUpgrade, State}, response::Response, Json, Router, routing::{get, post}, middleware};
use axum::body::Body;
use axum::http::{HeaderValue, Request, StatusCode};
use axum::middleware::Next;
use tokio::net::TcpListener;
use crate::domain::event::EventBus;
use crate::network::http_routes::master;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct HttpState {
pub event_bus: EventBus,
pub store: StoreService
}
#[derive(Clone)]
pub struct HttpServer {
state: HttpState
}
impl HttpServer {
pub fn new(event_bus: EventBus, store: StoreService) -> Self {
Self {
state: HttpState {
event_bus,
store
},
}
}
pub async fn start(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
let router = self.create_router();
let listener = TcpListener::bind(addr).await?;
println!("HTTP/WebSocket server listening on addr {}", listener.local_addr()?);
tokio::spawn(async move {
let _ = axum::serve(listener, router).await;
});
Ok(())
}
fn create_router(&self) -> Router {
let api_route = Router::new()
.nest("/master", master::create_router())
.layer(middleware::from_fn(app_only_middleware));
Router::new()
.nest("/api", api_route)
// .route("/ws", get(Self::ws))
// .route("/stats/", get(Self::stats))
.with_state(self.state.clone())
}
}
/// Routes
impl HttpServer {
async fn stats(State(state): State<Arc<HttpState>>) -> Json<String> {
todo!("a faire")
}
async fn ws(ws: WebSocketUpgrade, State(state): State<Arc<HttpState>>) -> Response {
ws.on_upgrade(|socket| async move {
todo!("a faire")
})
}
}
// Middlewares
async fn app_only_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
let headers = request.headers();
let expected_client = HeaderValue::from_static("ox_speak");
let expected_version = HeaderValue::from_static("1.0");
let expected_user_agent = HeaderValue::from_static("OxSpeak/1.0");
if headers.get("X-Client-Type") != Some(&expected_client) ||
headers.get("X-Protocol-Version") != Some(&expected_version) ||
headers.get("User-Agent") != Some(&expected_user_agent) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body("Invalid client".into())
.unwrap());
}
Ok(next.run(request).await)
}

1
src/network/http/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod server;

View File

@@ -0,0 +1,32 @@
use std::io;
use std::net::SocketAddr;
use axum::{Router};
use tokio::net::TcpListener;
use crate::utils::logger::ContextLogger;
pub struct HttpServer {
bind_addr: SocketAddr,
logger: ContextLogger,
}
impl HttpServer {
pub fn new(bind_addr: &str) -> io::Result<Self> {
let bind_addr = bind_addr.parse::<SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let logger = ContextLogger::new("HTTP");
logger.info(&format!("Creating HTTP server on {}", bind_addr));
Ok(Self { bind_addr, logger })
}
pub async fn run(self, router: Router) -> io::Result<()> {
self.logger.info(&format!("Starting HTTP server on {}", self.bind_addr));
let listener = TcpListener::bind(self.bind_addr).await?;
axum::serve(listener, router)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
}

View File

@@ -1,8 +0,0 @@
use axum::Router;
use axum::routing::post;
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
// .route("/auth/", post(join_master_server))
}

View File

@@ -1,46 +0,0 @@
use std::collections::{HashMap};
use std::hash::Hash;
use axum::{
extract::{Path, State},
response::Json,
routing::{get, post, put, delete},
Router,
};
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
.route("/auth/", post(join))
}
pub async fn challenge(State(state): State<HttpState>) -> Json<HashMap<String, String>> {
// permet de sécuriser les échanges entre le client et le serveur pour qu'elle soit propre à l'application
// l'idée est d'éviter que les système comme les navigateurs, curl ... puisse accéder à l'application
todo!("challenge master server")
}
pub async fn join(State(state): State<HttpState>) -> Json<HashMap<String, String>> {
// Cette page est systématiquement appelé à la première connexion du client
// dans le payload de la requête, on aura les info client
// public key
// dans la réponse, on aura le status
// success, error, password_needed
// match determine_server_state().await {
// ServerState::FirstSetup => {
// // Logique de création du super admin
// // Validation des champs requis (password, username)
// }
// ServerState::RequiresPassword => {
// // Logique d'authentification
// // Validation du mot de passe serveur
// }
// ServerState::Ready => {
// // Logique de connexion normale
// // Juste la public key suffit
// }
// }
todo!("join master server")
}

View File

@@ -1,8 +0,0 @@
use axum::Router;
use axum::routing::post;
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
// .route("/auth/", post(join_master_server))
}

View File

@@ -1,6 +0,0 @@
pub mod user;
pub mod channel;
pub mod message;
pub mod websocket;
pub mod master;
mod sub_server;

View File

@@ -1,9 +0,0 @@
use axum::Router;
use axum::routing::post;
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
// .route("/auth/", post(join_master_server))
}

View File

@@ -1,8 +0,0 @@
use axum::Router;
use axum::routing::post;
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
// .route("/auth/", post(join_master_server))
}

View File

@@ -1,8 +0,0 @@
use axum::Router;
use axum::routing::post;
use crate::network::http::HttpState;
pub fn create_router() -> Router<HttpState> {
Router::new()
// .route("/auth/", post(join_master_server))
}

View File

@@ -1,4 +1,2 @@
pub mod protocol;
pub mod udp;
pub mod http;
pub mod http_routes;

View File

@@ -1,246 +0,0 @@
use std::collections::HashSet;
use bytes::{Bytes, BytesMut, Buf, BufMut};
use std::net::SocketAddr;
use uuid::Uuid;
use strum::{EnumIter, FromRepr};
#[repr(u8)]
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, EnumIter, FromRepr)]
pub enum UDPMessageType {
Ping = 0,
Audio = 1,
// Futurs types ici...
}
#[derive(Debug, Clone, PartialEq)]
pub enum UDPMessageData {
// Client messages - Zero-copy avec Bytes
ClientPing { message_id: Uuid },
ClientAudio { sequence: u16, data: Bytes },
// Server messages
ServerPing { message_id: Uuid },
ServerAudio { user: Uuid, sequence: u16, data: Bytes },
}
#[derive(Debug, Clone, PartialEq)]
pub struct UDPMessage {
pub data: UDPMessageData,
pub address: SocketAddr,
pub size: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub struct UdpBroadcastMessage {
pub data: UDPMessageData,
pub addresses: HashSet<SocketAddr>, // ou Vec<SocketAddr> selon besoins
pub size: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ParseError {
EmptyData,
InvalidData,
InvalidMessageType,
InvalidUuid,
}
impl From<uuid::Error> for ParseError {
fn from(_: uuid::Error) -> Self {
ParseError::InvalidUuid
}
}
impl UDPMessageData {
// Parsing zero-copy depuis Bytes
pub fn from_client_bytes(mut data: Bytes) -> Result<Self, ParseError> {
if data.is_empty() {
return Err(ParseError::EmptyData);
}
let msg_type = data.get_u8(); // Consomme 1 byte
match msg_type {
0 => { // Ping
if data.remaining() < 16 {
return Err(ParseError::InvalidData);
}
let uuid_bytes = data.split_to(16); // Zero-copy split
let message_id = Uuid::from_slice(&uuid_bytes)?;
Ok(Self::ClientPing { message_id })
}
1 => { // Audio
if data.remaining() < 2 {
return Err(ParseError::InvalidData);
}
let sequence = data.get_u16(); // Big-endian par défaut
let audio_data = data; // Le reste pour l'audio
Ok(Self::ClientAudio { sequence, data: audio_data })
}
_ => Err(ParseError::InvalidMessageType),
}
}
// Constructeurs server
pub fn server_ping(message_id: Uuid) -> Self {
Self::ServerPing { message_id }
}
pub fn server_audio(user: Uuid, sequence: u16, data: Bytes) -> Self {
Self::ServerAudio { user, sequence, data }
}
// Sérialisation optimisée avec BytesMut
pub fn to_bytes(&self) -> Bytes {
match self {
Self::ServerPing { message_id } => {
let mut buf = BytesMut::with_capacity(17);
buf.put_u8(0); // Message type
buf.put_slice(message_id.as_bytes());
buf.freeze()
}
Self::ServerAudio { user, sequence, data } => {
let mut buf = BytesMut::with_capacity(19 + data.len());
buf.put_u8(1); // Message type
buf.put_slice(user.as_bytes());
buf.put_u16(*sequence);
buf.put_slice(data);
buf.freeze()
}
_ => panic!("Client messages cannot be serialized"),
}
}
pub fn to_vec(&self) -> Vec<u8> {
// pas très optimisé
self.to_bytes().to_vec()
}
pub fn message_type(&self) -> UDPMessageType {
match self {
Self::ClientPing { .. } | Self::ServerPing { .. } => UDPMessageType::Ping,
Self::ClientAudio { .. } | Self::ServerAudio { .. } => UDPMessageType::Audio,
}
}
// Calcule la taille du message sérialisé
pub fn size(&self) -> usize {
match self {
Self::ClientPing { .. } | Self::ServerPing { .. } => 17, // 1 + 16 (UUID)
Self::ClientAudio { data, .. } => 3 + data.len(), // 1 + 2 + audio_data
Self::ServerAudio { data, .. } => 19 + data.len(), // 1 + 16 + 2 + audio_data
}
}
}
impl UDPMessage {
// Parsing depuis slice -> Bytes (zero-copy si possible)
pub fn from_client_bytes(address: SocketAddr, data: &[u8]) -> Result<Self, ParseError> {
let original_size = data.len();
let bytes = Bytes::copy_from_slice(data); // Seule allocation
let data = UDPMessageData::from_client_bytes(bytes)?;
Ok(Self {
data,
address,
size: original_size
})
}
// Constructeurs server
pub fn server_ping(address: SocketAddr, message_id: Uuid) -> Self {
let data = UDPMessageData::server_ping(message_id);
let size = data.size();
Self { data, address, size }
}
pub fn server_audio(address: SocketAddr, user: Uuid, sequence: u16, data: Bytes) -> Self {
let msg_data = UDPMessageData::server_audio(user, sequence, data);
let size = msg_data.size();
Self { data: msg_data, address, size }
}
// Helpers
pub fn to_bytes(&self) -> Bytes {
self.data.to_bytes()
}
pub fn to_vec(&self) -> Vec<u8> {
self.data.to_vec()
}
pub fn message_type(&self) -> UDPMessageType {
self.data.message_type()
}
// Getters
pub fn size(&self) -> usize {
self.size
}
pub fn address(&self) -> SocketAddr {
self.address
}
}
impl UDPMessage {
// Helpers pour récupérer certain éléments des messages
pub fn get_message_id(&self) -> Option<Uuid> {
match &self.data {
UDPMessageData::ClientPing { message_id } => Some(*message_id),
UDPMessageData::ServerPing { message_id } => Some(*message_id),
_ => None,
}
}
}
// Helper pour compatibilité avec UDPMessageType
impl UDPMessageType {
pub fn from_message(data: &[u8]) -> Option<Self> {
if data.is_empty() {
return None;
}
Self::from_repr(data[0])
}
}
impl UdpBroadcastMessage {
// Constructeurs server
pub fn server_ping(addresses: HashSet<SocketAddr>, message_id: Uuid) -> Self {
let data = UDPMessageData::server_ping(message_id);
let size = data.size();
Self { data, addresses, size }
}
pub fn server_audio(addresses: HashSet<SocketAddr>, user: Uuid, sequence: u16, data: Bytes) -> Self {
let msg_data = UDPMessageData::server_audio(user, sequence, data);
let size = msg_data.size();
Self { data: msg_data, addresses, size }
}
// Conversion vers messages individuels (pour compatibilité)
pub fn to_individual_messages(&self) -> Vec<UDPMessage> {
self.addresses.iter().map(|&addr| {
UDPMessage {
data: self.data.clone(),
address: addr,
size: self.size,
}
}).collect()
}
// Helpers
pub fn to_bytes(&self) -> Bytes {
self.data.to_bytes()
}
pub fn addresses(&self) -> &HashSet<SocketAddr> {
&self.addresses
}
pub fn size(&self) -> usize {
self.size
}
}

View File

@@ -1,103 +0,0 @@
use tokio::net::UdpSocket;
use std::error::Error;
use std::sync::Arc;
use tokio::task::AbortHandle;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UDPMessage, UdpBroadcastMessage};
#[derive(Clone)]
pub struct UdpServer {
event_bus: EventBus,
socket: Arc<UdpSocket>,
abort_handle: Option<AbortHandle>,
}
impl UdpServer {
pub async fn new(event_bus: EventBus, addr: &str) -> Self {
let socket = UdpSocket::bind(addr).await.unwrap();
let addr = socket.local_addr().unwrap();
println!("Socket UDP lié avec succès on {}", addr);
Self {
event_bus,
socket: Arc::new(socket),
abort_handle: None
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
println!("Démarrage du serveur UDP...");
let event_bus = self.event_bus.clone();
let socket = self.socket.clone();
let recv_task = tokio::spawn(async move {
// Buffer réutilisable pour éviter les allocations
let mut buf = vec![0u8; 1500];
loop {
match socket.recv_from(&mut buf).await {
Ok((size, address)) => {
// Slice du buffer pour éviter de copier des données inutiles
if let Ok(message) = UDPMessage::from_client_bytes(address, &buf[..size]) {
event_bus.emit(Event::UdpIn(message)).await;
}
// Sinon, on ignore silencieusement le message malformé
}
Err(e) => {
match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted => {
// Silencieux pour les déconnexions normales
continue;
}
_ => {
println!("Erreur UDP: {}", e);
continue;
}
}
}
}
}
});
self.abort_handle = Some(recv_task.abort_handle());
Ok(())
}
pub async fn send_udp_message(&self, message: &UDPMessage) -> bool {
match self.socket.send_to(&message.to_bytes(), message.address()).await {
Ok(_size) => {
self.event_bus.emit(Event::UdpOut(message.clone())).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", message.address(), e);
false
}
}
}
pub async fn broadcast_udp_message(&self, message: &UdpBroadcastMessage) -> bool {
let bytes = message.to_bytes();
for &address in message.addresses() {
match self.socket.send_to(&bytes, address).await {
Ok(_) => {
// Emit individual event pour tracking
let individual_msg = UDPMessage {
data: message.data.clone(),
address,
size: message.size,
};
self.event_bus.emit(Event::UdpOut(individual_msg)).await;
}
Err(e) => {
println!("Erreur broadcast vers {}: {}", address, e);
}
}
}
true
}
}

1
src/network/udp/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod server;

160
src/network/udp/server.rs Normal file
View File

@@ -0,0 +1,160 @@
/*
UDP is used for communication between the client and the server.
The idea is to use SO_REUSEPORT and route packets to the clients of the corresponding channel.
It is therefore important to keep a routing table between channels and sockets, so the server knows whom to re-route each packet to.
The previous version used an mpsc channel, but that defeats the purpose of SO_REUSEPORT.
With this approach, packets are not handed from thread to thread, which avoids potential copies.
*/
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::runtime::Handle;
use tokio::task;
use crate::utils::shared_store::{SharedArcMap, SharedArcVec};
use crate::utils::toolbox::number_of_cpus;
use crate::utils::logger::ContextLogger;
use crate::{log_info, log_warn, log_error, log_debug};
pub struct UdpServer {
// table de routage channel -> socket
table_router: SharedArcMap<String, SharedArcVec<SocketAddr>>,
// binds
bind_addr: SocketAddr,
workers: usize,
buf_size: usize,
// logger
logger: ContextLogger,
}
impl UdpServer {
pub fn new(bind_addr: &str) -> io::Result<Self> {
// convert bind_addr
let bind_addr = match bind_addr.parse::<SocketAddr>() {
Ok(addr) => addr,
Err(e) => {
return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid bind address '{}': {}", bind_addr, e)))
},
};
let logger = ContextLogger::new("UDP");
logger.info(&format!("Creating UDP server on {}", bind_addr));
Ok(Self {
table_router: SharedArcMap::new(),
bind_addr,
workers: number_of_cpus(),
buf_size: 1500,
logger,
})
}
pub async fn run(self) -> io::Result<()> {
// Make sure we are on a Tokio runtime (useful if called outside #[tokio::main])
let _ = Handle::try_current().map_err(|e| {
self.logger.error(&format!("Runtime Tokio not available: {}", e));
io::Error::new(io::ErrorKind::Other, e)
})?;
#[cfg(unix)]
{
self.run_unix().await
}
#[cfg(windows)]
{
self.run_windows().await
}
#[cfg(not(any(unix, windows)))]
{
log_error!(self.logger, "OS not supported");
Err(io::Error::new(io::ErrorKind::Other, "OS non supporté"))
}
}
// -------------------------
// Linux/macOS implementation (SO_REUSEPORT)
// -------------------------
#[cfg(unix)]
async fn run_unix(self) -> io::Result<()> {
use socket2::{Domain, Protocol, Socket, Type};
self.logger.info(&format!("Starting UDP server on {} with {} workers - Unix mode", self.bind_addr, self.workers));
let mut handles = Vec::with_capacity(self.workers);
for id in 0..self.workers {
let logger = self.logger.with_sub_context(&format!("WORKER_{}", id));
let bind_addr = self.bind_addr;
// Create a UDP socket with SO_REUSEPORT
let domain = match bind_addr {
SocketAddr::V4(_) => Domain::IPV4,
SocketAddr::V6(_) => Domain::IPV6,
};
let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
sock.set_reuse_address(true)?;
sock.set_reuse_port(true)?;
sock.bind(&bind_addr.into())?;
let std_sock = std::net::UdpSocket::from(sock);
std_sock.set_nonblocking(true)?;
let udp = UdpSocket::from_std(std_sock)?;
let buf_size = self.buf_size;
let h = task::spawn(async move {
if let Err(e) = Self::worker_loop(id, Arc::new(udp), buf_size, logger).await {
eprintln!("[worker {id}] error: {e}");
}
});
handles.push(h);
}
for h in handles {
let _ = h.await;
}
Ok(())
}
// -------------------------
// Windows implementation (one shared socket, N concurrent tasks)
// -------------------------
#[cfg(windows)]
async fn run_windows(self) -> io::Result<()> {
self.logger.info(&format!("Starting UDP server on {} with {} workers - Win mode", self.bind_addr, self.workers));
let udp = UdpSocket::bind(self.bind_addr).await?;
let udp = Arc::new(udp);
let mut handles = Vec::with_capacity(self.workers);
for id in 0..self.workers {
let logger = self.logger.with_sub_context(&format!("WORKER_{}", id));
let sock = udp.clone();
let buf_size = self.buf_size;
let h = task::spawn(async move {
if let Err(e) = Self::worker_loop(id, sock, buf_size, logger).await {
eprintln!("[worker {id}] error: {e}");
}
});
handles.push(h);
}
for h in handles {
let _ = h.await;
}
Ok(())
}
// The socket is behind an Arc so the same loop serves both the per-worker SO_REUSEPORT sockets (Unix) and the single socket shared by all workers (Windows).
pub async fn worker_loop(id: usize, socket: Arc<UdpSocket>, buf_size: usize, logger: ContextLogger) -> io::Result<()> {
let mut buf = vec![0u8; buf_size];
logger.info("listening");
loop {
let (n, peer) = socket.recv_from(&mut buf).await?;
// Processing: here, a simple echo
// Replace with your own logic (channel -> client routing, etc.); see the routing sketch after this file
eprintln!("[worker {id}] received {n} bytes from {peer}");
socket.send_to(&buf[..n], &peer).await?;
}
}
}
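The worker loop above is only an echo placeholder; the header comment describes the intended design of routing each incoming packet to the other clients of the sender's channel, directly from the worker that received it. The sketch below (not part of the diff) illustrates that fan-out idea with plain std and tokio types instead of the project's SharedArcMap/SharedArcVec wrappers, whose exact API is not shown here; RoutingTable and fan_out are illustrative names, not existing items.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::RwLock;

// Hypothetical routing table: channel name -> addresses of the clients currently in that channel.
type RoutingTable = Arc<RwLock<HashMap<String, Vec<SocketAddr>>>>;

// Fan a datagram out to every member of `channel` except the original sender,
// directly from the worker that received it (no hop through an mpsc channel).
async fn fan_out(
    socket: &UdpSocket,
    table: &RoutingTable,
    channel: &str,
    sender: SocketAddr,
    payload: &[u8],
) -> std::io::Result<()> {
    let members = table.read().await; // read lock: workers do not block each other while routing
    if let Some(addrs) = members.get(channel) {
        for &addr in addrs.iter().filter(|&&a| a != sender) {
            socket.send_to(payload, addr).await?;
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Minimal demo: one socket and one channel with two made-up members.
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    let table: RoutingTable = Arc::new(RwLock::new(HashMap::new()));
    table.write().await.insert(
        "general".to_string(),
        vec!["127.0.0.1:40001".parse().unwrap(), "127.0.0.1:40002".parse().unwrap()],
    );
    fan_out(&socket, &table, "general", "127.0.0.1:40001".parse().unwrap(), b"hello").await
}

In the real server, the table_router field would presumably be filled from the control path whenever a client joins or leaves a voice channel.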

View File

@@ -1,193 +0,0 @@
use tokio::net::UdpSocket;
use tokio::sync::RwLock;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use tokio::task::AbortHandle;
use tokio::time::{sleep, Instant};
use crate::domain::client::Client;
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UdpClientMessage, UdpServerMessage};
#[derive(Clone)]
pub struct UdpServer {
event_bus: EventBus,
socket: Arc<UdpSocket>,
abort_handle: Option<AbortHandle>,
clients: Arc<DashMap<SocketAddr, Client>>,
}
impl UdpServer {
pub async fn new(event_bus: EventBus, addr: &str) -> Self {
let socket = UdpSocket::bind(addr).await.unwrap();
let addr = socket.local_addr().unwrap();
println!("Socket UDP lié avec succès on {}", addr);
Self {
event_bus,
socket: Arc::new(socket),
abort_handle: None,
clients: Arc::new(DashMap::new()),
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn Error>> {
println!("Démarrage du serveur UDP...");
let event_bus = self.event_bus.clone();
let socket = self.socket.clone();
let clients = self.clients.clone();
let recv_task = tokio::spawn(async move {
let mut buf = [0u8; 1500];
loop {
match socket.recv_from(&mut buf).await {
Ok((size, address)) => {
// Add the client to the list
// TODO: really not an ideal solution, it should be rethought with a hello/bye handshake
if !clients.contains_key(&address) {
let client = Client::new(address);
clients.insert(address, client);
println!("Nouveau client connecté: {}", address);
}else {
let mut client = clients.get_mut(&address).unwrap();
client.update_last_seen();
}
if let Ok(message) = UdpClientMessage::from_bytes(&buf[..size]) {
let event = Event::UdpIn { address, size, message };
event_bus.emit(event).await;
} else {
println!("Erreur lors du parsing du message de {}: {:?}", address, &buf[..size]);
}
}
Err(e) => {
match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted => {
// Stay silent on normal disconnections
continue;
}
_ => {
println!("Erreur UDP: {}", e);
continue;
}
}
}
}
}
});
self.abort_handle = Some(recv_task.abort_handle());
Ok(())
}
pub async fn send(&self, address: SocketAddr, message: UdpServerMessage) -> bool {
let event_bus = self.event_bus.clone();
match self.socket.send_to(&message.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
self.remove_client(address).await;
false
}
}
}
pub async fn group_send(&self, addr_list: Vec<SocketAddr>, message: UdpServerMessage) -> bool {
if addr_list.is_empty() {
return true;
}
let socket = self.socket.clone();
let clients = self.clients.clone();
let send_tasks: Vec<_> = addr_list.into_iter().map(|address| {
let event_bus = self.event_bus.clone();
let message_clone = message.clone();
let socket_clone = socket.clone();
let clients_clone = clients.clone();
tokio::spawn(async move {
match socket_clone.send_to(&message_clone.to_byte(), address).await {
Ok(size) => {
event_bus.emit(Event::UdpOut { address, size, message: message_clone }).await;
true
}
Err(e) => {
println!("Erreur lors de l'envoi du message à {}: {}", address, e);
// Optionnel : retirer le client si l'adresse est invalide
if clients_clone.contains_key(&address) {
clients_clone.remove(&address);
println!("Client {} retiré de la liste", address);
}
false
}
}
})
}).collect();
let mut all_success = true;
for task in send_tasks {
match task.await {
Ok(success) => {
if !success {
all_success = false;
}
}
Err(_) => {
all_success = false;
}
}
}
all_success
}
pub async fn all_send(&self, message: UdpServerMessage) -> bool {
let client_addresses = self.get_clients().await;
self.group_send(client_addresses, message).await
}
pub async fn get_clients(&self) -> Vec<SocketAddr> {
self.clients.iter()
.map(|entry| *entry.key())
.collect()
}
// New method to clean up disconnected clients
async fn remove_client(&self, address: SocketAddr) {
if self.clients.contains_key(&address){
self.clients.remove(&address);
println!("Client {} retiré de la liste", address);
}else {
println!("Client {} n'est pas dans la liste", address);
}
}
// Method to clean up inactive clients
pub async fn cleanup_inactive_clients(&self) {
let timeout = Duration::from_secs(10);
let now = Instant::now();
let mut to_remove = Vec::new();
for entry in self.clients.iter() {
let address = *entry.key();
let client = entry.value();
if now.duration_since(client.last_seen()) > timeout {
to_remove.push(address);
}
}
for address in &to_remove {
println!("Suppression du client {}", address);
self.clients.remove(address);
}
}
}

View File

@@ -1,114 +0,0 @@
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::AbortHandle;
use crate::domain::client::{Client, ClientManager};
use crate::domain::event::{Event, EventBus};
use crate::network::protocol::{UDPMessageType, UDPMessage, UdpBroadcastMessage, UDPMessageData};
use crate::network::udp::UdpServer;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct Dispatcher {
event_bus: Arc<EventBus>,
udp_server: Arc<UdpServer>,
client_manager: Arc<ClientManager>,
store: StoreService,
}
impl Dispatcher {
pub async fn new(event_bus: EventBus, udp_server: UdpServer, client_manager: ClientManager, store: StoreService) -> Self {
Self {
event_bus: Arc::new(event_bus),
udp_server: Arc::new(udp_server),
client_manager: Arc::new(client_manager),
store,
}
}
pub async fn start(&self, receiver: kanal::AsyncReceiver<Event>) {
println!("Dispatcher démarré sur le thread : {:?}", std::thread::current().id());
let (_udp_in_abort_handle, udp_in_sender) = self.udp_in_handler().await;
while let Ok(event) = receiver.recv().await {
match event {
Event::UdpIn(message) => {
let _ = udp_in_sender.send(message).await;
// // println!("Message reçu de {}: {:?}", address, message);
// let udp_server = self.udp_server.clone();
// tokio::spawn(async move {
// match message {
// UdpClientMessage::Ping {message_id} => {
// let send = UdpServerMessage::ping(message_id);
// let _ = udp_server.all_send(send);
// }
// UdpClientMessage::Audio {sequence, data} => {
// let tmp_user_id = Uuid::new_v4();
// let send = UdpServerMessage::audio(tmp_user_id, sequence, data);
// let _ = udp_server.all_send(send).await;
// }
// }
// });
}
Event::UdpOut(message) => {
// println!("Message envoyé à {}: {:?}", address, message);
}
Event::TickSeconds => {
self.client_manager.cleanup(Duration::from_secs(10));
}
_ => {
println!("Event non prit en charge : {:?}", event)
}
}
}
}
pub async fn udp_in_handler(&self) -> (AbortHandle, mpsc::Sender<UDPMessage>) {
let (sender, mut consumer) = mpsc::channel::<UDPMessage>(1024);
let udp_server = self.udp_server.clone();
let client_manager = self.client_manager.clone();
let task = tokio::spawn(async move {
while let Some(message) = consumer.recv().await {
// Process the message directly, without double parsing
match message.message_type() {
UDPMessageType::Ping => {
println!("ping from {:?}", message);
let response_message = UDPMessage::server_ping(message.address, message.get_message_id().unwrap());
if client_manager.client_exists(message.address) {
client_manager.update_client_last_seen(message.address);
}else {
client_manager.add(Client::new(message.address));
println!("new client: {:?}", message.address);
}
let _ = udp_server.send_udp_message(&response_message).await;
}
UDPMessageType::Audio => {
if let UDPMessageData::ClientAudio { sequence, data } = &message.data {
let addresses = client_manager.get_all_addresses();
if let Some(speaker_uuid) = client_manager.get_uuid_by_address(message.address) {
let response_message = UdpBroadcastMessage::server_audio(addresses, speaker_uuid, *sequence, data.clone());
let _ = udp_server.broadcast_udp_message(&response_message).await;
} else {
// The case where the UUID is not found can be handled here (optional)
// println!("UUID not found for address: {:?}", message.address);
}
}
}
}
}
});
(task.abort_handle(), sender)
}
}

View File

@@ -1 +0,0 @@
pub mod dispatcher;

View File

@@ -1,70 +0,0 @@
-- Add migration script here
-- Create the user table
CREATE TABLE user
(
id TEXT PRIMARY KEY NOT NULL,
username TEXT NOT NULL UNIQUE,
email TEXT,
avatar_url TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Create the sub_server table
CREATE TABLE sub_server
(
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
description TEXT,
owner_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (owner_id) REFERENCES user (id)
);
-- Create the channel table
CREATE TABLE channel
(
id TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
description TEXT,
channel_type TEXT NOT NULL,
sub_server_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (sub_server_id) REFERENCES sub_server (id)
);
-- Create the message table
CREATE TABLE message
(
id TEXT PRIMARY KEY NOT NULL,
content TEXT NOT NULL,
author_id TEXT NOT NULL,
channel_id TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES user (id),
FOREIGN KEY (channel_id) REFERENCES channel (id)
);
-- Join table for users and sub_servers (N-to-N relation)
CREATE TABLE sub_server_user
(
id TEXT PRIMARY KEY NOT NULL,
sub_server_id TEXT NOT NULL,
user_id TEXT NOT NULL,
joined_at TEXT NOT NULL,
role TEXT,
FOREIGN KEY (sub_server_id) REFERENCES sub_server (id),
FOREIGN KEY (user_id) REFERENCES user (id),
UNIQUE (sub_server_id, user_id)
);
-- Indexes to improve performance
CREATE INDEX idx_channel_sub_server ON channel (sub_server_id);
CREATE INDEX idx_message_channel ON message (channel_id);
CREATE INDEX idx_message_author ON message (author_id);
CREATE INDEX idx_sub_server_user_sub_server ON sub_server_user (sub_server_id);
CREATE INDEX idx_sub_server_user_user ON sub_server_user (user_id);

View File

@@ -1,4 +0,0 @@
pub mod models;
pub mod repositories;
pub mod store_service;
mod session;

View File

@@ -1,47 +0,0 @@
use std::hash::{Hash, Hasher};
// use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
// use uuid::Uuid;
use sqlx::types::{Uuid, chrono::{{ DateTime, Utc}}};
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct Channel {
pub id: Uuid,
pub sub_server_id: Uuid,
pub channel_type: ChannelType,
pub name: String,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, sqlx::Type, Serialize, Deserialize)]
#[repr(i32)]
pub enum ChannelType {
Text = 0,
Voice = 1,
}
impl Channel {
pub fn new(sub_server_id: Uuid, channel_type: ChannelType, name: String) -> Self {
Self {
id: Uuid::new_v4(),
sub_server_id,
channel_type,
name,
created_at: Utc::now(),
}
}
}
impl PartialEq for Channel {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Channel {}
impl Hash for Channel {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}

View File

@@ -1,32 +0,0 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct LinkSubServerUser {
pub id: Option<u64>,
pub sub_server_id: Uuid,
pub user_id: Uuid,
pub display_username: String,
pub joined_at: DateTime<Utc>,
pub last_seen_at: Option<DateTime<Utc>>,
pub is_admin: bool,
}
impl LinkSubServerUser {
pub fn new(
sub_server_id: Uuid,
user_id: Uuid,
display_username: String
) -> Self {
Self {
id: None,
sub_server_id,
user_id,
display_username,
joined_at: Utc::now(),
last_seen_at: None,
is_admin: false,
}
}
}

View File

@@ -1,24 +0,0 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct Message {
pub id: Uuid,
pub channel_id: Uuid,
pub user_id: Uuid,
pub content: String,
pub created_at: DateTime<Utc>,
}
impl Message {
pub fn new(channel_id: Uuid, user_id: Uuid, content: String) -> Self {
Self {
id: Uuid::new_v4(),
channel_id,
user_id,
content,
created_at: Utc::now(),
}
}
}

View File

@@ -1,11 +0,0 @@
pub mod sub_server;
pub mod channel;
pub mod user;
pub mod message;
pub mod link_sub_server_user;
pub use sub_server::*;
pub use channel::*;
pub use user::*;
pub use message::*;
pub use link_sub_server_user::*;

View File

@@ -1,40 +0,0 @@
// The application can have several sub-servers
use std::hash::{Hash, Hasher};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::store::models::Channel;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct SubServer {
pub id: Uuid,
pub name: String,
pub password: String, // consider hashing it, but that will most likely not be necessary
pub created_at: DateTime<Utc>,
}
impl SubServer {
pub fn new(name: String, password: String) -> Self {
Self {
id: Uuid::new_v4(),
name,
password,
created_at: Utc::now(),
}
}
}
impl PartialEq for SubServer {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for SubServer {}
impl Hash for SubServer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}

View File

@@ -1,13 +0,0 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct User {
pub id: Uuid,
pub username: String,
pub pub_key: String, // identification will use a public/private key pair, like TeamSpeak
pub created_at: DateTime<Utc>,
}

View File

@@ -1,97 +0,0 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::channel::Channel;
use crate::store::models::Message;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct ChannelRepository {
store: StoreService,
}
impl ChannelRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl ChannelRepository {
// getters
pub async fn all(&self) -> Vec<Arc<Channel>> {
self.store.channels.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<Channel>> {
self.store.channels.get(&id)
}
}
// pub id: Uuid,
// pub sub_server_id: Uuid,
// pub channel_type: ChannelType,
// pub name: String,
// pub created_at: DateTime<Utc>,
impl ChannelRepository {
// writers
pub async fn create(&self, mut channel: Channel) -> Result<Arc<Channel>, sqlx::Error> {
sqlx::query(
"INSERT INTO channel (id, sub_server, channel_type, name, created_at) VALUES (?, ?, ?, ?, ?)"
)
.bind(&channel.id)
.bind(&channel.sub_server_id)
.bind(&channel.channel_type)
.bind(&channel.name)
.bind(&channel.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_server = Arc::new(channel.clone());
self.store.channels.insert_arc(channel.id, arc_server.clone());
Ok(arc_server)
}
pub async fn save(&self, sub_server: &Channel) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE channel SET name = ? WHERE id = ?"
)
.bind(&sub_server.name)
.bind(&sub_server.id)
.execute(&self.store.db)
.await?;
// Update the cache
self.store.channels.insert(sub_server.id, sub_server.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM channel WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.store.channels.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
}
impl ChannelRepository {
// getters (db)
pub async fn db_all(&self) -> Result<Vec<Channel>, sqlx::Error> {
sqlx::query_as("SELECT * FROM channel")
.fetch_all(&self.store.db)
.await
}
}

View File

@@ -1,75 +0,0 @@
use std::sync::Arc;
use uuid::Uuid;
use chrono::{DateTime, Utc};
use sqlx::SqlitePool;
use crate::store::models::link_sub_server_user::LinkSubServerUser;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcVec;
#[derive(Clone)]
pub struct LinkSubServerUserRepository {
store: StoreService
}
impl LinkSubServerUserRepository {
pub fn new(store: StoreService) -> Self {
Self { store }
}
}
impl LinkSubServerUserRepository {
// getters
/// Get a specific relation
pub async fn get_relation(&self, sub_server_id: Uuid, user_id: Uuid) -> Option<LinkSubServerUser> {
self.store.sub_server_users.iter()
.find(|relation| {
relation.sub_server_id == sub_server_id && relation.user_id == user_id
})
.map(|arc| (*arc).clone())
}
pub async fn exists(&self, sub_server_id: Uuid, user_id: Uuid) -> bool {
self.get_relation(sub_server_id, user_id).await.is_some()
}
}
impl LinkSubServerUserRepository {
// writers
/// Create a new relation
pub async fn create(&self, through: LinkSubServerUser) -> Result<LinkSubServerUser, sqlx::Error> {
// Check that the relation does not already exist
if self.exists(through.sub_server_id, through.user_id).await {
return Err(sqlx::Error::RowNotFound); // or a custom error
}
// Insert into the database
sqlx::query(
"INSERT INTO sub_server_users
(sub_server_id, user_id, display_name, joined_at, last_seen, is_admin)
VALUES (?, ?, ?, ?, ?, ?)"
)
.bind(&through.sub_server_id)
.bind(&through.user_id)
.bind(&through.display_username)
.bind(&through.joined_at)
.bind(&through.last_seen_at)
.bind(&through.is_admin)
.execute(&self.store.db)
.await?;
// Add to the cache
self.store.sub_server_users.push(through.clone());
Ok(through)
}
}
impl LinkSubServerUserRepository {
// getters (db)
pub async fn db_all(&self) -> Result<Vec<LinkSubServerUser>, sqlx::Error> {
sqlx::query_as("SELECT * FROM sub_server_users")
.fetch_all(&self.store.db)
.await
}
}

View File

@@ -1,77 +0,0 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::message::Message;
use crate::store::store_service::StoreService;
#[derive(Clone)]
pub struct MessageRepository {
store: StoreService
}
impl MessageRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl MessageRepository {
// getters (caches)
}
impl MessageRepository {
// writers
pub async fn create(&self, mut message: Message) -> Result<Arc<Message>, sqlx::Error> {
sqlx::query(
"INSERT INTO message (id, channel_id, user_id, content, created_at) VALUES (?, ?, ?, ?, ?)"
)
.bind(&message.id)
.bind(&message.channel_id)
.bind(&message.user_id)
.bind(&message.content)
.bind(&message.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_message = Arc::new(message.clone());
Ok(arc_message)
}
pub async fn save(&self, message: &Message) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE message SET content = ? WHERE id = ?"
)
.bind(&message.content)
.bind(&message.id)
.execute(&self.store.db)
.await?;
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM message WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
Ok(true)
} else {
Ok(false)
}
}
}
impl MessageRepository {
// getters (db)
pub async fn db_all(&self) -> Result<Vec<Message>, sqlx::Error> {
sqlx::query_as("SELECT * FROM message")
.fetch_all(&self.store.db)
.await
}
}

View File

@@ -1,11 +0,0 @@
mod sub_server_repository;
mod channel_repository;
mod user_repository;
mod message_repository;
mod link_sub_server_user_repository;
pub use sub_server_repository::*;
pub use channel_repository::*;
pub use user_repository::*;
pub use message_repository::*;
pub use link_sub_server_user_repository::*;

View File

@@ -1,92 +0,0 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::Channel;
use crate::store::models::sub_server::SubServer;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct SubServerRepository {
store: StoreService
}
impl SubServerRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl SubServerRepository {
// getters
pub async fn all(&self) -> Vec<Arc<SubServer>> {
self.store.sub_servers.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<SubServer>> {
self.store.sub_servers.get(&id)
}
}
impl SubServerRepository {
// writers
pub async fn create(&self, mut sub_server: SubServer) -> Result<Arc<SubServer>, sqlx::Error> {
sqlx::query(
"INSERT INTO sub_server (id, name, password, created_at) VALUES (?, ?, ?, ?)"
)
.bind(&sub_server.id)
.bind(&sub_server.name)
.bind(&sub_server.password)
.bind(&sub_server.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_server = Arc::new(sub_server.clone());
self.store.sub_servers.insert_arc(sub_server.id, arc_server.clone());
Ok(arc_server)
}
pub async fn save(&self, sub_server: &SubServer) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE sub_server SET name = ?, password = ? WHERE id = ?"
)
.bind(&sub_server.name)
.bind(&sub_server.password)
.bind(&sub_server.id)
.execute(&self.store.db)
.await?;
// Update the cache
self.store.sub_servers.insert(sub_server.id, sub_server.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM sub_server WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.store.sub_servers.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
}
impl SubServerRepository {
// getters (db)
pub async fn db_all(&self) -> Result<Vec<SubServer>, sqlx::Error> {
sqlx::query_as("SELECT * FROM sub_server")
.fetch_all(&self.store.db)
.await
}
}

View File

@@ -1,91 +0,0 @@
use std::sync::Arc;
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::store::models::user::User;
use crate::store::store_service::StoreService;
use crate::utils::shared_store::SharedArcMap;
#[derive(Clone)]
pub struct UserRepository {
store: StoreService,
}
impl UserRepository {
pub fn new(store: StoreService) -> Self {
Self {
store
}
}
}
impl UserRepository {
// getters
pub async fn all(&self) -> Vec<Arc<User>> {
self.store.users.values().collect()
}
pub async fn get(&self, id: Uuid) -> Option<Arc<User>> {
self.store.users.get(&id)
}
}
impl UserRepository {
// writers
pub async fn create(&self, mut user: User) -> Result<Arc<User>, sqlx::Error> {
sqlx::query(
"INSERT INTO user (id, name, password, created_at) VALUES (?, ?, ?, ?)"
)
.bind(&user.id)
.bind(&user.username)
.bind(&user.pub_key)
.bind(&user.created_at)
.execute(&self.store.db)
.await?;
// add to the cache
let arc_server = Arc::new(user.clone());
self.store.users.insert_arc(user.id, arc_server.clone());
Ok(arc_server)
}
pub async fn save(&self, user: &User) -> Result<(), sqlx::Error> {
sqlx::query(
"UPDATE user SET name = ?, password = ? WHERE id = ?"
)
.bind(&user.username)
.bind(&user.pub_key)
.bind(&user.id)
.execute(&self.store.db)
.await?;
// Update the cache
self.store.users.insert(user.id, user.clone());
Ok(())
}
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let rows_affected = sqlx::query("DELETE FROM user WHERE id = ?")
.bind(&id)
.execute(&self.store.db)
.await?
.rows_affected();
if rows_affected > 0 {
self.store.users.remove(&id);
Ok(true)
} else {
Ok(false)
}
}
}
impl UserRepository {
// getters (db)
pub async fn db_all(&self) -> Result<Vec<User>, sqlx::Error> {
sqlx::query_as("SELECT * FROM user")
.fetch_all(&self.store.db)
.await
}
}

View File

@@ -1,213 +0,0 @@
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Instant;
use axum::extract::ws::WebSocket;
use uuid::Uuid;
use crate::store::models::{Channel, SubServer};
use crate::store::models::user::User;
use crate::utils::shared_store::{SharedArcHashSet, SharedArcMap, SharedArcVec};
/// Represents a client connected to the server (WebSocket + optional UDP)
///
/// Each client has a mandatory WebSocket connection and may have a UDP connection
/// for voice if it joins a voice channel.
#[derive(Debug)]
pub struct Client {
// ===== IMMUTABLE FIELDS =====
pub session_id: Uuid,
pub user: User,
pub websocket: WebSocket,
pub created_at: Instant,
// ===== MUTABLE FIELDS =====
pub udp_socket: Option<SocketAddr>,
pub voice_channel: Option<Channel>,
// ===== INTERNAL REFERENCES =====
/// Weak reference to the manager for auto-cleanup
manager: Option<Weak<ClientManager>>,
}
impl Client {
/// Creates a new client
pub fn new(user: User, websocket: WebSocket) -> Self {
Self {
session_id: Uuid::new_v4(),
user,
websocket,
created_at: Instant::now(),
udp_socket: None,
voice_channel: None,
manager: None,
}
}
// ===== INTERNAL MUTATING METHODS =====
// These methods are pub(crate) - they must only be called through ClientManager::modify_client()
/// Sets the UDP address (internal method)
pub(crate) fn set_udp_socket(&mut self, udp_socket: SocketAddr) {
self.udp_socket = Some(udp_socket);
}
/// Removes the UDP address (internal method)
pub(crate) fn remove_udp_socket(&mut self) {
self.udp_socket = None;
}
/// Sets the reference to the manager (called by the manager)
pub(crate) fn set_manager(&mut self, manager: Weak<ClientManager>) {
self.manager = Some(manager);
}
}
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.session_id == other.session_id
}
}
impl Eq for Client {}
impl Hash for Client {
fn hash<H: Hasher>(&self, state: &mut H) {
self.session_id.hash(state);
}
}
/// Automatic cleanup when the client is dropped
impl Drop for Client {
fn drop(&mut self) {
if let Some(manager) = self.manager.as_ref().and_then(|w| w.upgrade()) {
manager.cleanup_client_index(&self);
}
}
}
// =========================================================================
// CLIENT MANAGER
// =========================================================================
/// Centralized manager of all connected clients
///
/// Keeps the clients and their indexes for efficient lookups.
/// Thread-safe via SharedArcMap.
pub struct ClientManager {
/// Main map of clients keyed by session_id
clients: SharedArcMap<Uuid, Client>,
// ===== INDEXES FOR FAST LOOKUPS =====
/// Index of clients by channel_id
voice_channel_clients: SharedArcMap<Channel, SharedArcHashSet<Client>>, // channel_id -> HashSet<session_id>
/// Index of clients by sub_server_id
sub_server_clients: SharedArcMap<SubServer, SharedArcHashSet<Client>>, // sub_server_id -> Vec<session_id>
/// Index of clients by UDP address
udp_clients: SharedArcMap<SocketAddr, String>, // udp_address -> session_id
}
impl ClientManager {
/// Creates a new client manager
pub fn new() -> Self {
Self {
clients: SharedArcMap::new(),
voice_channel_clients: SharedArcMap::new(),
sub_server_clients: SharedArcMap::new(),
udp_clients: SharedArcMap::new(),
}
}
// ===== CLIENT MANAGEMENT =====
/// Adds a new client to the manager
pub fn add_client(self: &Arc<Self>, mut client: Client) -> Arc<Client> {
let session_id = client.session_id.clone();
// Link the client to the manager for auto-cleanup
client.set_manager(Arc::downgrade(self));
// Store and return
let arc_client = Arc::new(client);
self.clients.insert_arc(session_id, arc_client.clone());
arc_client
}
pub fn join_sub_server(&self, client: &Arc<Client>, sub_server: &SubServer) {
let clients = self.sub_server_clients.get(sub_server).unwrap_or_else(|| {
let new_clients = Arc::new(SharedArcHashSet::new());
self.sub_server_clients.insert_arc(sub_server.clone(), new_clients.clone());
new_clients
});
clients.insert_arc(client.clone());
}
/// Gets a client by its session_id
pub fn get_client(&self, session_id: &Uuid) -> Option<Arc<Client>> {
self.clients.get(session_id)
}
/// Gets all connected clients
pub fn get_all_clients(&self) -> Vec<Arc<Client>> {
self.clients.values().collect()
}
// ===== MODIFICATION OPERATIONS =====
/// Modifies a client through a thread-safe closure
///
/// This method guarantees that the modification is atomic and that
/// the indexes stay consistent with the client's state.
///
/// # Arguments
/// * `session_id` - The session ID of the client to modify
/// * `f` - The closure that will receive a mutable reference to the client
///
/// # Returns
/// `true` if the client was found and modified, `false` otherwise
pub fn modify_client<F>(&self, client: &Client, f: F) -> bool
where
F: FnOnce(&mut Client),
{
// Convertir &str en String pour les clés
todo!()
}
}
impl ClientManager {
// Delete methods
/// Removes all of a client's index entries
pub fn cleanup_client_index(&self, client: &Client) {
if let Some(channel) = client.voice_channel.as_ref() {
self.remove_client_from_voice_channel(channel, client);
}
}
/// Removes a client from the manager
pub fn remove_client(&self, session_id: &Uuid) {
// Convertir &str en String pour les clés
if let Some(client) = self.clients.get(session_id) {
// Clean up all indexes
self.cleanup_client_index(&client);
// Remove the client from the main list
self.clients.remove(&session_id);
}
}
pub fn remove_client_from_voice_channel(&self, voice_channel: &Channel, client: &Client) {
if let Some(channel) = self.voice_channel_clients.get(voice_channel) {
channel.remove(client);
}
}
pub fn remove_client_from_sub_servers(&self, client: &Client) {
for (sub_server, clients) in self.sub_server_clients.iter() {
clients.remove(client);
}
}
}

View File

@@ -1 +0,0 @@
mod client;

View File

@@ -1,125 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
use sqlx::{AnyPool, SqlitePool};
use crate::store::models::{SubServer, Channel, User, Message, ChannelType, LinkSubServerUser};
use crate::store::repositories::{
SubServerRepository, ChannelRepository, UserRepository,
MessageRepository, LinkSubServerUserRepository
};
use crate::utils::shared_store::{SharedArcMap, SharedArcVec};
#[derive(Debug, Clone)]
pub enum StoreEvent {
SubServerCreated(Arc<SubServer>),
SubServerUpdated(Arc<SubServer>),
ChannelCreated(Arc<Channel>),
ChannelUpdated(Arc<Channel>),
MessageSent(Message),
UserJoinedSubServer { sub_server_id: Uuid, user_id: Uuid },
}
#[derive(Clone)]
pub struct StoreService {
// Database
pub db: SqlitePool,
// ✅ Centralized in-memory caches
pub users: SharedArcMap<Uuid, User>,
pub sub_servers: SharedArcMap<Uuid, SubServer>,
pub channels: SharedArcMap<Uuid, Channel>,
pub sub_server_users: SharedArcVec<LinkSubServerUser>,
}
impl StoreService {
pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
let connection_url = Self::normalize_database_url(database_url);
println!("🔍 Tentative de connexion à: {}", connection_url);
let db = SqlitePool::connect(&connection_url).await?;
// sqlx::migrate!("./src/store/migrations").run(&db).await?;
let mut service = Self {
db,
users: SharedArcMap::new(),
sub_servers: SharedArcMap::new(),
channels: SharedArcMap::new(),
sub_server_users: SharedArcVec::new(),
};
// Load everything into memory at startup
let _ = service.load_all_caches().await;
Ok(service)
}
async fn load_all_caches(&self) -> Result<(), sqlx::Error> {
let sub_server_rep = SubServerRepository::new(self.clone());
let user_rep = UserRepository::new(self.clone());
let channel_rep = ChannelRepository::new(self.clone());
let link_sub_server_user_rep = LinkSubServerUserRepository::new(self.clone());
// sub_servers
let sub_servers = sub_server_rep.db_all().await?;
self.sub_servers.insert_batch_with_key(sub_servers, |sub_server| sub_server.id);
// Users
let users = user_rep.db_all().await?;
self.users.insert_batch_with_key(users, |user| user.id);
// Channels
let channels = channel_rep.db_all().await?;
self.channels.insert_batch_with_key(channels, |channel| channel.id);
// N-to-N relations
let relations: Vec<LinkSubServerUser> = link_sub_server_user_rep.db_all().await?;
self.sub_server_users.extend(relations);
Ok(())
}
}
impl StoreService {
// Repository getters
pub fn user_repository(&self) -> UserRepository {
UserRepository::new(self.clone())
}
pub fn sub_server_repository(&self) -> SubServerRepository {
SubServerRepository::new(self.clone())
}
pub fn channel_repository(&self) -> ChannelRepository {
ChannelRepository::new(self.clone())
}
pub fn message_repository(&self) -> MessageRepository {
MessageRepository::new(self.clone())
}
pub fn sub_server_user_repository(&self) -> LinkSubServerUserRepository {
LinkSubServerUserRepository::new(self.clone())
}
}
impl StoreService {
// ===== HELPERS =====
/// ✅ Normalizes the URL to support different databases
fn normalize_database_url(database_url: &str) -> String {
// If it is already a full URL, return it as-is
if database_url.contains("://") {
return database_url.to_string();
}
// Otherwise, assume it is SQLite (current behavior)
if database_url.starts_with("sqlite:") {
database_url.to_string()
} else {
format!("sqlite:{}?mode=rwc", database_url)
}
}
}

116
src/utils/config.rs Normal file
View File

@@ -0,0 +1,116 @@
use std::env;
use std::path::Path;
use serde::Deserialize;
use crate::utils::logger::ContextLogger;
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
#[serde(default = "default_server_host")]
pub server_host: String,
#[serde(default = "default_server_port")]
pub server_port: u16,
#[serde(default = "default_buffer_size")]
pub buffer_size: usize,
#[serde(default = "default_log_level")]
pub log_level: String,
#[serde(default)]
pub debug_mode: bool,
pub workers: Option<usize>,
pub database_url: Option<String>,
}
// Default values
fn default_server_host() -> String { "127.0.0.1".to_string() }
fn default_server_port() -> u16 { 8080 }
fn default_buffer_size() -> usize { 1024 }
fn default_log_level() -> String { "info".to_string() }
#[derive(Debug)]
pub enum ConfigError {
EnvFile(dotenvy::Error),
Parse(envy::Error),
}
impl std::fmt::Display for ConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConfigError::EnvFile(e) => write!(f, "Error loading .env file: {}", e),
ConfigError::Parse(e) => write!(f, "Error parsing configuration: {}", e),
}
}
}
impl std::error::Error for ConfigError {}
impl Config {
pub fn load() -> Result<Config, ConfigError> {
let logger = ContextLogger::new("CONFIG");
// Display current working directory
match env::current_dir() {
Ok(path) => {
logger.info(&format!("Working directory: {}", path.display()));
}
Err(e) => {
logger.error(&format!("Error getting directory: {}", e));
}
}
// Check if .env file exists and load it
let env_path = Path::new(".env");
if env_path.exists() {
logger.info(".env found");
// Load the .env file
match dotenvy::from_path(env_path) {
Ok(_) => logger.info(".env file loaded successfully"),
Err(e) => {
logger.error(&format!("Error loading .env: {}", e));
return Err(ConfigError::EnvFile(e));
}
}
} else {
logger.info(".env file does not exist - using defaults and system env vars");
}
// Load configuration from environment variables
logger.info("Loading configuration from environment variables");
let config = envy::from_env::<Config>().map_err(ConfigError::Parse)?;
logger.info("Configuration loaded successfully");
config.print_summary();
Ok(config)
}
pub fn socket_addr(&self) -> String {
format!("{}:{}", self.server_host, self.server_port)
}
pub fn workers_count(&self) -> usize {
self.workers.unwrap_or_else(|| {
crate::utils::toolbox::number_of_cpus()
})
}
pub fn print_summary(&self) {
let logger = ContextLogger::new("CONFIG");
logger.info(&format!("Server address: {}", self.socket_addr()));
logger.info(&format!("Workers: {}", self.workers_count()));
logger.info(&format!("Buffer size: {} bytes", self.buffer_size));
logger.info(&format!("Log level: {}", self.log_level));
logger.info(&format!("Debug mode: {}", self.debug_mode));
if self.database_url.is_some() {
logger.info("Database URL: [CONFIGURED]");
} else {
logger.info("Database URL: [NOT SET]");
}
}
}
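Config::load() loads an optional .env file and then deserializes the process environment through envy, so variables such as SERVER_HOST, SERVER_PORT, LOG_LEVEL or WORKERS map onto the struct fields, with the serde defaults filling the gaps. A minimal sketch of how this could be wired into a main function, assuming a binary crate that declares the utils modules added in this diff (this wiring is not taken from the repository):

use crate::utils::config::Config;
use crate::utils::logger::{init_logger, ContextLogger};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Values come from the environment or .env; missing ones fall back to the defaults above.
    let config = Config::load()?;
    init_logger(&config.log_level);

    let logger = ContextLogger::new("MAIN");
    logger.info(&format!("Listening address: {}", config.socket_addr()));
    logger.info(&format!("Workers: {}", config.workers_count()));

    // e.g. hand config.socket_addr() to UdpServer::new(...) and run() the server here.
    Ok(())
}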

88
src/utils/logger.rs Normal file
View File

@@ -0,0 +1,88 @@
use log::{info, warn, error, debug, LevelFilter};
use std::io::Write;
#[derive(Clone)]
pub struct ContextLogger {
pub context: String, // public so the exported log_* macros can read it from other modules
}
impl ContextLogger {
pub fn new(context: &str) -> Self {
Self {
context: context.to_string(),
}
}
pub fn with_sub_context(&self, sub_context: &str) -> Self {
Self {
context: format!("{}:{}", self.context, sub_context),
}
}
pub fn info(&self, msg: &str) {
info!("[{}] {}", self.context, msg);
}
pub fn warn(&self, msg: &str) {
warn!("[{}] {}", self.context, msg);
}
pub fn error(&self, msg: &str) {
error!("[{}] {}", self.context, msg);
}
pub fn debug(&self, msg: &str) {
debug!("[{}] {}", self.context, msg);
}
}
// Macros to make usage with format arguments easier
#[macro_export]
macro_rules! log_info {
($logger:expr, $($arg:tt)*) => {
log::info!("[{}] {}", $logger.context, format_args!($($arg)*));
};
}
#[macro_export]
macro_rules! log_warn {
($logger:expr, $($arg:tt)*) => {
log::warn!("[{}] {}", $logger.context, format_args!($($arg)*));
};
}
#[macro_export]
macro_rules! log_error {
($logger:expr, $($arg:tt)*) => {
log::error!("[{}] {}", $logger.context, format_args!($($arg)*));
};
}
#[macro_export]
macro_rules! log_debug {
($logger:expr, $($arg:tt)*) => {
log::debug!("[{}] {}", $logger.context, format_args!($($arg)*));
};
}
pub fn init_logger(log_level: &str) {
let level = match log_level.to_lowercase().as_str() {
"debug" => LevelFilter::Debug,
"info" => LevelFilter::Info,
"warn" | "warning" => LevelFilter::Warn,
"error" => LevelFilter::Error,
_ => LevelFilter::Info,
};
env_logger::Builder::from_default_env()
.filter_level(level)
.format_timestamp_secs()
.format(|buf, record| {
writeln!(buf, "{} | {} | {}",
buf.timestamp(),
record.level(),
record.args()
)
})
.init();
}
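For completeness, a short usage sketch of the logger, assuming the items shown in this diff (ContextLogger, with_sub_context, init_logger, and the exported log_* macros, which read the logger's public context field); the logged values are made up:

use crate::utils::logger::{init_logger, ContextLogger};
use crate::{log_info, log_warn};

fn logging_demo() {
    init_logger("debug"); // output format: timestamp | LEVEL | [CONTEXT] message

    let logger = ContextLogger::new("UDP");
    let worker = logger.with_sub_context("WORKER_0"); // context becomes "UDP:WORKER_0"

    // The inherent methods take an already formatted &str...
    logger.info("listening");
    // ...while the macros accept format arguments directly.
    log_info!(worker, "received {} bytes from {}", 42, "127.0.0.1:40001");
    log_warn!(worker, "no route for channel {}", "general");
}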

View File

@@ -1 +1,4 @@
pub mod shared_store;
pub mod config;
pub mod toolbox;
pub mod logger;

9
src/utils/toolbox.rs Normal file
View File

@@ -0,0 +1,9 @@
pub fn number_of_cpus() -> usize {
match std::thread::available_parallelism() {
Ok(n) => n.get(),
Err(_) => {
eprintln!("Warning: Could not determine number of CPUs, defaulting to 1");
1
}
}
}