From e38dcfca5ad54af6e528f91fa281cd12ed19e516 Mon Sep 17 00:00:00 2001 From: Nell Date: Sat, 20 Jun 2026 16:44:40 +0200 Subject: [PATCH] Init --- Cargo.lock | 128 ++++++++++++++++++++++++++++----- Cargo.toml | 9 +-- migration/Cargo.toml | 2 +- src/core/mod.rs | 6 ++ src/core/state.rs | 4 ++ src/routes/gateway/handlers.rs | 121 +++++++++++++++++++++++++++++++ src/routes/gateway/mod.rs | 53 ++++++++++++++ src/routes/gateway/routes.rs | 8 +++ src/routes/mod.rs | 4 +- 9 files changed, 310 insertions(+), 25 deletions(-) create mode 100644 src/routes/gateway/handlers.rs create mode 100644 src/routes/gateway/mod.rs create mode 100644 src/routes/gateway/routes.rs diff --git a/Cargo.lock b/Cargo.lock index 47de885..7f7f815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,6 +535,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ "axum-core", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -553,8 +554,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1 0.10.6", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -885,9 +888,9 @@ dependencies = [ [[package]] name = "config" -version = "0.15.23" +version = "0.15.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f316c6237b2d38be61949ecd15268a4c6ca32570079394a2444d9ce2c72a72d8" +checksum = "0b34d0237145f33580b89724f75d16950efd3e2c91b2d823917ecb69ec7f84f0" dependencies = [ "async-trait", "convert_case", @@ -1058,9 +1061,9 @@ checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" [[package]] name = "crypto-common" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", "typenum", @@ -1119,6 +1122,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "deranged" version = "0.5.8" @@ -1169,7 +1178,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer 0.10.4", - "crypto-common 0.1.6", + "crypto-common 0.1.7", ] [[package]] @@ -1437,6 +1446,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -1457,6 +1477,7 @@ checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1466,9 +1487,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.9" +version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", @@ -2329,6 +2350,7 @@ dependencies = [ "chrono", "config", "event_bus", + "futures-util", "jsonwebtoken", "log", "migration", @@ -2705,10 +2727,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ "libc", - "rand_chacha", + "rand_chacha 0.3.1", "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.5", +] + [[package]] name = "rand" version = "0.10.1" @@ -2730,6 +2762,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.5", +] + [[package]] name = "rand_core" version = "0.6.4" @@ -2739,6 +2781,15 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "rand_core" version = "0.10.1" @@ -2980,9 +3031,9 @@ dependencies = [ [[package]] name = "sea-orm" -version = "2.0.0-rc.40" +version = "2.0.0-rc.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628c3b6acb53ca9942f7f151431ed49db92dafa14d15976a1b9db9d4bd06431c" +checksum = "fd6f5a9c10d95ca8c4331692c9d3b30a578c3500d72b08f6350b89031e78f884" dependencies = [ "async-stream", "async-trait", @@ -3027,9 +3078,9 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "2.0.0-rc.40" +version = "2.0.0-rc.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8353fb3cf1baef735e273d0342ae75f69d0723d1715cfbc5f16ff3875166e0d" +checksum = "7396d668fd1b45f7f0bc1837819f8d359dd5c0ef8383e0cf1abf8b31230f332a" dependencies = [ "chrono", "clap", @@ -3044,9 +3095,9 @@ dependencies = [ [[package]] name = "sea-orm-macros" -version = "2.0.0-rc.40" +version = "2.0.0-rc.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a91def07bceb98aab308f7dd16c27496b76a6b7b92b94a61b309b5043d93d5" +checksum = "df86f2a6570c3169681af1035f8b159e8d5a1c034b72c19ee74c4858e4b6ec9e" dependencies = [ "heck 0.5.0", "itertools 0.14.0", @@ -3060,9 +3111,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "2.0.0-rc.40" +version = "2.0.0-rc.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e932517f74c4fd73c629998861602e261ef024242577a44c1fc4c7dc315186f" +checksum = "1987e95367941bdabd0b3e126292395ee0adf6417fe26b83b574c6c986e8c1f8" dependencies = [ "async-trait", "clap", @@ -3237,6 +3288,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.11.0" @@ -3472,7 +3534,7 @@ dependencies = [ "percent-encoding", "rust_decimal", "serde", - "sha1", + "sha1 0.11.0", "sha2 0.11.0", "sqlx-core", "thiserror", @@ -3770,6 +3832,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "toml" version = "1.1.2+spec-1.1.0" @@ -3930,6 +4004,22 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.4", + "sha1 0.10.6", + "thiserror", +] + [[package]] name = "typeid" version = "1.0.3" @@ -4067,9 +4157,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.23.2" +version = "1.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" +checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7" dependencies = [ "getrandom 0.4.2", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index b53c3f1..732c7e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,16 +12,16 @@ members = [".", "migration", "event_bus"] [dependencies] tokio = { version = "1.52.3", features = ["full"] } -axum = "0.8" -config = "0.15.23" -sea-orm = { version = "2.0.0-rc.40", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio", "with-chrono", "with-uuid", "with-json", "schema-sync"] } +axum = { version = "0.8", features = ["ws"] } +config = "0.15.24" +sea-orm = { version = "2.0.0-rc.41", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio", "with-chrono", "with-uuid", "with-json", "schema-sync"] } migration = { path = "migration" } event_bus = { path = "event_bus" } parking_lot = "0.12.5" serde = "1.0.228" serde_json = "1.0.150" toml = "1.1.2" -uuid = { version = "1.23.2", features = ["v4", "v7", "fast-rng", "serde"] } +uuid = { version = "1.23.3", features = ["v4", "v7", "fast-rng", "serde"] } tracing = "0.1.44" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "time"] } thiserror = "2" @@ -37,3 +37,4 @@ chrono = "0.4.45" validator = { version = "0.20.0", features = ["derive"] } async-trait = "0.1.89" anyhow = "1.0.102" +futures-util = "0.3" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 7ba649f..f5eadae 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -12,7 +12,7 @@ path = "src/lib.rs" async-std = { version = "1", features = ["attributes", "tokio1"] } [dependencies.sea-orm-migration] -version = "2.0.0-rc.40" +version = "2.0.0-rc.41" features = [ # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. diff --git a/src/core/mod.rs b/src/core/mod.rs index a1af3f2..32fdcfa 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -5,6 +5,7 @@ use crate::database::Database; use crate::http::server::HttpServer; use crate::metrics::{reporter, AppMetrics}; use crate::repositories::Repositories; +use crate::routes::gateway::GatewayManager; use crate::udp::server::UdpServer; use event_bus::EventBus; use migration::{Migrator, MigratorTrait}; @@ -32,6 +33,9 @@ impl App { // Initialize shared repositories let repositories = Repositories::new(db.clone(), event_bus.clone()); + // Initialize gateway manager + let gateway = Arc::new(GatewayManager::default()); + // Init one server if no one exist let default_server = match repositories.server.get_default().await? { Some(server) => server, @@ -69,6 +73,8 @@ impl App { init_token: Arc::new(RwLock::new(init_token)), default_server: Arc::new(default_server), metrics, + gateway, + event_bus, }; Ok(Self { state }) diff --git a/src/core/state.rs b/src/core/state.rs index db30545..71d8e6f 100644 --- a/src/core/state.rs +++ b/src/core/state.rs @@ -2,6 +2,8 @@ use crate::config::AppConfig; use crate::metrics::AppMetrics; use crate::models::server; use crate::repositories::Repositories; +use crate::routes::gateway::GatewayManager; +use event_bus::EventBus; use sea_orm::DatabaseConnection; use std::sync::{Arc, RwLock}; @@ -13,6 +15,8 @@ pub struct AppState { pub init_token: Arc>>, pub default_server: Arc, pub metrics: AppMetrics, + pub gateway: Arc, + pub event_bus: Arc, } impl AppState {} diff --git a/src/routes/gateway/handlers.rs b/src/routes/gateway/handlers.rs new file mode 100644 index 0000000..d4f8c1f --- /dev/null +++ b/src/routes/gateway/handlers.rs @@ -0,0 +1,121 @@ +use crate::auth::token::verify_jwt; +use crate::core::AppState; +use crate::http::context::CurrentUser; +use crate::models::user::Model as User; +use crate::routes::gateway::GatewayClient; +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, Query, + State, + }, + response::IntoResponse, +}; +use futures_util::{sink::SinkExt, stream::StreamExt}; +use serde::Deserialize; +use tokio::sync::mpsc; +use uuid::Uuid; + +#[derive(Deserialize)] +pub struct WsQuery { + token: String, +} + +pub async fn ws_handler( + Query(query): Query, + ws: WebSocketUpgrade, + State(state): State, + CurrentUser(user): CurrentUser, +) -> impl IntoResponse { + ws.on_upgrade(move |socket| handle_socket(socket, state, user)) + + // let token = query.token; + + // Vérification du JWT + // match verify_jwt(&token, &state.config.jwt.secret) { + // Ok(claims) => ws.on_upgrade(move |socket| handle_socket(socket, state, claims.user_id)), + // Err(e) => { + // tracing::error!("WS auth error: {:?}", e); + // (axum::http::StatusCode::UNAUTHORIZED, "Invalid token").into_response() + // } + // } +} + +async fn handle_socket(socket: WebSocket, state: AppState, user: User) { + let (mut sender, mut receiver) = socket.split(); + let (tx, mut rx) = mpsc::unbounded_channel(); + let event_bus = state.event_bus.clone(); + + let client = GatewayClient::new(user); + client.on_connect().await; + state.gateway.add_user(client); + + // // Enregistrement du client (Connect) + // on_connect(user_id, tx, &state).await; + // + // // Task pour envoyer les messages du canal mpsc vers le WebSocket + // let mut send_task = tokio::spawn(async move { + // while let Some(message) = rx.recv().await { + // if sender.send(message).await.is_err() { + // break; + // } + // } + // }); + // + // // Task pour recevoir les messages du WebSocket + // let state_clone = state.clone(); + // let mut recv_task = tokio::spawn(async move { + // while let Some(Ok(message)) = receiver.next().await { + // on_message(user_id, message, &state_clone).await; + // } + // }); + // + // // Attente de la fin d'une des tâches (déconnexion) + // tokio::select! { + // _ = (&mut send_task) => recv_task.abort(), + // _ = (&mut recv_task) => send_task.abort(), + // }; + // + // // Déconnexion (Disconnect) + // on_disconnect(user_id, &state).await; +} + +pub async fn on_connect(user_id: Uuid, tx: mpsc::UnboundedSender, state: &AppState) { + tracing::info!("Client connected: {}", user_id); + let mut clients = state + .gateway + .clients + .write() + .expect("Failed to lock clients for writing"); + clients.insert(user_id, GatewayClient { user_id, tx }); +} + +pub async fn on_disconnect(user_id: Uuid, state: &AppState) { + tracing::info!("Client disconnected: {}", user_id); + let mut clients = state + .gateway + .clients + .write() + .expect("Failed to lock clients for writing"); + clients.remove(&user_id); +} + +pub async fn on_message(user_id: Uuid, message: Message, _state: &AppState) { + tracing::debug!("Message received from {}: {:?}", user_id, message); + + // Exemple d'utilisation de l'état/repositories + // let user_opt = state.repositories.user.get_by_id(user_id).await.ok().flatten(); + + match message { + Message::Text(text) => { + tracing::info!("Received text from {}: {}", user_id, text); + // Logique de dispatch ou de traitement ici + } + Message::Binary(_) => { + tracing::info!("Received binary from {}", user_id); + } + Message::Close(_) => { + tracing::info!("Received close from {}", user_id); + } + _ => {} + } +} diff --git a/src/routes/gateway/mod.rs b/src/routes/gateway/mod.rs new file mode 100644 index 0000000..76d27e8 --- /dev/null +++ b/src/routes/gateway/mod.rs @@ -0,0 +1,53 @@ +use crate::models::user::Model as User; +use parking_lot::RwLock; +use std::collections::HashMap; +use uuid::Uuid; + +pub mod handlers; +pub mod routes; + +#[derive(Debug, Default)] +pub struct GatewayManager { + // {UserID: {connection_id: GatewayClient}} + pub clients: RwLock>>, +} + +#[derive(Debug, Clone)] +pub struct GatewayClient { + user: User, + connection_id: Uuid, +} + +impl GatewayManager { + fn add_client(&self, gateway_client: GatewayClient) { + let mut clients = self.clients.write(); + let user_id = gateway_client.user.id; + clients + .entry(user_id) + .or_insert_with(HashMap::new) + .insert(gateway_client.connection_id, gateway_client); + } + + fn remove_client(&self, gateway_client: GatewayClient) { + let mut clients = self.clients.write(); + if let Some(client_list) = clients.get_mut(&gateway_client.user.id) { + client_list.remove(&gateway_client.connection_id); + } + } +} + +impl GatewayClient { + pub fn new(user: User) -> Self { + let connection_id = Uuid::new_v4(); + Self { + user, + connection_id, + } + } + + async fn on_connect(&self) {} + + async fn on_disconnect(&self) {} + + async fn on_message(&self) {} +} diff --git a/src/routes/gateway/routes.rs b/src/routes/gateway/routes.rs new file mode 100644 index 0000000..8f05510 --- /dev/null +++ b/src/routes/gateway/routes.rs @@ -0,0 +1,8 @@ +use super::handlers; +use crate::core::AppState; +use axum::routing::get; +use axum::Router; + +pub fn router() -> Router { + Router::new().route("/gateway", get(handlers::ws_handler)) +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs index a228501..5c73551 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -9,6 +9,7 @@ pub mod auth; pub mod category; pub mod channel; pub mod core; +pub mod gateway; pub mod group; pub mod message; pub mod openapi; @@ -30,7 +31,8 @@ pub fn router() -> OxRouter { let api_routes = Router::new() .merge(secure_routes) .merge(auth::routes::router()) - .merge(core::routes::router()); + .merge(core::routes::router()) + .merge(gateway::routes::router()); Router::new() .nest("/api", api_routes)