diff --git a/Cargo.lock b/Cargo.lock index 545d2bc..87a3098 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1019,6 +1019,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event_bus" +version = "0.1.0" +dependencies = [ + "glob", + "parking_lot", + "tokio", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -1906,6 +1915,7 @@ name = "oxspeak_server" version = "0.1.0" dependencies = [ "config", + "event_bus", "glob", "log", "migration", diff --git a/Cargo.toml b/Cargo.toml index 2ced443..87da2e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ tokio = { version = "1.52.1", features = ["full"] } config = "0.15.22" sea-orm = { version = "2.0.0-rc.38", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio", "with-chrono", "with-uuid", "with-json", "schema-sync"] } migration = { path = "migration" } -event-bus = { path = "event_bus" } +event_bus = { path = "event_bus" } parking_lot = "0.12.5" serde = "1.0.228" serde_json = "1.0.149" diff --git a/event_bus/Cargo.toml b/event_bus/Cargo.toml new file mode 100644 index 0000000..58fa0c4 --- /dev/null +++ b/event_bus/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "event_bus" +version = "0.1.0" +edition = "2024" +publish = false + +[lib] +name = "event_bus" +path = "src/lib.rs" + +[dependencies] +tokio = { version = "1.52.1", default-features = false, features = ["rt", "sync"] } +glob = "0.3.3" +parking_lot = "0.12.5" + +[dev-dependencies] +tokio = { version = "1.52.1", default-features = false, features = ["rt", "rt-multi-thread", "macros", "time", "sync"] } \ No newline at end of file diff --git a/event_bus/event_bus/Cargo.toml b/event_bus/event_bus/Cargo.toml deleted file mode 100644 index 204bc91..0000000 --- a/event_bus/event_bus/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "event-bus" -version = "0.1.0" -edition = "2024" -publish = false - -[lib] -name = "migration" -path = "src/lib.rs" - -[dependencies] diff --git a/event_bus/event_bus/src/mod.rs b/event_bus/event_bus/src/mod.rs deleted file mode 100644 index a6b3f5e..0000000 --- a/event_bus/event_bus/src/mod.rs +++ /dev/null @@ -1,458 +0,0 @@ -//! # Event Bus -//! -//! Un bus d'événements asynchrone permettant de faire transiter des messages typés -//! entre plusieurs modules, sans couplage direct. -//! -//! ## Caractéristiques -//! -//! - **Association clé → événement** : chaque topic (`&str`) est indépendant -//! - **Sans restriction de type** : n'importe quel `T: Any + Send + Sync + Clone` -//! - **Wake-up ciblé** : seuls les abonnés du bon topic sont réveillés -//! - **API callback** : style JavaScript — `bus.on("topic", |payload| { ... })` -//! - **Pattern glob** : `on_pattern("user-*", |topic, payload| { ... })` -//! - **Handlers async** : `on_async` et `on_pattern_async` -//! -//! ## Exemple — callback sync -//! -//! ```rust,no_run -//! use std::sync::Arc; -//! use oxspeak_server_lib::event_bus::EventBus; -//! -//! #[derive(Clone, Debug)] -//! struct User { name: String } -//! -//! # tokio_test::block_on(async { -//! let bus = Arc::new(EventBus::new()); -//! -//! bus.on::("user-connected", |user| { -//! println!("Connecté : {:?}", user); -//! }); -//! -//! bus.emit("user-connected", User { name: "Alice".into() }); -//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -//! # }); -//! ``` -//! -//! ## Exemple — callback async -//! -//! ```rust,no_run -//! use std::sync::Arc; -//! use oxspeak_server_lib::event_bus::EventBus; -//! -//! #[derive(Clone, Debug)] -//! struct User { name: String } -//! -//! # tokio_test::block_on(async { -//! let bus = Arc::new(EventBus::new()); -//! -//! bus.on_async::("user-connected", |user| async move { -//! println!("(async) Connecté : {:?}", user); -//! }); -//! -//! bus.emit("user-connected", User { name: "Bob".into() }); -//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -//! # }); -//! ``` -//! -//! ## Exemple — pattern glob (topic inclus dans le callback) -//! -//! ```rust,no_run -//! use std::sync::Arc; -//! use oxspeak_server_lib::event_bus::EventBus; -//! -//! #[derive(Clone, Debug)] -//! struct User { name: String } -//! -//! # tokio_test::block_on(async { -//! let bus = Arc::new(EventBus::new()); -//! -//! bus.on_pattern::("user-*", |topic, user| { -//! match topic.as_str() { -//! "user-created" => println!("Créé : {:?}", user), -//! "user-deleted" => println!("Supprimé : {:?}", user), -//! other => println!("{}: {:?}", other, user), -//! } -//! }); -//! -//! bus.emit("user-created", User { name: "Alice".into() }); -//! bus.emit("user-deleted", User { name: "Bob".into() }); -//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -//! # }); -//! ``` -//! -//! ## Exemple — multi-types avec `match_event!` (cas avancé) -//! -//! ```rust,no_run -//! use std::sync::Arc; -//! use oxspeak_server_lib::event_bus::EventBus; -//! use oxspeak_server_lib::match_event; -//! -//! #[derive(Clone, Debug)] struct User { name: String } -//! #[derive(Clone, Debug)] struct UdpMetric { value: f32 } -//! -//! # tokio_test::block_on(async { -//! let bus = Arc::new(EventBus::new()); -//! let mut rx = bus.on_raw("mixed-topic"); -//! -//! bus.emit("mixed-topic", User { name: "Alice".into() }); -//! -//! if let Ok(evt) = rx.recv().await { -//! match_event!(evt, -//! User => |u| println!("User: {:?}", u), -//! UdpMetric => |m| println!("Metric: {:?}", m), -//! ); -//! } -//! # }); -//! ``` - -mod bus; - -// Réexports publics -pub use bus::{AnyEvent, EventBus}; - -/// Downcaste un [`AnyEvent`] vers un ou plusieurs types concrets et exécute -/// la closure correspondante si le type correspond. -/// -/// Les branches non correspondantes sont ignorées silencieusement. -/// -/// # Syntaxe -/// ```text -/// match_event!(evt, Type1 => |val| { ... }, Type2 => |val| { ... }) -/// ``` -/// -/// # Exemple -/// ```rust,no_run -/// # use std::sync::Arc; -/// # use oxspeak_server_lib::event_bus::EventBus; -/// # use oxspeak_server_lib::match_event; -/// # #[derive(Clone, Debug)] struct User { name: String } -/// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 } -/// # let bus = Arc::new(EventBus::new()); -/// # tokio_test::block_on(async { -/// let mut rx = bus.on_raw("user-connected"); -/// bus.emit("user-connected", User { name: "Alice".into() }); -/// -/// if let Ok(evt) = rx.recv().await { -/// match_event!(evt, -/// User => |u| println!("Utilisateur : {:?}", u), -/// UdpMetric => |m| println!("Metric : {:?}", m), -/// ); -/// } -/// # }); -/// ``` -#[macro_export] -macro_rules! match_event { - ($evt:expr, $($type:ty => $handler:expr),+ $(,)?) => { - $( - if let Some(val) = ($evt).downcast_ref::<$type>() { - ($handler)(val.clone()); - } else - )+ - { - // Aucun type ne correspond → on ignore silencieusement - } - }; -} - -// ───────────────────────────────────────────────────────────────────────────── -// Tests -// ───────────────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; - use std::sync::Arc; - - use super::*; - - #[derive(Clone, Debug, PartialEq)] - struct User { - name: String, - } - - #[derive(Clone, Debug, PartialEq)] - struct UdpMetric { - value: f32, - } - - // ── on (callback sync) ──────────────────────────────────────────────────── - - #[tokio::test] - async fn test_on_callback_sync() { - let bus = Arc::new(EventBus::new()); - let received = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&received); - - bus.on::("user-connected", move |user| { - if user.name == "Alice" { - flag.store(true, Ordering::SeqCst); - } - }); - - bus.emit( - "user-connected", - User { - name: "Alice".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(received.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_on_targeted_wakeup() { - // Émettre sur "user-connected" ne doit pas réveiller "udp-metrics-updated" - let bus = Arc::new(EventBus::new()); - let metric_called = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&metric_called); - - bus.on::("udp-metrics-updated", move |_| { - flag.store(true, Ordering::SeqCst); - }); - - bus.emit( - "user-connected", - User { - name: "Carol".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(!metric_called.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_on_type_mismatch_ignored() { - // Émettre un UdpMetric sur un topic écouté en User → handler pas appelé - let bus = Arc::new(EventBus::new()); - let called = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&called); - - bus.on::("mixed-topic", move |_| { - flag.store(true, Ordering::SeqCst); - }); - - bus.emit("mixed-topic", UdpMetric { value: 1.0 }); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(!called.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_on_multiple_subscribers_same_topic() { - let bus = Arc::new(EventBus::new()); - let count = Arc::new(AtomicU32::new(0)); - - for _ in 0..3 { - let c = Arc::clone(&count); - bus.on::("user-connected", move |_| { - c.fetch_add(1, Ordering::SeqCst); - }); - } - - bus.emit( - "user-connected", - User { - name: "Grace".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert_eq!(count.load(Ordering::SeqCst), 3); - } - - // ── on_async ────────────────────────────────────────────────────────────── - - #[tokio::test] - async fn test_on_async_callback() { - let bus = Arc::new(EventBus::new()); - let received = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&received); - - bus.on_async::("user-connected", move |user| { - let f = Arc::clone(&flag); - async move { - if user.name == "Async" { - f.store(true, Ordering::SeqCst); - } - } - }); - - bus.emit( - "user-connected", - User { - name: "Async".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(received.load(Ordering::SeqCst)); - } - - // ── on_pattern ──────────────────────────────────────────────────────────── - - #[tokio::test] - async fn test_on_pattern_callback_receives_topic_and_payload() { - let bus = Arc::new(EventBus::new()); - let created = Arc::new(AtomicBool::new(false)); - let deleted = Arc::new(AtomicBool::new(false)); - let c = Arc::clone(&created); - let d = Arc::clone(&deleted); - - bus.on_pattern::("user-*", move |topic, user| match topic.as_str() { - "user-created" if user.name == "Dave" => c.store(true, Ordering::SeqCst), - "user-deleted" if user.name == "Eve" => d.store(true, Ordering::SeqCst), - _ => {} - }); - - bus.emit( - "user-created", - User { - name: "Dave".into(), - }, - ); - bus.emit("user-deleted", User { name: "Eve".into() }); - tokio::time::sleep(std::time::Duration::from_millis(30)).await; - - assert!(created.load(Ordering::SeqCst)); - assert!(deleted.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_on_pattern_does_not_match_other_topics() { - let bus = Arc::new(EventBus::new()); - let called = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&called); - - bus.on_pattern::("user-*", move |_, _| { - flag.store(true, Ordering::SeqCst); - }); - - bus.emit( - "server-created", - User { - name: "Ghost".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(!called.load(Ordering::SeqCst)); - } - - #[tokio::test] - async fn test_on_pattern_no_pre_registration_needed() { - // Le pattern est enregistré avant que le topic n'existe - let bus = Arc::new(EventBus::new()); - let received = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&received); - - bus.on_pattern::("user-*", move |topic, user| { - if topic == "user-new-topic" && user.name == "Frank" { - flag.store(true, Ordering::SeqCst); - } - }); - - bus.emit( - "user-new-topic", - User { - name: "Frank".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(received.load(Ordering::SeqCst)); - } - - // ── on_pattern_async ────────────────────────────────────────────────────── - - #[tokio::test] - async fn test_on_pattern_async_callback() { - let bus = Arc::new(EventBus::new()); - let received = Arc::new(AtomicBool::new(false)); - let flag = Arc::clone(&received); - - bus.on_pattern_async::("user-*", move |topic, user| { - let f = Arc::clone(&flag); - async move { - if topic == "user-created" && user.name == "Hank" { - f.store(true, Ordering::SeqCst); - } - } - }); - - bus.emit( - "user-created", - User { - name: "Hank".into(), - }, - ); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(received.load(Ordering::SeqCst)); - } - - // ── on_raw + match_event! ───────────────────────────────────────────────── - - #[tokio::test] - async fn test_on_raw_and_match_event_macro() { - let bus = Arc::new(EventBus::new()); - let mut rx = bus.on_raw("user-connected"); - - bus.emit( - "user-connected", - User { - name: "Frank".into(), - }, - ); - - let evt = rx.recv().await.unwrap(); - let mut received_name = String::new(); - match_event!(evt, - User => |u: User| { received_name = u.name.clone(); }, - UdpMetric => |_m: UdpMetric| { panic!("mauvais type"); } - ); - assert_eq!(received_name, "Frank"); - } - - // ── Utilitaires ──────────────────────────────────────────────────────────── - - #[test] - fn test_topics_list() { - let bus = EventBus::new(); - // on_raw enregistre le canal (get_or_create) - let _rx1 = bus.on_raw("user-connected"); - let _rx2 = bus.on_raw("udp-metrics-updated"); - - let mut topics = bus.topics(); - topics.sort(); - assert_eq!(topics, vec!["udp-metrics-updated", "user-connected"]); - } - - #[tokio::test] - async fn test_emit_multiple_types_same_bus() { - let bus = Arc::new(EventBus::new()); - let user_ok = Arc::new(AtomicBool::new(false)); - let metric_ok = Arc::new(AtomicBool::new(false)); - let u = Arc::clone(&user_ok); - let m = Arc::clone(&metric_ok); - - bus.on::("user-connected", move |user| { - if user.name == "Bob" { - u.store(true, Ordering::SeqCst); - } - }); - bus.on::("udp-metrics-updated", move |metric| { - if (metric.value - 3.14).abs() < 0.001 { - m.store(true, Ordering::SeqCst); - } - }); - - bus.emit("user-connected", User { name: "Bob".into() }); - bus.emit("udp-metrics-updated", UdpMetric { value: 3.14 }); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - assert!(user_ok.load(Ordering::SeqCst)); - assert!(metric_ok.load(Ordering::SeqCst)); - } -} diff --git a/event_bus/event_bus/src/receiver.rs b/event_bus/event_bus/src/receiver.rs deleted file mode 100644 index 99e9f52..0000000 --- a/event_bus/event_bus/src/receiver.rs +++ /dev/null @@ -1,199 +0,0 @@ -use std::any::Any; -use std::sync::Arc; - -use tokio::sync::broadcast; - -/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur. -pub type AnyEvent = Arc; - -// ───────────────────────────────────────────────────────────────────────────── -// Erreurs -// ───────────────────────────────────────────────────────────────────────────── - -/// Erreurs possibles lors d'un `recv().await`. -#[derive(Debug, thiserror::Error)] -pub enum RecvError { - #[error("le canal est fermé")] - Closed, - #[error("messages perdus (lag) : {0} ignorés")] - Lagged(u64), -} - -/// Erreurs possibles lors d'un `try_recv()`. -#[derive(Debug, thiserror::Error)] -pub enum TryRecvError { - #[error("aucun message disponible")] - Empty, - #[error("le canal est fermé")] - Closed, - #[error("messages perdus (lag) : {0} ignorés")] - Lagged(u64), -} - -impl From for RecvError { - fn from(e: broadcast::error::RecvError) -> Self { - match e { - broadcast::error::RecvError::Closed => RecvError::Closed, - broadcast::error::RecvError::Lagged(n) => RecvError::Lagged(n), - } - } -} - -impl From for TryRecvError { - fn from(e: broadcast::error::TryRecvError) -> Self { - match e { - broadcast::error::TryRecvError::Empty => TryRecvError::Empty, - broadcast::error::TryRecvError::Closed => TryRecvError::Closed, - broadcast::error::TryRecvError::Lagged(n) => TryRecvError::Lagged(n), - } - } -} - -// ───────────────────────────────────────────────────────────────────────────── -// TypedReceiver -// ───────────────────────────────────────────────────────────────────────────── - -/// Receiver typé pour un topic précis. -/// -/// `recv().await` retourne directement un `T` — le downcast est automatique. -/// Les événements d'un type différent sont ignorés silencieusement. -/// -/// # Exemple -/// ```rust,no_run -/// # use std::sync::Arc; -/// # use oxspeak_server_lib::event_bus::EventBus; -/// # #[derive(Clone, Debug)] struct User { name: String } -/// # let bus = Arc::new(EventBus::new()); -/// # tokio_test::block_on(async { -/// let mut rx = bus.on::("user-connected"); -/// bus.emit("user-connected", User { name: "Alice".into() }); -/// let user = rx.recv().await.unwrap(); -/// println!("{:?}", user); -/// # }); -/// ``` -pub struct TypedReceiver { - pub(crate) inner: broadcast::Receiver, - pub(crate) _marker: std::marker::PhantomData, -} - -impl TypedReceiver -where - T: Any + Send + Sync + Clone + 'static, -{ - pub(crate) fn new(inner: broadcast::Receiver) -> Self { - Self { - inner, - _marker: std::marker::PhantomData, - } - } - - /// Attend le prochain événement de type `T` sur ce topic. - /// Les événements d'un autre type sont ignorés silencieusement. - pub async fn recv(&mut self) -> Result { - loop { - match self.inner.recv().await { - Ok(evt) => { - if let Some(val) = evt.downcast_ref::() { - return Ok(val.clone()); - } - // Mauvais type → on ignore et on attend le suivant - } - Err(e) => return Err(e.into()), - } - } - } - - /// Version non-bloquante. - pub fn try_recv(&mut self) -> Result { - loop { - match self.inner.try_recv() { - Ok(evt) => { - if let Some(val) = evt.downcast_ref::() { - return Ok(val.clone()); - } - } - Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty), - Err(e) => return Err(e.into()), - } - } - } - - /// Accès au receiver brut sous-jacent. - pub fn into_inner(self) -> broadcast::Receiver { - self.inner - } -} - -// ───────────────────────────────────────────────────────────────────────────── -// PatternReceiver -// ───────────────────────────────────────────────────────────────────────────── - -/// Receiver unique pour un pattern wildcard (ex: `"user-*"`). -/// -/// Un seul receiver reçoit les événements de **tous** les topics correspondants, -/// passés et futurs. `recv().await` retourne `(nom_du_topic, valeur)`. -/// -/// # Exemple -/// ```rust,no_run -/// # use std::sync::Arc; -/// # use oxspeak_server_lib::event_bus::EventBus; -/// # #[derive(Clone, Debug)] struct User { name: String } -/// # let bus = Arc::new(EventBus::new()); -/// # tokio_test::block_on(async { -/// let mut rx = bus.on_pattern::("user-*"); -/// -/// bus.emit("user-created", User { name: "Alice".into() }); -/// bus.emit("user-deleted", User { name: "Bob".into() }); -/// -/// let (topic, user) = rx.recv().await.unwrap(); -/// println!("{}: {:?}", topic, user); -/// # }); -/// ``` -pub struct PatternReceiver { - pub(crate) inner: broadcast::Receiver<(String, AnyEvent)>, - pub(crate) _marker: std::marker::PhantomData, -} - -impl PatternReceiver -where - T: Any + Send + Sync + Clone + 'static, -{ - pub(crate) fn new(inner: broadcast::Receiver<(String, AnyEvent)>) -> Self { - Self { - inner, - _marker: std::marker::PhantomData, - } - } - - /// Attend le prochain événement de type `T` sur n'importe quel topic du pattern. - /// Retourne `(nom_du_topic, valeur)`. - /// Les événements d'un type différent sont ignorés silencieusement. - pub async fn recv(&mut self) -> Result<(String, T), RecvError> { - loop { - match self.inner.recv().await { - Ok((topic, evt)) => { - if let Some(val) = evt.downcast_ref::() { - return Ok((topic, val.clone())); - } - // Mauvais type → on ignore et on attend le suivant - } - Err(e) => return Err(e.into()), - } - } - } - - /// Version non-bloquante. - pub fn try_recv(&mut self) -> Result<(String, T), TryRecvError> { - loop { - match self.inner.try_recv() { - Ok((topic, evt)) => { - if let Some(val) = evt.downcast_ref::() { - return Ok((topic, val.clone())); - } - } - Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty), - Err(e) => return Err(e.into()), - } - } - } -} diff --git a/event_bus/event_bus/src/wildcard.rs b/event_bus/event_bus/src/wildcard.rs deleted file mode 100644 index 0922a65..0000000 --- a/event_bus/event_bus/src/wildcard.rs +++ /dev/null @@ -1,66 +0,0 @@ -/// Vérifie si `topic` correspond au `pattern` avec support des wildcards. -/// -/// - `*` correspond à n'importe quelle séquence de caractères (y compris vide) -/// - `?` correspond à exactement un caractère quelconque -/// -/// # Exemples -/// ``` -/// use oxspeak_server_lib::event_bus::wildcard_match; -/// -/// assert!(wildcard_match("user-*", "user-connected")); -/// assert!(wildcard_match("user-*", "user-created")); -/// assert!(!wildcard_match("user-*", "udp-metrics-updated")); -/// assert!(wildcard_match("*-updated", "udp-metrics-updated")); -/// assert!(wildcard_match("user-?", "user-a")); -/// assert!(!wildcard_match("user-?", "user-ab")); -/// ``` -pub fn wildcard_match(pattern: &str, topic: &str) -> bool { - let p: Vec = pattern.chars().collect(); - let t: Vec = topic.chars().collect(); - wildcard_match_inner(&p, &t) -} - -fn wildcard_match_inner(pattern: &[char], topic: &[char]) -> bool { - match (pattern.first(), topic.first()) { - (None, None) => true, - (Some(&'*'), _) => { - // '*' peut correspondre à 0 ou plusieurs caractères - wildcard_match_inner(&pattern[1..], topic) - || (!topic.is_empty() && wildcard_match_inner(pattern, &topic[1..])) - } - (Some(&'?'), Some(_)) => wildcard_match_inner(&pattern[1..], &topic[1..]), - (Some(p), Some(t)) if p == t => wildcard_match_inner(&pattern[1..], &topic[1..]), - _ => false, - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_wildcard_exact() { - assert!(wildcard_match("user-connected", "user-connected")); - assert!(!wildcard_match("user-connected", "user-created")); - } - - #[test] - fn test_wildcard_star() { - assert!(wildcard_match("user-*", "user-connected")); - assert!(wildcard_match("user-*", "user-created")); - assert!(wildcard_match("user-*", "user-deleted")); - assert!(!wildcard_match("user-*", "udp-metrics-updated")); - } - - #[test] - fn test_wildcard_question_mark() { - assert!(wildcard_match("user-?", "user-a")); - assert!(!wildcard_match("user-?", "user-ab")); - } - - #[test] - fn test_wildcard_star_anywhere() { - assert!(wildcard_match("*-updated", "udp-metrics-updated")); - assert!(wildcard_match("*metrics*", "udp-metrics-updated")); - } -} diff --git a/event_bus/event_bus/src/bus.rs b/event_bus/src/bus.rs similarity index 97% rename from event_bus/event_bus/src/bus.rs rename to event_bus/src/bus.rs index 34f4ebe..13a21eb 100644 --- a/event_bus/event_bus/src/bus.rs +++ b/event_bus/src/bus.rs @@ -1,396 +1,396 @@ -use std::any::Any; -use std::future::Future; -use std::sync::Arc; - -use glob::Pattern; -use parking_lot::RwLock; -use std::collections::HashMap; -use tokio::sync::broadcast; -use tokio::task::JoinHandle; - -/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur. -pub type AnyEvent = Arc; - -/// Capacité par défaut du buffer de chaque canal broadcast. -const DEFAULT_CAPACITY: usize = 64; - -/// Le bus d'événements central. -/// -/// Partagez-le via `Arc` entre les modules. -/// Chaque topic possède son propre canal broadcast : seuls les abonnés du bon -/// topic sont réveillés lors d'un `emit` (wake-up ciblé). -/// -/// # Exemple minimal — callback sync -/// ```rust,no_run -/// use std::sync::Arc; -/// use oxspeak_server_lib::event_bus::EventBus; -/// -/// #[derive(Clone, Debug)] -/// struct User { name: String } -/// -/// # tokio_test::block_on(async { -/// let bus = Arc::new(EventBus::new()); -/// -/// bus.on::("user-connected", |user| { -/// println!("Connecté : {:?}", user); -/// }); -/// -/// bus.emit("user-connected", User { name: "Alice".into() }); -/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -/// # }); -/// ``` -/// -/// # Exemple — callback async -/// ```rust,no_run -/// use std::sync::Arc; -/// use oxspeak_server_lib::event_bus::EventBus; -/// -/// #[derive(Clone, Debug)] -/// struct User { name: String } -/// -/// # tokio_test::block_on(async { -/// let bus = Arc::new(EventBus::new()); -/// -/// bus.on_async::("user-connected", |user| async move { -/// println!("(async) Connecté : {:?}", user); -/// }); -/// -/// bus.emit("user-connected", User { name: "Bob".into() }); -/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -/// # }); -/// ``` -/// -/// # Exemple — pattern glob avec topic -/// ```rust,no_run -/// use std::sync::Arc; -/// use oxspeak_server_lib::event_bus::EventBus; -/// -/// #[derive(Clone, Debug)] -/// struct User { name: String } -/// -/// # tokio_test::block_on(async { -/// let bus = Arc::new(EventBus::new()); -/// -/// bus.on_pattern::("user-*", |topic, user| { -/// match topic.as_str() { -/// "user-created" => println!("Créé : {:?}", user), -/// "user-deleted" => println!("Supprimé : {:?}", user), -/// other => println!("{}: {:?}", other, user), -/// } -/// }); -/// -/// bus.emit("user-created", User { name: "Alice".into() }); -/// bus.emit("user-deleted", User { name: "Bob".into() }); -/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; -/// # }); -/// ``` -pub struct EventBus { - /// Canaux par topic exact. - channels: RwLock>>, - /// Canaux pour les souscriptions par pattern glob. - patterns: RwLock)>>, - capacity: usize, -} - -impl EventBus { - /// Crée un bus avec la capacité par défaut (64 messages par canal). - pub fn new() -> Self { - Self { - channels: RwLock::new(HashMap::new()), - patterns: RwLock::new(Vec::new()), - capacity: DEFAULT_CAPACITY, - } - } - - /// Crée un bus avec une capacité de buffer personnalisée. - pub fn with_capacity(capacity: usize) -> Self { - Self { - channels: RwLock::new(HashMap::new()), - patterns: RwLock::new(Vec::new()), - capacity, - } - } - - // ───────────────────────────────────────────────────────────────────────── - // Interne - // ───────────────────────────────────────────────────────────────────────── - - fn get_or_create_sender(&self, topic: &str) -> broadcast::Sender { - { - let channels = self.channels.read(); - if let Some(tx) = channels.get(topic) { - return tx.clone(); - } - } - let mut channels = self.channels.write(); - channels - .entry(topic.to_string()) - .or_insert_with(|| { - let (tx, _) = broadcast::channel(self.capacity); - tx - }) - .clone() - } - - // ───────────────────────────────────────────────────────────────────────── - // Émission - // ───────────────────────────────────────────────────────────────────────── - - /// Émet un événement sur un topic. - /// - /// - Pousse l'event dans le canal du topic exact (si des abonnés existent). - /// - Pousse l'event dans tous les canaux de pattern qui matchent le topic. - /// - Si personne n'écoute, l'événement est ignoré silencieusement. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # #[derive(Clone)] struct User; - /// # let bus = Arc::new(EventBus::new()); - /// bus.emit("user-connected", User); - /// bus.emit("user-deleted", uuid::Uuid::new_v4()); - /// ``` - pub fn emit(&self, topic: &str, event: T) { - let event: AnyEvent = Arc::new(event); - - // Abonnés au topic exact - if let Some(tx) = self.channels.read().get(topic) { - let _ = tx.send(Arc::clone(&event)); - } - - // Abonnés aux patterns glob correspondants - for (pattern, tx) in self.patterns.read().iter() { - if pattern.matches(topic) { - let _ = tx.send((topic.to_string(), Arc::clone(&event))); - } - } - } - - // ───────────────────────────────────────────────────────────────────────── - // Abonnement — callbacks (API principale) - // ───────────────────────────────────────────────────────────────────────── - - /// S'abonne à un topic et appelle `handler` à chaque événement de type `T`. - /// - /// Le handler est exécuté dans une tâche Tokio dédiée (fire-and-forget). - /// Les événements d'un autre type sont ignorés silencieusement. - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # #[derive(Clone, Debug)] struct User { name: String } - /// # let bus = Arc::new(EventBus::new()); - /// bus.on::("user-connected", |user| { - /// println!("Connecté : {:?}", user); - /// }); - /// ``` - pub fn on(&self, topic: &str, handler: F) -> JoinHandle<()> - where - T: Any + Send + Sync + Clone + 'static, - F: Fn(T) + Send + Sync + 'static, - { - let mut rx = self.get_or_create_sender(topic).subscribe(); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(evt) => { - if let Some(typed) = evt.downcast_ref::() { - handler(typed.clone()); - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - } - } - }) - } - - /// S'abonne à un topic et appelle un handler **async** à chaque événement de type `T`. - /// - /// Parfait pour effectuer des opérations async dans le handler - /// (requête DB, appel HTTP, broadcast WebSocket…). - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # #[derive(Clone, Debug)] struct User { name: String } - /// # let bus = Arc::new(EventBus::new()); - /// bus.on_async::("user-connected", |user| async move { - /// println!("(async) Connecté : {:?}", user); - /// // Ici tu peux faire du async : requête DB, HTTP, etc. - /// }); - /// ``` - pub fn on_async(&self, topic: &str, handler: F) -> JoinHandle<()> - where - T: Any + Send + Sync + Clone + 'static, - F: Fn(T) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - let mut rx = self.get_or_create_sender(topic).subscribe(); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(evt) => { - if let Some(typed) = evt.downcast_ref::() { - handler(typed.clone()).await; - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - } - } - }) - } - - /// S'abonne à tous les topics correspondant à un pattern glob. - /// - /// Le handler reçoit `(topic, valeur)` — le nom du topic est inclus pour - /// distinguer `user-created` de `user-deleted` par exemple. - /// - /// **Aucun pré-enregistrement nécessaire** : les topics futurs sont couverts. - /// Supporte la syntaxe glob : `*` (toute séquence), `?` (un caractère), - /// `[abc]` (classe de caractères). - /// - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # #[derive(Clone, Debug)] struct User { name: String } - /// # let bus = Arc::new(EventBus::new()); - /// bus.on_pattern::("user-*", |topic, user| { - /// match topic.as_str() { - /// "user-created" => println!("Créé : {:?}", user), - /// "user-deleted" => println!("Supprimé : {:?}", user), - /// other => println!("{}: {:?}", other, user), - /// } - /// }); - /// - /// bus.emit("user-created", User { name: "Alice".into() }); - /// bus.emit("user-deleted", User { name: "Bob".into() }); - /// ``` - pub fn on_pattern(&self, pattern: &str, handler: F) -> JoinHandle<()> - where - T: Any + Send + Sync + Clone + 'static, - F: Fn(String, T) + Send + Sync + 'static, - { - let glob = Pattern::new(pattern).expect("pattern glob invalide"); - let (tx, mut rx) = broadcast::channel(self.capacity); - self.patterns.write().push((glob, tx)); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok((topic, evt)) => { - if let Some(typed) = evt.downcast_ref::() { - handler(topic, typed.clone()); - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - } - } - }) - } - - /// S'abonne à tous les topics correspondant à un pattern glob, avec un handler **async**. - /// - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # #[derive(Clone, Debug)] struct User { name: String } - /// # let bus = Arc::new(EventBus::new()); - /// bus.on_pattern_async::("user-*", |topic, user| async move { - /// match topic.as_str() { - /// "user-created" => println!("(async) Créé : {:?}", user), - /// "user-deleted" => println!("(async) Supprimé : {:?}", user), - /// other => println!("(async) {}: {:?}", other, user), - /// } - /// }); - /// - /// bus.emit("user-created", User { name: "Alice".into() }); - /// ``` - pub fn on_pattern_async(&self, pattern: &str, handler: F) -> JoinHandle<()> - where - T: Any + Send + Sync + Clone + 'static, - F: Fn(String, T) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - let glob = Pattern::new(pattern).expect("pattern glob invalide"); - let (tx, mut rx) = broadcast::channel(self.capacity); - self.patterns.write().push((glob, tx)); - - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok((topic, evt)) => { - if let Some(typed) = evt.downcast_ref::() { - handler(topic, typed.clone()).await; - } - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - } - } - }) - } - - // ───────────────────────────────────────────────────────────────────────── - // Abonnement — accès bas niveau (cas avancés) - // ───────────────────────────────────────────────────────────────────────── - - /// Retourne un receiver brut ([`AnyEvent`]) pour gérer toi-même la boucle. - /// - /// Utile avec la macro [`match_event!`][crate::match_event] pour gérer - /// plusieurs types différents sur un même topic. - /// - /// # Exemple - /// ```rust,no_run - /// # use std::sync::Arc; - /// # use oxspeak_server_lib::event_bus::EventBus; - /// # use oxspeak_server_lib::match_event; - /// # #[derive(Clone, Debug)] struct User { name: String } - /// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 } - /// # let bus = Arc::new(EventBus::new()); - /// # tokio_test::block_on(async { - /// let mut rx = bus.on_raw("user-connected"); - /// bus.emit("user-connected", User { name: "Alice".into() }); - /// - /// if let Ok(evt) = rx.recv().await { - /// match_event!(evt, - /// User => |u| println!("User: {:?}", u), - /// UdpMetric => |m| println!("Metric: {:?}", m), - /// ); - /// } - /// # }); - /// ``` - pub fn on_raw(&self, topic: &str) -> broadcast::Receiver { - self.get_or_create_sender(topic).subscribe() - } - - // ───────────────────────────────────────────────────────────────────────── - // Utilitaires - // ───────────────────────────────────────────────────────────────────────── - - /// Retourne la liste des topics actuellement enregistrés. - pub fn topics(&self) -> Vec { - self.channels.read().keys().cloned().collect() - } -} - -impl Default for EventBus { - fn default() -> Self { - Self::new() - } -} +use std::any::Any; +use std::future::Future; +use std::sync::Arc; + +use glob::Pattern; +use parking_lot::RwLock; +use std::collections::HashMap; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; + +/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur. +pub type AnyEvent = Arc; + +/// Capacité par défaut du buffer de chaque canal broadcast. +const DEFAULT_CAPACITY: usize = 64; + +/// Le bus d'événements central. +/// +/// Partagez-le via `Arc` entre les modules. +/// Chaque topic possède son propre canal broadcast : seuls les abonnés du bon +/// topic sont réveillés lors d'un `emit` (wake-up ciblé). +/// +/// # Exemple minimal — callback sync +/// ```rust,no_run +/// use std::sync::Arc; +/// use oxspeak_server_lib::event_bus::EventBus; +/// +/// #[derive(Clone, Debug)] +/// struct User { name: String } +/// +/// # tokio_test::block_on(async { +/// let bus = Arc::new(EventBus::new()); +/// +/// bus.on::("user-connected", |user| { +/// println!("Connecté : {:?}", user); +/// }); +/// +/// bus.emit("user-connected", User { name: "Alice".into() }); +/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +/// # }); +/// ``` +/// +/// # Exemple — callback async +/// ```rust,no_run +/// use std::sync::Arc; +/// use oxspeak_server_lib::event_bus::EventBus; +/// +/// #[derive(Clone, Debug)] +/// struct User { name: String } +/// +/// # tokio_test::block_on(async { +/// let bus = Arc::new(EventBus::new()); +/// +/// bus.on_async::("user-connected", |user| async move { +/// println!("(async) Connecté : {:?}", user); +/// }); +/// +/// bus.emit("user-connected", User { name: "Bob".into() }); +/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +/// # }); +/// ``` +/// +/// # Exemple — pattern glob avec topic +/// ```rust,no_run +/// use std::sync::Arc; +/// use oxspeak_server_lib::event_bus::EventBus; +/// +/// #[derive(Clone, Debug)] +/// struct User { name: String } +/// +/// # tokio_test::block_on(async { +/// let bus = Arc::new(EventBus::new()); +/// +/// bus.on_pattern::("user-*", |topic, user| { +/// match topic.as_str() { +/// "user-created" => println!("Créé : {:?}", user), +/// "user-deleted" => println!("Supprimé : {:?}", user), +/// other => println!("{}: {:?}", other, user), +/// } +/// }); +/// +/// bus.emit("user-created", User { name: "Alice".into() }); +/// bus.emit("user-deleted", User { name: "Bob".into() }); +/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +/// # }); +/// ``` +pub struct EventBus { + /// Canaux par topic exact. + channels: RwLock>>, + /// Canaux pour les souscriptions par pattern glob. + patterns: RwLock)>>, + capacity: usize, +} + +impl EventBus { + /// Crée un bus avec la capacité par défaut (64 messages par canal). + pub fn new() -> Self { + Self { + channels: RwLock::new(HashMap::new()), + patterns: RwLock::new(Vec::new()), + capacity: DEFAULT_CAPACITY, + } + } + + /// Crée un bus avec une capacité de buffer personnalisée. + pub fn with_capacity(capacity: usize) -> Self { + Self { + channels: RwLock::new(HashMap::new()), + patterns: RwLock::new(Vec::new()), + capacity, + } + } + + // ───────────────────────────────────────────────────────────────────────── + // Interne + // ───────────────────────────────────────────────────────────────────────── + + fn get_or_create_sender(&self, topic: &str) -> broadcast::Sender { + { + let channels = self.channels.read(); + if let Some(tx) = channels.get(topic) { + return tx.clone(); + } + } + let mut channels = self.channels.write(); + channels + .entry(topic.to_string()) + .or_insert_with(|| { + let (tx, _) = broadcast::channel(self.capacity); + tx + }) + .clone() + } + + // ───────────────────────────────────────────────────────────────────────── + // Émission + // ───────────────────────────────────────────────────────────────────────── + + /// Émet un événement sur un topic. + /// + /// - Pousse l'event dans le canal du topic exact (si des abonnés existent). + /// - Pousse l'event dans tous les canaux de pattern qui matchent le topic. + /// - Si personne n'écoute, l'événement est ignoré silencieusement. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # #[derive(Clone)] struct User; + /// # let bus = Arc::new(EventBus::new()); + /// bus.emit("user-connected", User); + /// bus.emit("user-deleted", uuid::Uuid::new_v4()); + /// ``` + pub fn emit(&self, topic: &str, event: T) { + let event: AnyEvent = Arc::new(event); + + // Abonnés au topic exact + if let Some(tx) = self.channels.read().get(topic) { + let _ = tx.send(Arc::clone(&event)); + } + + // Abonnés aux patterns glob correspondants + for (pattern, tx) in self.patterns.read().iter() { + if pattern.matches(topic) { + let _ = tx.send((topic.to_string(), Arc::clone(&event))); + } + } + } + + // ───────────────────────────────────────────────────────────────────────── + // Abonnement — callbacks (API principale) + // ───────────────────────────────────────────────────────────────────────── + + /// S'abonne à un topic et appelle `handler` à chaque événement de type `T`. + /// + /// Le handler est exécuté dans une tâche Tokio dédiée (fire-and-forget). + /// Les événements d'un autre type sont ignorés silencieusement. + /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # #[derive(Clone, Debug)] struct User { name: String } + /// # let bus = Arc::new(EventBus::new()); + /// bus.on::("user-connected", |user| { + /// println!("Connecté : {:?}", user); + /// }); + /// ``` + pub fn on(&self, topic: &str, handler: F) -> JoinHandle<()> + where + T: Any + Send + Sync + Clone + 'static, + F: Fn(T) + Send + Sync + 'static, + { + let mut rx = self.get_or_create_sender(topic).subscribe(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(evt) => { + if let Some(typed) = evt.downcast_ref::() { + handler(typed.clone()); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) + } + + /// S'abonne à un topic et appelle un handler **async** à chaque événement de type `T`. + /// + /// Parfait pour effectuer des opérations async dans le handler + /// (requête DB, appel HTTP, broadcast WebSocket…). + /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # #[derive(Clone, Debug)] struct User { name: String } + /// # let bus = Arc::new(EventBus::new()); + /// bus.on_async::("user-connected", |user| async move { + /// println!("(async) Connecté : {:?}", user); + /// // Ici tu peux faire du async : requête DB, HTTP, etc. + /// }); + /// ``` + pub fn on_async(&self, topic: &str, handler: F) -> JoinHandle<()> + where + T: Any + Send + Sync + Clone + 'static, + F: Fn(T) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + let mut rx = self.get_or_create_sender(topic).subscribe(); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(evt) => { + if let Some(typed) = evt.downcast_ref::() { + handler(typed.clone()).await; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) + } + + /// S'abonne à tous les topics correspondant à un pattern glob. + /// + /// Le handler reçoit `(topic, valeur)` — le nom du topic est inclus pour + /// distinguer `user-created` de `user-deleted` par exemple. + /// + /// **Aucun pré-enregistrement nécessaire** : les topics futurs sont couverts. + /// Supporte la syntaxe glob : `*` (toute séquence), `?` (un caractère), + /// `[abc]` (classe de caractères). + /// + /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # #[derive(Clone, Debug)] struct User { name: String } + /// # let bus = Arc::new(EventBus::new()); + /// bus.on_pattern::("user-*", |topic, user| { + /// match topic.as_str() { + /// "user-created" => println!("Créé : {:?}", user), + /// "user-deleted" => println!("Supprimé : {:?}", user), + /// other => println!("{}: {:?}", other, user), + /// } + /// }); + /// + /// bus.emit("user-created", User { name: "Alice".into() }); + /// bus.emit("user-deleted", User { name: "Bob".into() }); + /// ``` + pub fn on_pattern(&self, pattern: &str, handler: F) -> JoinHandle<()> + where + T: Any + Send + Sync + Clone + 'static, + F: Fn(String, T) + Send + Sync + 'static, + { + let glob = Pattern::new(pattern).expect("pattern glob invalide"); + let (tx, mut rx) = broadcast::channel(self.capacity); + self.patterns.write().push((glob, tx)); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok((topic, evt)) => { + if let Some(typed) = evt.downcast_ref::() { + handler(topic, typed.clone()); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) + } + + /// S'abonne à tous les topics correspondant à un pattern glob, avec un handler **async**. + /// + /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # #[derive(Clone, Debug)] struct User { name: String } + /// # let bus = Arc::new(EventBus::new()); + /// bus.on_pattern_async::("user-*", |topic, user| async move { + /// match topic.as_str() { + /// "user-created" => println!("(async) Créé : {:?}", user), + /// "user-deleted" => println!("(async) Supprimé : {:?}", user), + /// other => println!("(async) {}: {:?}", other, user), + /// } + /// }); + /// + /// bus.emit("user-created", User { name: "Alice".into() }); + /// ``` + pub fn on_pattern_async(&self, pattern: &str, handler: F) -> JoinHandle<()> + where + T: Any + Send + Sync + Clone + 'static, + F: Fn(String, T) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + let glob = Pattern::new(pattern).expect("pattern glob invalide"); + let (tx, mut rx) = broadcast::channel(self.capacity); + self.patterns.write().push((glob, tx)); + + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok((topic, evt)) => { + if let Some(typed) = evt.downcast_ref::() { + handler(topic, typed.clone()).await; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) + } + + // ───────────────────────────────────────────────────────────────────────── + // Abonnement — accès bas niveau (cas avancés) + // ───────────────────────────────────────────────────────────────────────── + + /// Retourne un receiver brut ([`AnyEvent`]) pour gérer toi-même la boucle. + /// + /// Utile avec la macro [`match_event!`][crate::match_event] pour gérer + /// plusieurs types différents sur un même topic. + /// + /// # Exemple + /// ```rust,no_run + /// # use std::sync::Arc; + /// # use oxspeak_server_lib::event_bus::EventBus; + /// # use oxspeak_server_lib::match_event; + /// # #[derive(Clone, Debug)] struct User { name: String } + /// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 } + /// # let bus = Arc::new(EventBus::new()); + /// # tokio_test::block_on(async { + /// let mut rx = bus.on_raw("user-connected"); + /// bus.emit("user-connected", User { name: "Alice".into() }); + /// + /// if let Ok(evt) = rx.recv().await { + /// match_event!(evt, + /// User => |u| println!("User: {:?}", u), + /// UdpMetric => |m| println!("Metric: {:?}", m), + /// ); + /// } + /// # }); + /// ``` + pub fn on_raw(&self, topic: &str) -> broadcast::Receiver { + self.get_or_create_sender(topic).subscribe() + } + + // ───────────────────────────────────────────────────────────────────────── + // Utilitaires + // ───────────────────────────────────────────────────────────────────────── + + /// Retourne la liste des topics actuellement enregistrés. + pub fn topics(&self) -> Vec { + self.channels.read().keys().cloned().collect() + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} diff --git a/event_bus/src/lib.rs b/event_bus/src/lib.rs new file mode 100644 index 0000000..b225064 --- /dev/null +++ b/event_bus/src/lib.rs @@ -0,0 +1,162 @@ +//! # Event Bus +//! +//! Un bus d'événements asynchrone permettant de faire transiter des messages typés +//! entre plusieurs modules, sans couplage direct. +//! +//! ## Caractéristiques +//! +//! - **Association clé → événement** : chaque topic (`&str`) est indépendant +//! - **Sans restriction de type** : n'importe quel `T: Any + Send + Sync + Clone` +//! - **Wake-up ciblé** : seuls les abonnés du bon topic sont réveillés +//! - **API callback** : style JavaScript — `bus.on("topic", |payload| { ... })` +//! - **Pattern glob** : `on_pattern("user-*", |topic, payload| { ... })` +//! - **Handlers async** : `on_async` et `on_pattern_async` +//! +//! ## Exemple — callback sync +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use oxspeak_server_lib::event_bus::EventBus; +//! +//! #[derive(Clone, Debug)] +//! struct User { name: String } +//! +//! # tokio_test::block_on(async { +//! let bus = Arc::new(EventBus::new()); +//! +//! bus.on::("user-connected", |user| { +//! println!("Connecté : {:?}", user); +//! }); +//! +//! bus.emit("user-connected", User { name: "Alice".into() }); +//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +//! # }); +//! ``` +//! +//! ## Exemple — callback async +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use oxspeak_server_lib::event_bus::EventBus; +//! +//! #[derive(Clone, Debug)] +//! struct User { name: String } +//! +//! # tokio_test::block_on(async { +//! let bus = Arc::new(EventBus::new()); +//! +//! bus.on_async::("user-connected", |user| async move { +//! println!("(async) Connecté : {:?}", user); +//! }); +//! +//! bus.emit("user-connected", User { name: "Bob".into() }); +//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +//! # }); +//! ``` +//! +//! ## Exemple — pattern glob (topic inclus dans le callback) +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use oxspeak_server_lib::event_bus::EventBus; +//! +//! #[derive(Clone, Debug)] +//! struct User { name: String } +//! +//! # tokio_test::block_on(async { +//! let bus = Arc::new(EventBus::new()); +//! +//! bus.on_pattern::("user-*", |topic, user| { +//! match topic.as_str() { +//! "user-created" => println!("Créé : {:?}", user), +//! "user-deleted" => println!("Supprimé : {:?}", user), +//! other => println!("{}: {:?}", other, user), +//! } +//! }); +//! +//! bus.emit("user-created", User { name: "Alice".into() }); +//! bus.emit("user-deleted", User { name: "Bob".into() }); +//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +//! # }); +//! ``` +//! +//! ## Exemple — multi-types avec `match_event!` (cas avancé) +//! +//! ```rust,no_run +//! use std::sync::Arc; +//! use oxspeak_server_lib::event_bus::EventBus; +//! use oxspeak_server_lib::match_event; +//! +//! #[derive(Clone, Debug)] struct User { name: String } +//! #[derive(Clone, Debug)] struct UdpMetric { value: f32 } +//! +//! # tokio_test::block_on(async { +//! let bus = Arc::new(EventBus::new()); +//! let mut rx = bus.on_raw("mixed-topic"); +//! +//! bus.emit("mixed-topic", User { name: "Alice".into() }); +//! +//! if let Ok(evt) = rx.recv().await { +//! match_event!(evt, +//! User => |u| println!("User: {:?}", u), +//! UdpMetric => |m| println!("Metric: {:?}", m), +//! ); +//! } +//! # }); +//! ``` + +mod bus; + +// Réexports publics +pub use bus::{AnyEvent, EventBus}; + +/// Downcaste un [`AnyEvent`] vers un ou plusieurs types concrets et exécute +/// la closure correspondante si le type correspond. +/// +/// Les branches non correspondantes sont ignorées silencieusement. +/// +/// # Syntaxe +/// ```text +/// match_event!(evt, Type1 => |val| { ... }, Type2 => |val| { ... }) +/// ``` +/// +/// # Exemple +/// ```rust,no_run +/// # use std::sync::Arc; +/// # use oxspeak_server_lib::event_bus::EventBus; +/// # use oxspeak_server_lib::match_event; +/// # #[derive(Clone, Debug)] struct User { name: String } +/// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 } +/// # let bus = Arc::new(EventBus::new()); +/// # tokio_test::block_on(async { +/// let mut rx = bus.on_raw("user-connected"); +/// bus.emit("user-connected", User { name: "Alice".into() }); +/// +/// if let Ok(evt) = rx.recv().await { +/// match_event!(evt, +/// User => |u| println!("Utilisateur : {:?}", u), +/// UdpMetric => |m| println!("Metric : {:?}", m), +/// ); +/// } +/// # }); +/// ``` +#[macro_export] +macro_rules! match_event { + ($evt:expr, $($type:ty => $handler:expr),+ $(,)?) => { + $( + if let Some(val) = ($evt).downcast_ref::<$type>() { + ($handler)(val.clone()); + } else + )+ + { + // Aucun type ne correspond → on ignore silencieusement + } + }; +} + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests; diff --git a/event_bus/src/tests.rs b/event_bus/src/tests.rs new file mode 100644 index 0000000..b26e0d6 --- /dev/null +++ b/event_bus/src/tests.rs @@ -0,0 +1,293 @@ +use crate::{match_event, EventBus}; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::Arc; +#[derive(Clone, Debug, PartialEq)] +struct User { + name: String, +} + +#[derive(Clone, Debug, PartialEq)] +struct UdpMetric { + value: f32, +} + +// ── on (callback sync) ──────────────────────────────────────────────────── + +#[tokio::test] +async fn test_on_callback_sync() { + let bus = Arc::new(EventBus::new()); + let received = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&received); + + bus.on::("user-connected", move |user| { + if user.name == "Alice" { + flag.store(true, Ordering::SeqCst); + } + }); + + bus.emit( + "user-connected", + User { + name: "Alice".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(received.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_on_targeted_wakeup() { + // Émettre sur "user-connected" ne doit pas réveiller "udp-metrics-updated" + let bus = Arc::new(EventBus::new()); + let metric_called = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&metric_called); + + bus.on::("udp-metrics-updated", move |_| { + flag.store(true, Ordering::SeqCst); + }); + + bus.emit( + "user-connected", + User { + name: "Carol".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(!metric_called.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_on_type_mismatch_ignored() { + // Émettre un UdpMetric sur un topic écouté en User → handler pas appelé + let bus = Arc::new(EventBus::new()); + let called = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&called); + + bus.on::("mixed-topic", move |_| { + flag.store(true, Ordering::SeqCst); + }); + + bus.emit("mixed-topic", UdpMetric { value: 1.0 }); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(!called.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_on_multiple_subscribers_same_topic() { + let bus = Arc::new(EventBus::new()); + let count = Arc::new(AtomicU32::new(0)); + + for _ in 0..3 { + let c = Arc::clone(&count); + bus.on::("user-connected", move |_| { + c.fetch_add(1, Ordering::SeqCst); + }); + } + + bus.emit( + "user-connected", + User { + name: "Grace".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert_eq!(count.load(Ordering::SeqCst), 3); +} + +// ── on_async ────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn test_on_async_callback() { + let bus = Arc::new(EventBus::new()); + let received = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&received); + + bus.on_async::("user-connected", move |user| { + let f = Arc::clone(&flag); + async move { + if user.name == "Async" { + f.store(true, Ordering::SeqCst); + } + } + }); + + bus.emit( + "user-connected", + User { + name: "Async".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(received.load(Ordering::SeqCst)); +} + +// ── on_pattern ──────────────────────────────────────────────────────────── + +#[tokio::test] +async fn test_on_pattern_callback_receives_topic_and_payload() { + let bus = Arc::new(EventBus::new()); + let created = Arc::new(AtomicBool::new(false)); + let deleted = Arc::new(AtomicBool::new(false)); + let c = Arc::clone(&created); + let d = Arc::clone(&deleted); + + bus.on_pattern::("user-*", move |topic, user| match topic.as_str() { + "user-created" if user.name == "Dave" => c.store(true, Ordering::SeqCst), + "user-deleted" if user.name == "Eve" => d.store(true, Ordering::SeqCst), + _ => {} + }); + + bus.emit( + "user-created", + User { + name: "Dave".into(), + }, + ); + bus.emit("user-deleted", User { name: "Eve".into() }); + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + + assert!(created.load(Ordering::SeqCst)); + assert!(deleted.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_on_pattern_does_not_match_other_topics() { + let bus = Arc::new(EventBus::new()); + let called = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&called); + + bus.on_pattern::("user-*", move |_, _| { + flag.store(true, Ordering::SeqCst); + }); + + bus.emit( + "server-created", + User { + name: "Ghost".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(!called.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn test_on_pattern_no_pre_registration_needed() { + // Le pattern est enregistré avant que le topic n'existe + let bus = Arc::new(EventBus::new()); + let received = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&received); + + bus.on_pattern::("user-*", move |topic, user| { + if topic == "user-new-topic" && user.name == "Frank" { + flag.store(true, Ordering::SeqCst); + } + }); + + bus.emit( + "user-new-topic", + User { + name: "Frank".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(received.load(Ordering::SeqCst)); +} + +// ── on_pattern_async ────────────────────────────────────────────────────── + +#[tokio::test] +async fn test_on_pattern_async_callback() { + let bus = Arc::new(EventBus::new()); + let received = Arc::new(AtomicBool::new(false)); + let flag = Arc::clone(&received); + + bus.on_pattern_async::("user-*", move |topic, user| { + let f = Arc::clone(&flag); + async move { + if topic == "user-created" && user.name == "Hank" { + f.store(true, Ordering::SeqCst); + } + } + }); + + bus.emit( + "user-created", + User { + name: "Hank".into(), + }, + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(received.load(Ordering::SeqCst)); +} + +// ── on_raw + match_event! ───────────────────────────────────────────────── + +#[tokio::test] +async fn test_on_raw_and_match_event_macro() { + let bus = Arc::new(EventBus::new()); + let mut rx = bus.on_raw("user-connected"); + + bus.emit( + "user-connected", + User { + name: "Frank".into(), + }, + ); + + let evt = rx.recv().await.unwrap(); + let mut received_name = String::new(); + match_event!(evt, + User => |u: User| { received_name = u.name.clone(); }, + UdpMetric => |_m: UdpMetric| { panic!("mauvais type"); } + ); + assert_eq!(received_name, "Frank"); +} + +// ── Utilitaires ──────────────────────────────────────────────────────────── + +#[test] +fn test_topics_list() { + let bus = EventBus::new(); + // on_raw enregistre le canal (get_or_create) + let _rx1 = bus.on_raw("user-connected"); + let _rx2 = bus.on_raw("udp-metrics-updated"); + + let mut topics = bus.topics(); + topics.sort(); + assert_eq!(topics, vec!["udp-metrics-updated", "user-connected"]); +} + +#[tokio::test] +async fn test_emit_multiple_types_same_bus() { + let bus = Arc::new(EventBus::new()); + let user_ok = Arc::new(AtomicBool::new(false)); + let metric_ok = Arc::new(AtomicBool::new(false)); + let u = Arc::clone(&user_ok); + let m = Arc::clone(&metric_ok); + + bus.on::("user-connected", move |user| { + if user.name == "Bob" { + u.store(true, Ordering::SeqCst); + } + }); + bus.on::("udp-metrics-updated", move |metric| { + if (metric.value - 3.14).abs() < 0.001 { + m.store(true, Ordering::SeqCst); + } + }); + + bus.emit("user-connected", User { name: "Bob".into() }); + bus.emit("udp-metrics-updated", UdpMetric { value: 3.14 }); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + assert!(user_ok.load(Ordering::SeqCst)); + assert!(metric_ok.load(Ordering::SeqCst)); +}