This commit is contained in:
2026-01-05 00:03:26 +01:00
parent 1a4b706702
commit 5ada657ce7
11 changed files with 218 additions and 111 deletions

View File

@@ -1,5 +1,6 @@
<template>
<h1>Oxspeak</h1>
<ws />
<server_list />
<category_list />
@@ -11,6 +12,7 @@
import Server_list from "@/components/server/server_list.vue";
import Category_list from "@/components/category/category_list.vue";
import Channel_list from "@/components/channel/channel_list.vue";
import Ws from "@/components/ws.vue";
</script>
<style scoped></style>

View File

@@ -0,0 +1,17 @@
<script setup>
import {onMounted, ref} from "vue";
const messages = ref([])
onMounted(() => {
let ws = new WebSocket("ws://localhost:7000/handler/ws/");
})
</script>
<template>
<p></p>
</template>
<style scoped>
</style>

View File

@@ -2,7 +2,8 @@ use crate::app::AppState;
use crate::config::Config;
use crate::database::Database;
use crate::event_bus::EventBus;
use crate::network::http::{HTTPServer, WSHub};
use crate::hub::Clients;
use crate::network::http::HTTPServer;
use crate::network::udp::UDPServer;
use crate::repositories::Repositories;
@@ -29,7 +30,7 @@ impl App {
db: db.clone(),
event_bus: event_bus.clone(),
repositories: repositories.clone(),
ws_hub: WSHub::new()
clients: Clients::new()
};
let udp_server = UDPServer::new(config.bind_addr());

View File

@@ -1,6 +1,6 @@
use crate::database::Database;
use crate::event_bus::EventBus;
use crate::network::http::WSHub;
use crate::hub::Clients;
use crate::repositories::Repositories;
#[derive(Clone)]
@@ -8,7 +8,7 @@ pub struct AppState {
pub db: Database,
pub event_bus: EventBus,
pub repositories: Repositories,
pub ws_hub: WSHub,
pub clients: Clients,
}
impl AppState {

84
src/hub/client.rs Normal file
View File

@@ -0,0 +1,84 @@
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::mpsc;
use axum::extract::ws::Message;
use parking_lot::Mutex;
use tokio::task::JoinHandle;
use crate::app::AppState;
#[derive(Clone)]
pub struct Client {
pub user_id: uuid::Uuid,
pub connection_id: uuid::Uuid,
pub connected_at: DateTime<Utc>,
// tools
app_state: AppState,
sender: mpsc::Sender<Message>,
tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>
}
impl Client {
pub fn new(
user_id: uuid::Uuid,
connection_id: uuid::Uuid,
app_state: AppState,
sender: mpsc::Sender<Message>
) -> Self {
Self {
user_id,
connection_id,
connected_at: Utc::now(),
app_state,
sender,
tasks: Arc::new(Mutex::new(HashMap::new()))
}
}
pub async fn send(&self, message: Message) {
let _ = self.sender.send(message).await;
}
pub async fn on_message(&self, message: Message) {
if let Message::Text(t) = message {
}
}
pub async fn on_disconnect(&self) {
self.clear_tasks().await;
}
pub async fn add_task(&self, name: &str, task: JoinHandle<()>) {
self.tasks.lock().insert(name.to_string(), task);
}
pub async fn remove_task(&self, name: &str) {
let task = self.tasks.lock().remove(name);
if let Some(task) = task {
task.abort();
let _ = task.await; // On peut await ici sans bloquer le Mutex
println!("Task {} cleaned up", name);
}
}
pub async fn clear_tasks(&self) {
// 1. On vide la map et on récupère toutes les tâches d'un coup
let mut tasks_to_clean = Vec::new();
{
let mut lock = self.tasks.lock();
for (name, task) in lock.drain() {
tasks_to_clean.push((name, task));
}
} // Le verrou est relâché ICI
// 2. On les nettoie une par une de manière asynchrone
for (name, task) in tasks_to_clean {
task.abort();
let _ = task.await;
println!("Task {} cleaned up", name);
}
}
}

28
src/hub/mod.rs Normal file
View File

@@ -0,0 +1,28 @@
mod client;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use uuid::Uuid;
pub use client::Client;
#[derive(Clone)]
pub struct Clients {
pub connections: Arc<RwLock<HashMap<Uuid, Client>>>
}
impl Clients {
pub fn new() -> Self {
Self {
connections: Arc::new(RwLock::new(HashMap::new()))
}
}
pub fn add_client(&self, client: Client) {
self.connections.write().insert(client.connection_id, client);
}
pub fn remove_client(&self, connection_id: Uuid) {
self.connections.write().remove(&connection_id);
}
}

View File

@@ -8,3 +8,4 @@ pub mod models;
pub mod serializers;
pub mod repositories;
pub mod event_bus;
pub mod hub;

View File

@@ -11,6 +11,5 @@ mod context;
pub use server::HTTPServer;
pub use error::HTTPError;
pub use context::RequestContext;
pub use web::WSHub;
pub type AppRouter = Router<AppState>;

View File

@@ -1,102 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::{State, WebSocketUpgrade};
use axum::extract::ws::{Message, WebSocket};
use axum::response::IntoResponse;
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::app::AppState;
async fn ws_handler(
ws: WebSocketUpgrade,
State(app_state): State<AppState>
) -> impl IntoResponse {
// todo : récupérer le vrai id de l'utilisateur
let user_id = Uuid::new_v4();
ws.on_upgrade(move |socket| handle_socket(socket, app_state, user_id))
}
async fn handle_socket(socket: WebSocket, app_state: AppState, user_id: Uuid) {
// .split() sépare la lecture de l'écriture
let (mut ws_sender, mut ws_receiver) = socket.split();
let (tx, mut rx) = mpsc::channel::<Message>(100);
// ID unique pour CETTE connexion spécifique (un utilisateur peut en avoir plusieurs)
let connection_id = Uuid::new_v4();
// 1. Enregistrement du client (Gestion du Vec)
let client = ConnectedClient {
user_id,
connection_id,
sender: tx,
};
{
let mut clients = app_state.ws_hub.clients.write();
clients.entry(user_id).or_insert_with(Vec::new).push(client);
}
// 2. Tâche d'envoi (Hub -> Browser)
let mut send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if ws_sender.send(msg).await.is_err() {
break;
}
}
});
// 3. Tâche de réception (Browser -> Server)
let mut recv_task = tokio::spawn(async move {
while let Some(result) = ws_receiver.next().await {
if let Ok(msg) = result {
if let Message::Close(_) = msg { break; }
// Ici vous pourriez dispatcher les messages vers l'event_bus
} else {
break;
}
}
});
// 4. Attente de fin
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};
// 5. Nettoyage propre : on retire seulement CETTE connexion
let mut clients = app_state.ws_hub.clients.write();
if let Some(user_conns) = clients.get_mut(&user_id) {
user_conns.retain(|c| c.connection_id != connection_id);
// Optionnel : si le Vec est vide, on peut supprimer l'entrée pour libérer de la mémoire
if user_conns.is_empty() {
clients.remove(&user_id);
}
}
println!("Connexion {} de l'utilisateur {} fermée.", connection_id, user_id);
}
/// Représente une connexion active
pub struct ConnectedClient {
pub user_id: Uuid,
pub connection_id: Uuid,
pub sender: mpsc::Sender<Message>,
}
/// Gestionnaire global des connexions WebSocket
#[derive(Clone)]
pub struct WSHub {
pub clients: Arc<RwLock<HashMap<Uuid, Vec<ConnectedClient>>>>,
}
impl WSHub {
pub fn new() -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new()))
}
}
}

View File

@@ -3,11 +3,10 @@ use crate::app::AppState;
use crate::network::http::AppRouter;
mod api;
mod hub;
pub use hub::WSHub;
mod ws_handler;
pub fn setup_route() -> AppRouter {
AppRouter::new()
.nest("/api", api::setup_route())
.nest("/handler", ws_handler::setup_route())
}

View File

@@ -0,0 +1,78 @@
use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::{State, WebSocketUpgrade};
use axum::extract::ws::{Message, WebSocket};
use axum::response::IntoResponse;
use axum::routing::{delete, get, post, put};
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::app::AppState;
use crate::hub::Client;
use crate::network::http::AppRouter;
pub fn setup_route() -> AppRouter {
AppRouter::new()
.route("/ws/", get(ws_handler))
}
async fn ws_handler(
ws: WebSocketUpgrade,
State(app_state): State<AppState>
) -> impl IntoResponse {
// todo : récupérer le vrai id de l'utilisateur
println!("Nouvelle connexion WebSocket !");
let user_id = Uuid::new_v4();
ws.on_upgrade(move |socket| handle_socket(socket, user_id, app_state))
}
async fn handle_socket(socket: WebSocket, user_id: Uuid, app_state: AppState) {
println!("1Nouvelle session pour le client {}", user_id);
// .split() sépare la lecture de l'écriture
let (mut ws_sender, mut ws_receiver) = socket.split();
let (tx, mut rx) = mpsc::channel::<Message>(100);
// ID unique pour CETTE connexion spécifique (un utilisateur peut en avoir plusieurs)
let connection_id = Uuid::new_v4();
// 1. Création de l'instance Client (ton objet intelligent)
let client = Client::new(user_id, connection_id, app_state.clone(), tx);
// 2. Enregistrement dans le Hub global pour être visible des autres
app_state.clients.add_client(client.clone());
// 3. Tâche d'ENVOI : On écoute la boîte aux lettres et on pousse vers le navigateur
let mut send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if ws_sender.send(msg).await.is_err() {
break; // Socket fermée
}
}
});
// 4. Tâche de RÉCEPTION : On écoute le navigateur et on délègue au Client
let client_for_recv = client.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = ws_receiver.next().await {
if let Message::Close(_) = msg {
break;
}
// On utilise la méthode métier que tu as définie !
client_for_recv.on_message(msg).await;
}
});
println!("2Nouvelle session pour le client {}", connection_id);
// 5. Attente de la fin de session (Bloque ici jusqu'à la déco)
tokio::select! {
_ = (&mut send_task) => (),
_ = (&mut recv_task) => (),
};
// 6. Nettoyage
client.on_disconnect().await;
app_state.clients.remove_client(connection_id);
println!("Session terminée pour le client {}", connection_id);
}