This commit is contained in:
2025-12-14 12:53:25 +01:00
parent dbec2e9a74
commit 60bedab4a5
33 changed files with 1975 additions and 48 deletions

View File

@@ -1,23 +1,31 @@
use crate::app::AppState;
use crate::config::Config;
use crate::database::Database;
use crate::event_bus::EventBus;
use crate::network::http::HTTPServer;
use crate::network::udp::UDPServer;
use crate::repositories::Repositories;
pub struct App {
config: Config,
event_bus: EventBus,
db: Database,
repositories: Repositories,
udp_server: UDPServer,
http_server: HTTPServer
http_server: HTTPServer,
}
impl App {
pub async fn init(config: Config) -> Self {
let event_bus = EventBus::new(1024);
let db = Database::init(&config.database_url()).await.expect("Failed to initialize database");
let repositories = Repositories::new(db.get_connection(), event_bus.clone());
let state = AppState::new(db.clone());
let state = AppState{db: db.clone(), event_bus: event_bus.clone(), repositories: repositories.clone()};
// let state = AppState::new();
let udp_server = UDPServer::new(config.bind_addr());
@@ -25,9 +33,11 @@ impl App {
Self {
config,
event_bus,
db,
repositories,
udp_server,
http_server
http_server,
}
}

View File

@@ -1,14 +1,21 @@
use crate::database::Database;
use crate::event_bus::EventBus;
use crate::repositories::Repositories;
#[derive(Clone)]
pub struct AppState {
pub db: Database
pub db: Database,
pub event_bus: EventBus,
pub repositories: Repositories
}
impl AppState {
pub fn new(db: Database) -> Self {
Self { db }
}
// pub fn new(db: Database, event_bus: EventBus) -> Self {
// Self {
// db,
// event_bus
// }
// }
}
// #[derive(Clone)]

252
src/event_bus/bus.rs Normal file
View File

@@ -0,0 +1,252 @@
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
pub type DynPayload = Arc<dyn Any + Send + Sync>;
#[derive(Clone)]
pub struct EventBus {
capacity: usize,
topics: Arc<RwLock<HashMap<String, broadcast::Sender<DynPayload>>>>,
}
/// Receiver typé : il ne “voit” que les payloads qui downcastent en `T`.
/// Les autres messages du topic sont ignorés.
pub struct TypedReceiver<T> {
rx: broadcast::Receiver<DynPayload>,
_marker: std::marker::PhantomData<T>,
}
impl<T> TypedReceiver<T>
where
T: Any + Send + Sync + 'static,
{
/// Attend le prochain message du topic qui est bien de type `T`.
///
/// - Ignore silencieusement les messages dun autre type.
/// - Peut retourner `Lagged/Closed` comme un receiver broadcast normal.
pub async fn recv_typed(&mut self) -> Result<T, broadcast::error::RecvError>
where
T: Clone,
{
loop {
let payload = self.rx.recv().await?;
if let Some(v) = (&*payload).downcast_ref::<T>() {
return Ok(v.clone());
}
// sinon: mauvais type => on ignore et on attend le suivant
}
}
/// Accès au receiver brut si tu veux gérer toi-même.
pub fn into_inner(self) -> broadcast::Receiver<DynPayload> {
self.rx
}
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
Self {
capacity,
topics: Arc::new(RwLock::new(HashMap::new())),
}
}
fn get_or_create_sender(&self, topic: &str) -> broadcast::Sender<DynPayload> {
// Fast-path read
if let Some(tx) = self.topics.read().get(topic) {
return tx.clone();
}
// Slow-path write
let mut map = self.topics.write();
if let Some(tx) = map.get(topic) {
return tx.clone();
}
let (tx, _) = broadcast::channel(self.capacity);
map.insert(topic.to_string(), tx.clone());
tx
}
/// Emit un évènement sur un topic.
///
/// # Exemple : emit un model directement (sans DbEvent)
/// ```rust
/// bus.emit("server-created", server_model);
/// ```
///
/// # Exemple : emit un DbEvent (payload dynamique)
/// ```rust
/// bus.emit("server-created", DbEvent::new(server_model));
/// ```
pub fn emit<T>(&self, topic: &str, payload: T)
where
T: Any + Send + Sync + 'static,
{
let tx = self.get_or_create_sender(topic);
let _ = tx.send(Arc::new(payload));
}
/// Receiver "brut" pour que tu gères toi-même la boucle / erreurs / filtrage.
///
/// # Exemple (loop custom)
/// ```rust
/// let mut rx = bus.receiver("websocket-connected");
/// tokio::spawn(async move {
/// loop {
/// match rx.recv().await {
/// Ok(payload) => { /* downcast ici */ }
/// Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
/// Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
/// }
/// }
/// });
/// ```
pub fn receiver(&self, topic: &str) -> broadcast::Receiver<DynPayload> {
self.get_or_create_sender(topic).subscribe()
}
/// S'abonner à un topic (niveau "brut" = payload dynamique).
/// Alias de `receiver`.
pub fn subscribe(&self, topic: &str) -> broadcast::Receiver<DynPayload> {
self.receiver(topic)
}
/// Receiver typé : ne “retourne” que des `T` via `recv_typed().await`.
///
/// # Exemple
/// ```rust
/// use crate::event_bus::EventBus;
/// use crate::models::server;
///
/// let bus = EventBus::new(1024);
/// let mut rx = bus.receiver_typed::<server::Model>("server-created");
///
/// tokio::spawn(async move {
/// loop {
/// match rx.recv_typed().await {
/// Ok(srv) => println!("server: {}", srv.name),
/// Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
/// Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
/// }
/// }
/// });
/// ```
pub fn receiver_typed<T>(&self, topic: &str) -> TypedReceiver<T>
where
T: Any + Send + Sync + 'static,
{
TypedReceiver {
rx: self.receiver(topic),
_marker: std::marker::PhantomData,
}
}
/// API ergonomique : handler appelé seulement si le payload est du type attendu.
///
/// # Exemple : écouter un model directement
/// ```rust
/// use crate::models::server;
/// bus.on::<server::Model, _>("server-created", |srv| {
/// println!("Nouveau server: {}", srv.name);
/// });
/// ```
///
/// # Exemple : écouter un DbEvent et downcast le model à l'intérieur
/// ```rust
/// use crate::event_bus::db::DbEvent;
/// use crate::models::server;
///
/// bus.on::<DbEvent, _>("server-created", |ev| {
/// if let Some(srv) = ev.model_ref::<server::Model>() {
/// println!("Server créé: {}", srv.name);
/// }
/// });
/// ```
///
/// # Exemple : écouter un évènement réseau
/// ```rust
/// use crate::event_bus::network::NetworkEvent;
///
/// bus.on::<NetworkEvent, _>("websocket-connected", |ev| {
/// if let NetworkEvent::WsConnected { peer } = ev {
/// println!("WS connected: {}", peer);
/// }
/// });
/// ```
pub fn on<T, F>(&self, topic: &str, handler: F) -> JoinHandle<()>
where
T: Any + Send + Sync + Clone + 'static,
F: Fn(T) + Send + Sync + 'static,
{
let mut rx = self.receiver(topic);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(payload) => {
if let Some(typed) = (&*payload).downcast_ref::<T>() {
handler(typed.clone());
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => break,
}
}
})
}
/// Version async-friendly : le handler est async, tu peux `await` dedans.
///
/// # Exemple : handler async (ex: requête DB / HTTP / WS broadcast)
/// ```rust
/// use crate::models::server;
///
/// bus.on_async::<server::Model, _, _>("server-created", |srv| async move {
/// // Ici tu peux faire du async
/// println!("(async) Server créé: {}", srv.name);
///
/// // Exemple fictif :
/// // my_http_client.post("/audit").json(&srv).send().await.unwrap();
/// });
/// ```
///
/// # Exemple : handler async + DbEvent interne
/// ```rust
/// use crate::event_bus::db::DbEvent;
/// use crate::models::server;
///
/// bus.on_async::<DbEvent, _, _>("server-created", |ev| async move {
/// if let Some(srv) = ev.model_ref::<server::Model>() {
/// println!("(async) Server créé: {}", srv.name);
/// }
/// });
/// ```
pub fn on_async<T, F, Fut>(&self, topic: &str, handler: F) -> JoinHandle<()>
where
T: Any + Send + Sync + Clone + 'static,
F: Fn(T) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let mut rx = self.receiver(topic);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(payload) => {
if let Some(typed) = (&*payload).downcast_ref::<T>() {
handler(typed.clone()).await;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => break,
}
}
})
}
}

24
src/event_bus/db.rs Normal file
View File

@@ -0,0 +1,24 @@
use std::any::Any;
use std::sync::Arc;
pub type DynModel = Arc<dyn Any + Send + Sync>;
#[derive(Clone, Debug)]
pub struct DbEvent {
pub model: DynModel,
}
impl DbEvent {
pub fn new<T>(model: T) -> Self
where
T: Any + Send + Sync + 'static,
{
Self {
model: Arc::new(model),
}
}
pub fn model_ref<T: Any>(&self) -> Option<&T> {
(&*self.model).downcast_ref::<T>()
}
}

5
src/event_bus/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
mod bus;
mod db;
mod network;
pub use bus::{DynPayload, EventBus};

9
src/event_bus/network.rs Normal file
View File

@@ -0,0 +1,9 @@
#[derive(Clone, Debug)]
pub enum NetworkEvent {
HttpRequest { method: String, path: String },
HttpResponse { status: u16, path: String },
UdpConnected { peer: String },
UdpDisconnected { peer: String },
WsConnected { peer: String },
WsDisconnected { peer: String },
}

View File

@@ -6,4 +6,5 @@ pub mod utils;
pub mod database;
pub mod models;
pub mod serializers;
pub mod repositories;
pub mod repositories;
pub mod event_bus;

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use axum::{middleware, Router};
use tower_http::cors::CorsLayer;
use crate::app::AppState;
use crate::network::http::middleware::context_middleware;
use crate::network::http::{web, AppRouter};
@@ -7,6 +8,7 @@ use crate::network::http::{web, AppRouter};
pub fn setup_route(app_state: AppState) -> Router {
Router::new()
.merge(web::setup_route())
.layer(CorsLayer::permissive())
.layer(middleware::from_fn_with_state(app_state.clone(), context_middleware))
.with_state(app_state)
}

View File

@@ -2,7 +2,7 @@ use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::routing::{delete, get, post, put};
use sea_orm::{ActiveModelTrait, EntityTrait, IntoActiveModel};
use sea_orm::{IntoActiveModel};
use uuid::Uuid;
use crate::app::AppState;
use crate::models::server;
@@ -19,21 +19,18 @@ pub fn setup_route() -> AppRouter {
}
pub async fn server_list(
State(app_state): State<AppState>
State(state): State<AppState>
) -> Result<Json<Vec<ServerSerializer>>, HTTPError> {
let servers = server::Entity::find()
.all(app_state.db.get_connection())
.await?;
let servers = state.repositories.server.get_all().await?;
Ok(Json(servers.into_iter().map(ServerSerializer::from).collect()))
}
pub async fn server_detail(
State(app_state): State<AppState>,
State(state): State<AppState>,
Path(id): Path<Uuid>
) -> Result<Json<ServerSerializer>, HTTPError> {
let server = server::Entity::find_by_id(id)
.one(app_state.db.get_connection())
let server = state.repositories.server.get_by_id(id)
.await?
.ok_or(HTTPError::NotFound)?;
@@ -41,45 +38,40 @@ pub async fn server_detail(
}
pub async fn server_create(
State(app_state): State<AppState>,
State(state): State<AppState>,
Json(serializer): Json<ServerSerializer>
) -> Result<Json<ServerSerializer>, HTTPError> {
let active = serializer.into_active_model();
let server: server::Model = active.insert(app_state.db.get_connection()).await?;
let server = state.repositories.server.create(active).await?;
Ok(Json(ServerSerializer::from(server)))
}
pub async fn server_update(
State(app_state): State<AppState>,
State(state): State<AppState>,
Path(id): Path<Uuid>,
Json(serializer): Json<ServerSerializer>,
) -> Result<Json<ServerSerializer>, HTTPError> {
let server = server::Entity::find_by_id(id)
.one(app_state.db.get_connection())
let am_server = state.repositories.server
.get_by_id(id)
.await?
.ok_or(HTTPError::NotFound)?;
.ok_or(HTTPError::NotFound)?
.into_active_model();
let active = server.into_active_model();
let server: server::Model = serializer.apply_to_active_model(active)
.update(app_state.db.get_connection())
let server = state.repositories.server
.update(serializer.apply_to_active_model(am_server))
.await?;
Ok(Json(ServerSerializer::from(server)))
}
pub async fn server_delete(
State(app_state): State<AppState>,
State(state): State<AppState>,
Path(id): Path<Uuid>
) -> Result<StatusCode, HTTPError> {
let result = server::Entity::delete_by_id(id)
.exec(app_state.db.get_connection())
.await?;
if result.rows_affected == 0 {
Err(HTTPError::NotFound)
} else {
if state.repositories.server.delete(id).await? {
Ok(StatusCode::NO_CONTENT)
} else {
Err(HTTPError::NotFound)
}
}

View File

@@ -0,0 +1,5 @@
Ce module permet de :
- Gérer/simplifier les interactions avec la base de données
- Avoir un système de Signal qui se déclenche lors de modifications
dans la base de données (Création, Modification, Suppression)

View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use sea_orm::{DbErr, EntityTrait, ActiveModelTrait};
use crate::models::category;
use crate::repositories::RepositoryContext;
#[derive(Clone)]
pub struct CategoryRepository {
pub context: Arc<RepositoryContext>
}
impl CategoryRepository {
pub async fn get_by_id(&self, id: uuid::Uuid) -> Result<Option<category::Model>, DbErr> {
category::Entity::find_by_id(id).one(&self.context.db).await
}
pub async fn update(&self, active: category::ActiveModel) -> Result<category::Model, DbErr> {
let category = active.update(&self.context.db).await?;
self.context.events.emit("Category_updated", category.clone());
Ok(category)
}
pub async fn create(&self, active: category::ActiveModel) -> Result<category::Model, DbErr> {
let category = active.insert(&self.context.db).await?;
self.context.events.emit("Category_created", category.clone());
Ok(category)
}
pub async fn delete(&self, id: uuid::Uuid) -> Result<(), DbErr> {
category::Entity::delete_by_id(id).exec(&self.context.db).await?;
self.context.events.emit("Category_deleted", id);
Ok(())
}
}

View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use sea_orm::{DbErr, EntityTrait, ActiveModelTrait};
use crate::models::channel;
use crate::repositories::RepositoryContext;
#[derive(Clone)]
pub struct ChannelRepository {
pub context: Arc<RepositoryContext>
}
impl ChannelRepository {
pub async fn get_by_id(&self, id: uuid::Uuid) -> Result<Option<channel::Model>, DbErr> {
channel::Entity::find_by_id(id).one(&self.context.db).await
}
pub async fn update(&self, active: channel::ActiveModel) -> Result<channel::Model, DbErr> {
let channel = active.update(&self.context.db).await?;
self.context.events.emit("channel_updated", channel.clone());
Ok(channel)
}
pub async fn create(&self, active: channel::ActiveModel) -> Result<channel::Model, DbErr> {
let channel = active.insert(&self.context.db).await?;
self.context.events.emit("channel_created", channel.clone());
Ok(channel)
}
pub async fn delete(&self, id: uuid::Uuid) -> Result<(), DbErr> {
channel::Entity::delete_by_id(id).exec(&self.context.db).await?;
self.context.events.emit("channel_deleted", id);
Ok(())
}
}

View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use sea_orm::{DbErr, EntityTrait, ActiveModelTrait};
use crate::models::message;
use crate::repositories::RepositoryContext;
#[derive(Clone)]
pub struct MessageRepository {
pub context: Arc<RepositoryContext>
}
impl MessageRepository {
pub async fn get_by_id(&self, id: uuid::Uuid) -> Result<Option<message::Model>, DbErr> {
message::Entity::find_by_id(id).one(&self.context.db).await
}
pub async fn update(&self, active: message::ActiveModel) -> Result<message::Model, DbErr> {
let message = active.update(&self.context.db).await?;
self.context.events.emit("message_updated", message.clone());
Ok(message)
}
pub async fn create(&self, active: message::ActiveModel) -> Result<message::Model, DbErr> {
let message = active.insert(&self.context.db).await?;
self.context.events.emit("message_created", message.clone());
Ok(message)
}
pub async fn delete(&self, id: uuid::Uuid) -> Result<(), DbErr> {
message::Entity::delete_by_id(id).exec(&self.context.db).await?;
self.context.events.emit("message_deleted", id);
Ok(())
}
}

View File

@@ -1,13 +1,18 @@
use std::sync::Arc;
use sea_orm::DatabaseConnection;
use crate::event_bus::EventBus;
use crate::repositories::server::ServerRepository;
mod server;
mod category;
mod channel;
mod message;
mod user;
#[derive(Clone)]
pub struct RepositoryContext {
db: DatabaseConnection,
// pub events: EventBus, // si tu veux publier des events “post-save” plus tard
events: EventBus, // si tu veux publier des events “post-save” plus tard
}
#[derive(Clone)]
@@ -16,8 +21,8 @@ pub struct Repositories {
}
impl Repositories {
pub fn new(db: DatabaseConnection) -> Self {
let context = Arc::new(RepositoryContext { db });
pub fn new(db: &DatabaseConnection, events: EventBus) -> Self {
let context = Arc::new(RepositoryContext { db: db.clone(), events });
Self {
server: ServerRepository {context: context.clone()},

View File

@@ -9,25 +9,29 @@ pub struct ServerRepository {
}
impl ServerRepository {
pub async fn get_all(&self) -> Result<Vec<server::Model>, DbErr> {
server::Entity::find().all(&self.context.db).await
}
pub async fn get_by_id(&self, id: uuid::Uuid) -> Result<Option<server::Model>, DbErr> {
server::Entity::find_by_id(id).one(&self.context.db).await
}
pub async fn update(&self, active: server::ActiveModel) -> Result<server::Model, DbErr> {
let model = active.update(&self.context.db).await?;
// plus tard: self.context.events.publish(...)
Ok(model)
let server = active.update(&self.context.db).await?;
self.context.events.emit("server_updated", server.clone());
Ok(server)
}
pub async fn create(&self, active: server::ActiveModel) -> Result<server::Model, DbErr> {
let model = active.insert(&self.context.db).await?;
// plus tard: emit post-save
Ok(model)
let server = active.insert(&self.context.db).await?;
self.context.events.emit("server_created", server.clone());
Ok(server)
}
pub async fn delete(&self, id: uuid::Uuid) -> Result<(), DbErr> {
server::Entity::delete_by_id(id).exec(&self.context.db).await?;
// plus tard: emit post-delete
Ok(())
pub async fn delete(&self, id: uuid::Uuid) -> Result<bool, DbErr> {
let res = server::Entity::delete_by_id(id).exec(&self.context.db).await?;
self.context.events.emit("server_deleted", id);
Ok(res.rows_affected > 0)
}
}

33
src/repositories/user.rs Normal file
View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use sea_orm::{DbErr, EntityTrait, ActiveModelTrait};
use crate::models::user;
use crate::repositories::RepositoryContext;
#[derive(Clone)]
pub struct UserRepository {
pub context: Arc<RepositoryContext>
}
impl UserRepository {
pub async fn get_by_id(&self, id: uuid::Uuid) -> Result<Option<user::Model>, DbErr> {
user::Entity::find_by_id(id).one(&self.context.db).await
}
pub async fn update(&self, active: user::ActiveModel) -> Result<user::Model, DbErr> {
let user = active.update(&self.context.db).await?;
self.context.events.emit("user_updated", user.clone());
Ok(user)
}
pub async fn create(&self, active: user::ActiveModel) -> Result<user::Model, DbErr> {
let user = active.insert(&self.context.db).await?;
self.context.events.emit("user_created", user.clone());
Ok(user)
}
pub async fn delete(&self, id: uuid::Uuid) -> Result<(), DbErr> {
user::Entity::delete_by_id(id).exec(&self.context.db).await?;
self.context.events.emit("user_deleted", id);
Ok(())
}
}