This commit is contained in:
2026-05-03 21:25:01 +02:00
parent 47c33a3a6c
commit 0de2e334ae
10 changed files with 879 additions and 1131 deletions
Generated
+10
View File
@@ -1019,6 +1019,15 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "event_bus"
version = "0.1.0"
dependencies = [
"glob",
"parking_lot",
"tokio",
]
[[package]]
name = "fastrand"
version = "2.4.1"
@@ -1906,6 +1915,7 @@ name = "oxspeak_server"
version = "0.1.0"
dependencies = [
"config",
"event_bus",
"glob",
"log",
"migration",
+1 -1
View File
@@ -15,7 +15,7 @@ tokio = { version = "1.52.1", features = ["full"] }
config = "0.15.22"
sea-orm = { version = "2.0.0-rc.38", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio", "with-chrono", "with-uuid", "with-json", "schema-sync"] }
migration = { path = "migration" }
event-bus = { path = "event_bus" }
event_bus = { path = "event_bus" }
parking_lot = "0.12.5"
serde = "1.0.228"
serde_json = "1.0.149"
+17
View File
@@ -0,0 +1,17 @@
[package]
name = "event_bus"
version = "0.1.0"
edition = "2024"
publish = false
[lib]
name = "event_bus"
path = "src/lib.rs"
[dependencies]
tokio = { version = "1.52.1", default-features = false, features = ["rt", "sync"] }
glob = "0.3.3"
parking_lot = "0.12.5"
[dev-dependencies]
tokio = { version = "1.52.1", default-features = false, features = ["rt", "rt-multi-thread", "macros", "time", "sync"] }
-11
View File
@@ -1,11 +0,0 @@
[package]
name = "event-bus"
version = "0.1.0"
edition = "2024"
publish = false
[lib]
name = "migration"
path = "src/lib.rs"
[dependencies]
-458
View File
@@ -1,458 +0,0 @@
//! # Event Bus
//!
//! Un bus d'événements asynchrone permettant de faire transiter des messages typés
//! entre plusieurs modules, sans couplage direct.
//!
//! ## Caractéristiques
//!
//! - **Association clé → événement** : chaque topic (`&str`) est indépendant
//! - **Sans restriction de type** : n'importe quel `T: Any + Send + Sync + Clone`
//! - **Wake-up ciblé** : seuls les abonnés du bon topic sont réveillés
//! - **API callback** : style JavaScript — `bus.on("topic", |payload| { ... })`
//! - **Pattern glob** : `on_pattern("user-*", |topic, payload| { ... })`
//! - **Handlers async** : `on_async` et `on_pattern_async`
//!
//! ## Exemple — callback sync
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on::<User>("user-connected", |user| {
//! println!("Connecté : {:?}", user);
//! });
//!
//! bus.emit("user-connected", User { name: "Alice".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — callback async
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on_async::<User, _, _>("user-connected", |user| async move {
//! println!("(async) Connecté : {:?}", user);
//! });
//!
//! bus.emit("user-connected", User { name: "Bob".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — pattern glob (topic inclus dans le callback)
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on_pattern::<User, _>("user-*", |topic, user| {
//! match topic.as_str() {
//! "user-created" => println!("Créé : {:?}", user),
//! "user-deleted" => println!("Supprimé : {:?}", user),
//! other => println!("{}: {:?}", other, user),
//! }
//! });
//!
//! bus.emit("user-created", User { name: "Alice".into() });
//! bus.emit("user-deleted", User { name: "Bob".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — multi-types avec `match_event!` (cas avancé)
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//! use oxspeak_server_lib::match_event;
//!
//! #[derive(Clone, Debug)] struct User { name: String }
//! #[derive(Clone, Debug)] struct UdpMetric { value: f32 }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//! let mut rx = bus.on_raw("mixed-topic");
//!
//! bus.emit("mixed-topic", User { name: "Alice".into() });
//!
//! if let Ok(evt) = rx.recv().await {
//! match_event!(evt,
//! User => |u| println!("User: {:?}", u),
//! UdpMetric => |m| println!("Metric: {:?}", m),
//! );
//! }
//! # });
//! ```
mod bus;
// Réexports publics
pub use bus::{AnyEvent, EventBus};
/// Downcaste un [`AnyEvent`] vers un ou plusieurs types concrets et exécute
/// la closure correspondante si le type correspond.
///
/// Les branches non correspondantes sont ignorées silencieusement.
///
/// # Syntaxe
/// ```text
/// match_event!(evt, Type1 => |val| { ... }, Type2 => |val| { ... })
/// ```
///
/// # Exemple
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use oxspeak_server_lib::event_bus::EventBus;
/// # use oxspeak_server_lib::match_event;
/// # #[derive(Clone, Debug)] struct User { name: String }
/// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 }
/// # let bus = Arc::new(EventBus::new());
/// # tokio_test::block_on(async {
/// let mut rx = bus.on_raw("user-connected");
/// bus.emit("user-connected", User { name: "Alice".into() });
///
/// if let Ok(evt) = rx.recv().await {
/// match_event!(evt,
/// User => |u| println!("Utilisateur : {:?}", u),
/// UdpMetric => |m| println!("Metric : {:?}", m),
/// );
/// }
/// # });
/// ```
#[macro_export]
macro_rules! match_event {
($evt:expr, $($type:ty => $handler:expr),+ $(,)?) => {
$(
if let Some(val) = ($evt).downcast_ref::<$type>() {
($handler)(val.clone());
} else
)+
{
// Aucun type ne correspond → on ignore silencieusement
}
};
}
// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use super::*;
#[derive(Clone, Debug, PartialEq)]
struct User {
name: String,
}
#[derive(Clone, Debug, PartialEq)]
struct UdpMetric {
value: f32,
}
// ── on (callback sync) ────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_callback_sync() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on::<User, _>("user-connected", move |user| {
if user.name == "Alice" {
flag.store(true, Ordering::SeqCst);
}
});
bus.emit(
"user-connected",
User {
name: "Alice".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_targeted_wakeup() {
// Émettre sur "user-connected" ne doit pas réveiller "udp-metrics-updated"
let bus = Arc::new(EventBus::new());
let metric_called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&metric_called);
bus.on::<UdpMetric, _>("udp-metrics-updated", move |_| {
flag.store(true, Ordering::SeqCst);
});
bus.emit(
"user-connected",
User {
name: "Carol".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!metric_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_type_mismatch_ignored() {
// Émettre un UdpMetric sur un topic écouté en User → handler pas appelé
let bus = Arc::new(EventBus::new());
let called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&called);
bus.on::<User, _>("mixed-topic", move |_| {
flag.store(true, Ordering::SeqCst);
});
bus.emit("mixed-topic", UdpMetric { value: 1.0 });
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_multiple_subscribers_same_topic() {
let bus = Arc::new(EventBus::new());
let count = Arc::new(AtomicU32::new(0));
for _ in 0..3 {
let c = Arc::clone(&count);
bus.on::<User, _>("user-connected", move |_| {
c.fetch_add(1, Ordering::SeqCst);
});
}
bus.emit(
"user-connected",
User {
name: "Grace".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert_eq!(count.load(Ordering::SeqCst), 3);
}
// ── on_async ──────────────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_async_callback() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_async::<User, _, _>("user-connected", move |user| {
let f = Arc::clone(&flag);
async move {
if user.name == "Async" {
f.store(true, Ordering::SeqCst);
}
}
});
bus.emit(
"user-connected",
User {
name: "Async".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_pattern ────────────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_pattern_callback_receives_topic_and_payload() {
let bus = Arc::new(EventBus::new());
let created = Arc::new(AtomicBool::new(false));
let deleted = Arc::new(AtomicBool::new(false));
let c = Arc::clone(&created);
let d = Arc::clone(&deleted);
bus.on_pattern::<User, _>("user-*", move |topic, user| match topic.as_str() {
"user-created" if user.name == "Dave" => c.store(true, Ordering::SeqCst),
"user-deleted" if user.name == "Eve" => d.store(true, Ordering::SeqCst),
_ => {}
});
bus.emit(
"user-created",
User {
name: "Dave".into(),
},
);
bus.emit("user-deleted", User { name: "Eve".into() });
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
assert!(created.load(Ordering::SeqCst));
assert!(deleted.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_pattern_does_not_match_other_topics() {
let bus = Arc::new(EventBus::new());
let called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&called);
bus.on_pattern::<User, _>("user-*", move |_, _| {
flag.store(true, Ordering::SeqCst);
});
bus.emit(
"server-created",
User {
name: "Ghost".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_pattern_no_pre_registration_needed() {
// Le pattern est enregistré avant que le topic n'existe
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_pattern::<User, _>("user-*", move |topic, user| {
if topic == "user-new-topic" && user.name == "Frank" {
flag.store(true, Ordering::SeqCst);
}
});
bus.emit(
"user-new-topic",
User {
name: "Frank".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_pattern_async ──────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_pattern_async_callback() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_pattern_async::<User, _, _>("user-*", move |topic, user| {
let f = Arc::clone(&flag);
async move {
if topic == "user-created" && user.name == "Hank" {
f.store(true, Ordering::SeqCst);
}
}
});
bus.emit(
"user-created",
User {
name: "Hank".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_raw + match_event! ─────────────────────────────────────────────────
#[tokio::test]
async fn test_on_raw_and_match_event_macro() {
let bus = Arc::new(EventBus::new());
let mut rx = bus.on_raw("user-connected");
bus.emit(
"user-connected",
User {
name: "Frank".into(),
},
);
let evt = rx.recv().await.unwrap();
let mut received_name = String::new();
match_event!(evt,
User => |u: User| { received_name = u.name.clone(); },
UdpMetric => |_m: UdpMetric| { panic!("mauvais type"); }
);
assert_eq!(received_name, "Frank");
}
// ── Utilitaires ────────────────────────────────────────────────────────────
#[test]
fn test_topics_list() {
let bus = EventBus::new();
// on_raw enregistre le canal (get_or_create)
let _rx1 = bus.on_raw("user-connected");
let _rx2 = bus.on_raw("udp-metrics-updated");
let mut topics = bus.topics();
topics.sort();
assert_eq!(topics, vec!["udp-metrics-updated", "user-connected"]);
}
#[tokio::test]
async fn test_emit_multiple_types_same_bus() {
let bus = Arc::new(EventBus::new());
let user_ok = Arc::new(AtomicBool::new(false));
let metric_ok = Arc::new(AtomicBool::new(false));
let u = Arc::clone(&user_ok);
let m = Arc::clone(&metric_ok);
bus.on::<User, _>("user-connected", move |user| {
if user.name == "Bob" {
u.store(true, Ordering::SeqCst);
}
});
bus.on::<UdpMetric, _>("udp-metrics-updated", move |metric| {
if (metric.value - 3.14).abs() < 0.001 {
m.store(true, Ordering::SeqCst);
}
});
bus.emit("user-connected", User { name: "Bob".into() });
bus.emit("udp-metrics-updated", UdpMetric { value: 3.14 });
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(user_ok.load(Ordering::SeqCst));
assert!(metric_ok.load(Ordering::SeqCst));
}
}
-199
View File
@@ -1,199 +0,0 @@
use std::any::Any;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur.
pub type AnyEvent = Arc<dyn Any + Send + Sync>;
// ─────────────────────────────────────────────────────────────────────────────
// Erreurs
// ─────────────────────────────────────────────────────────────────────────────
/// Erreurs possibles lors d'un `recv().await`.
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
#[error("le canal est fermé")]
Closed,
#[error("messages perdus (lag) : {0} ignorés")]
Lagged(u64),
}
/// Erreurs possibles lors d'un `try_recv()`.
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
#[error("aucun message disponible")]
Empty,
#[error("le canal est fermé")]
Closed,
#[error("messages perdus (lag) : {0} ignorés")]
Lagged(u64),
}
impl From<broadcast::error::RecvError> for RecvError {
fn from(e: broadcast::error::RecvError) -> Self {
match e {
broadcast::error::RecvError::Closed => RecvError::Closed,
broadcast::error::RecvError::Lagged(n) => RecvError::Lagged(n),
}
}
}
impl From<broadcast::error::TryRecvError> for TryRecvError {
fn from(e: broadcast::error::TryRecvError) -> Self {
match e {
broadcast::error::TryRecvError::Empty => TryRecvError::Empty,
broadcast::error::TryRecvError::Closed => TryRecvError::Closed,
broadcast::error::TryRecvError::Lagged(n) => TryRecvError::Lagged(n),
}
}
}
// ─────────────────────────────────────────────────────────────────────────────
// TypedReceiver<T>
// ─────────────────────────────────────────────────────────────────────────────
/// Receiver typé pour un topic précis.
///
/// `recv().await` retourne directement un `T` — le downcast est automatique.
/// Les événements d'un type différent sont ignorés silencieusement.
///
/// # Exemple
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use oxspeak_server_lib::event_bus::EventBus;
/// # #[derive(Clone, Debug)] struct User { name: String }
/// # let bus = Arc::new(EventBus::new());
/// # tokio_test::block_on(async {
/// let mut rx = bus.on::<User>("user-connected");
/// bus.emit("user-connected", User { name: "Alice".into() });
/// let user = rx.recv().await.unwrap();
/// println!("{:?}", user);
/// # });
/// ```
pub struct TypedReceiver<T> {
pub(crate) inner: broadcast::Receiver<AnyEvent>,
pub(crate) _marker: std::marker::PhantomData<T>,
}
impl<T> TypedReceiver<T>
where
T: Any + Send + Sync + Clone + 'static,
{
pub(crate) fn new(inner: broadcast::Receiver<AnyEvent>) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
/// Attend le prochain événement de type `T` sur ce topic.
/// Les événements d'un autre type sont ignorés silencieusement.
pub async fn recv(&mut self) -> Result<T, RecvError> {
loop {
match self.inner.recv().await {
Ok(evt) => {
if let Some(val) = evt.downcast_ref::<T>() {
return Ok(val.clone());
}
// Mauvais type → on ignore et on attend le suivant
}
Err(e) => return Err(e.into()),
}
}
}
/// Version non-bloquante.
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
loop {
match self.inner.try_recv() {
Ok(evt) => {
if let Some(val) = evt.downcast_ref::<T>() {
return Ok(val.clone());
}
}
Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
Err(e) => return Err(e.into()),
}
}
}
/// Accès au receiver brut sous-jacent.
pub fn into_inner(self) -> broadcast::Receiver<AnyEvent> {
self.inner
}
}
// ─────────────────────────────────────────────────────────────────────────────
// PatternReceiver<T>
// ─────────────────────────────────────────────────────────────────────────────
/// Receiver unique pour un pattern wildcard (ex: `"user-*"`).
///
/// Un seul receiver reçoit les événements de **tous** les topics correspondants,
/// passés et futurs. `recv().await` retourne `(nom_du_topic, valeur)`.
///
/// # Exemple
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use oxspeak_server_lib::event_bus::EventBus;
/// # #[derive(Clone, Debug)] struct User { name: String }
/// # let bus = Arc::new(EventBus::new());
/// # tokio_test::block_on(async {
/// let mut rx = bus.on_pattern::<User>("user-*");
///
/// bus.emit("user-created", User { name: "Alice".into() });
/// bus.emit("user-deleted", User { name: "Bob".into() });
///
/// let (topic, user) = rx.recv().await.unwrap();
/// println!("{}: {:?}", topic, user);
/// # });
/// ```
pub struct PatternReceiver<T> {
pub(crate) inner: broadcast::Receiver<(String, AnyEvent)>,
pub(crate) _marker: std::marker::PhantomData<T>,
}
impl<T> PatternReceiver<T>
where
T: Any + Send + Sync + Clone + 'static,
{
pub(crate) fn new(inner: broadcast::Receiver<(String, AnyEvent)>) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
/// Attend le prochain événement de type `T` sur n'importe quel topic du pattern.
/// Retourne `(nom_du_topic, valeur)`.
/// Les événements d'un type différent sont ignorés silencieusement.
pub async fn recv(&mut self) -> Result<(String, T), RecvError> {
loop {
match self.inner.recv().await {
Ok((topic, evt)) => {
if let Some(val) = evt.downcast_ref::<T>() {
return Ok((topic, val.clone()));
}
// Mauvais type → on ignore et on attend le suivant
}
Err(e) => return Err(e.into()),
}
}
}
/// Version non-bloquante.
pub fn try_recv(&mut self) -> Result<(String, T), TryRecvError> {
loop {
match self.inner.try_recv() {
Ok((topic, evt)) => {
if let Some(val) = evt.downcast_ref::<T>() {
return Ok((topic, val.clone()));
}
}
Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
Err(e) => return Err(e.into()),
}
}
}
}
-66
View File
@@ -1,66 +0,0 @@
/// Vérifie si `topic` correspond au `pattern` avec support des wildcards.
///
/// - `*` correspond à n'importe quelle séquence de caractères (y compris vide)
/// - `?` correspond à exactement un caractère quelconque
///
/// # Exemples
/// ```
/// use oxspeak_server_lib::event_bus::wildcard_match;
///
/// assert!(wildcard_match("user-*", "user-connected"));
/// assert!(wildcard_match("user-*", "user-created"));
/// assert!(!wildcard_match("user-*", "udp-metrics-updated"));
/// assert!(wildcard_match("*-updated", "udp-metrics-updated"));
/// assert!(wildcard_match("user-?", "user-a"));
/// assert!(!wildcard_match("user-?", "user-ab"));
/// ```
pub fn wildcard_match(pattern: &str, topic: &str) -> bool {
let p: Vec<char> = pattern.chars().collect();
let t: Vec<char> = topic.chars().collect();
wildcard_match_inner(&p, &t)
}
fn wildcard_match_inner(pattern: &[char], topic: &[char]) -> bool {
match (pattern.first(), topic.first()) {
(None, None) => true,
(Some(&'*'), _) => {
// '*' peut correspondre à 0 ou plusieurs caractères
wildcard_match_inner(&pattern[1..], topic)
|| (!topic.is_empty() && wildcard_match_inner(pattern, &topic[1..]))
}
(Some(&'?'), Some(_)) => wildcard_match_inner(&pattern[1..], &topic[1..]),
(Some(p), Some(t)) if p == t => wildcard_match_inner(&pattern[1..], &topic[1..]),
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_wildcard_exact() {
assert!(wildcard_match("user-connected", "user-connected"));
assert!(!wildcard_match("user-connected", "user-created"));
}
#[test]
fn test_wildcard_star() {
assert!(wildcard_match("user-*", "user-connected"));
assert!(wildcard_match("user-*", "user-created"));
assert!(wildcard_match("user-*", "user-deleted"));
assert!(!wildcard_match("user-*", "udp-metrics-updated"));
}
#[test]
fn test_wildcard_question_mark() {
assert!(wildcard_match("user-?", "user-a"));
assert!(!wildcard_match("user-?", "user-ab"));
}
#[test]
fn test_wildcard_star_anywhere() {
assert!(wildcard_match("*-updated", "udp-metrics-updated"));
assert!(wildcard_match("*metrics*", "udp-metrics-updated"));
}
}
+162
View File
@@ -0,0 +1,162 @@
//! # Event Bus
//!
//! Un bus d'événements asynchrone permettant de faire transiter des messages typés
//! entre plusieurs modules, sans couplage direct.
//!
//! ## Caractéristiques
//!
//! - **Association clé → événement** : chaque topic (`&str`) est indépendant
//! - **Sans restriction de type** : n'importe quel `T: Any + Send + Sync + Clone`
//! - **Wake-up ciblé** : seuls les abonnés du bon topic sont réveillés
//! - **API callback** : style JavaScript — `bus.on("topic", |payload| { ... })`
//! - **Pattern glob** : `on_pattern("user-*", |topic, payload| { ... })`
//! - **Handlers async** : `on_async` et `on_pattern_async`
//!
//! ## Exemple — callback sync
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on::<User>("user-connected", |user| {
//! println!("Connecté : {:?}", user);
//! });
//!
//! bus.emit("user-connected", User { name: "Alice".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — callback async
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on_async::<User, _, _>("user-connected", |user| async move {
//! println!("(async) Connecté : {:?}", user);
//! });
//!
//! bus.emit("user-connected", User { name: "Bob".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — pattern glob (topic inclus dans le callback)
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//!
//! #[derive(Clone, Debug)]
//! struct User { name: String }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//!
//! bus.on_pattern::<User, _>("user-*", |topic, user| {
//! match topic.as_str() {
//! "user-created" => println!("Créé : {:?}", user),
//! "user-deleted" => println!("Supprimé : {:?}", user),
//! other => println!("{}: {:?}", other, user),
//! }
//! });
//!
//! bus.emit("user-created", User { name: "Alice".into() });
//! bus.emit("user-deleted", User { name: "Bob".into() });
//! # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//! # });
//! ```
//!
//! ## Exemple — multi-types avec `match_event!` (cas avancé)
//!
//! ```rust,no_run
//! use std::sync::Arc;
//! use oxspeak_server_lib::event_bus::EventBus;
//! use oxspeak_server_lib::match_event;
//!
//! #[derive(Clone, Debug)] struct User { name: String }
//! #[derive(Clone, Debug)] struct UdpMetric { value: f32 }
//!
//! # tokio_test::block_on(async {
//! let bus = Arc::new(EventBus::new());
//! let mut rx = bus.on_raw("mixed-topic");
//!
//! bus.emit("mixed-topic", User { name: "Alice".into() });
//!
//! if let Ok(evt) = rx.recv().await {
//! match_event!(evt,
//! User => |u| println!("User: {:?}", u),
//! UdpMetric => |m| println!("Metric: {:?}", m),
//! );
//! }
//! # });
//! ```
mod bus;
// Réexports publics
pub use bus::{AnyEvent, EventBus};
/// Downcaste un [`AnyEvent`] vers un ou plusieurs types concrets et exécute
/// la closure correspondante si le type correspond.
///
/// Les branches non correspondantes sont ignorées silencieusement.
///
/// # Syntaxe
/// ```text
/// match_event!(evt, Type1 => |val| { ... }, Type2 => |val| { ... })
/// ```
///
/// # Exemple
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use oxspeak_server_lib::event_bus::EventBus;
/// # use oxspeak_server_lib::match_event;
/// # #[derive(Clone, Debug)] struct User { name: String }
/// # #[derive(Clone, Debug)] struct UdpMetric { value: f32 }
/// # let bus = Arc::new(EventBus::new());
/// # tokio_test::block_on(async {
/// let mut rx = bus.on_raw("user-connected");
/// bus.emit("user-connected", User { name: "Alice".into() });
///
/// if let Ok(evt) = rx.recv().await {
/// match_event!(evt,
/// User => |u| println!("Utilisateur : {:?}", u),
/// UdpMetric => |m| println!("Metric : {:?}", m),
/// );
/// }
/// # });
/// ```
#[macro_export]
macro_rules! match_event {
($evt:expr, $($type:ty => $handler:expr),+ $(,)?) => {
$(
if let Some(val) = ($evt).downcast_ref::<$type>() {
($handler)(val.clone());
} else
)+
{
// Aucun type ne correspond → on ignore silencieusement
}
};
}
// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests;
+293
View File
@@ -0,0 +1,293 @@
use crate::{match_event, EventBus};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
#[derive(Clone, Debug, PartialEq)]
struct User {
name: String,
}
#[derive(Clone, Debug, PartialEq)]
struct UdpMetric {
value: f32,
}
// ── on (callback sync) ────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_callback_sync() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on::<User, _>("user-connected", move |user| {
if user.name == "Alice" {
flag.store(true, Ordering::SeqCst);
}
});
bus.emit(
"user-connected",
User {
name: "Alice".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_targeted_wakeup() {
// Émettre sur "user-connected" ne doit pas réveiller "udp-metrics-updated"
let bus = Arc::new(EventBus::new());
let metric_called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&metric_called);
bus.on::<UdpMetric, _>("udp-metrics-updated", move |_| {
flag.store(true, Ordering::SeqCst);
});
bus.emit(
"user-connected",
User {
name: "Carol".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!metric_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_type_mismatch_ignored() {
// Émettre un UdpMetric sur un topic écouté en User → handler pas appelé
let bus = Arc::new(EventBus::new());
let called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&called);
bus.on::<User, _>("mixed-topic", move |_| {
flag.store(true, Ordering::SeqCst);
});
bus.emit("mixed-topic", UdpMetric { value: 1.0 });
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_multiple_subscribers_same_topic() {
let bus = Arc::new(EventBus::new());
let count = Arc::new(AtomicU32::new(0));
for _ in 0..3 {
let c = Arc::clone(&count);
bus.on::<User, _>("user-connected", move |_| {
c.fetch_add(1, Ordering::SeqCst);
});
}
bus.emit(
"user-connected",
User {
name: "Grace".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert_eq!(count.load(Ordering::SeqCst), 3);
}
// ── on_async ──────────────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_async_callback() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_async::<User, _, _>("user-connected", move |user| {
let f = Arc::clone(&flag);
async move {
if user.name == "Async" {
f.store(true, Ordering::SeqCst);
}
}
});
bus.emit(
"user-connected",
User {
name: "Async".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_pattern ────────────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_pattern_callback_receives_topic_and_payload() {
let bus = Arc::new(EventBus::new());
let created = Arc::new(AtomicBool::new(false));
let deleted = Arc::new(AtomicBool::new(false));
let c = Arc::clone(&created);
let d = Arc::clone(&deleted);
bus.on_pattern::<User, _>("user-*", move |topic, user| match topic.as_str() {
"user-created" if user.name == "Dave" => c.store(true, Ordering::SeqCst),
"user-deleted" if user.name == "Eve" => d.store(true, Ordering::SeqCst),
_ => {}
});
bus.emit(
"user-created",
User {
name: "Dave".into(),
},
);
bus.emit("user-deleted", User { name: "Eve".into() });
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
assert!(created.load(Ordering::SeqCst));
assert!(deleted.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_pattern_does_not_match_other_topics() {
let bus = Arc::new(EventBus::new());
let called = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&called);
bus.on_pattern::<User, _>("user-*", move |_, _| {
flag.store(true, Ordering::SeqCst);
});
bus.emit(
"server-created",
User {
name: "Ghost".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(!called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_on_pattern_no_pre_registration_needed() {
// Le pattern est enregistré avant que le topic n'existe
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_pattern::<User, _>("user-*", move |topic, user| {
if topic == "user-new-topic" && user.name == "Frank" {
flag.store(true, Ordering::SeqCst);
}
});
bus.emit(
"user-new-topic",
User {
name: "Frank".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_pattern_async ──────────────────────────────────────────────────────
#[tokio::test]
async fn test_on_pattern_async_callback() {
let bus = Arc::new(EventBus::new());
let received = Arc::new(AtomicBool::new(false));
let flag = Arc::clone(&received);
bus.on_pattern_async::<User, _, _>("user-*", move |topic, user| {
let f = Arc::clone(&flag);
async move {
if topic == "user-created" && user.name == "Hank" {
f.store(true, Ordering::SeqCst);
}
}
});
bus.emit(
"user-created",
User {
name: "Hank".into(),
},
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(received.load(Ordering::SeqCst));
}
// ── on_raw + match_event! ─────────────────────────────────────────────────
#[tokio::test]
async fn test_on_raw_and_match_event_macro() {
let bus = Arc::new(EventBus::new());
let mut rx = bus.on_raw("user-connected");
bus.emit(
"user-connected",
User {
name: "Frank".into(),
},
);
let evt = rx.recv().await.unwrap();
let mut received_name = String::new();
match_event!(evt,
User => |u: User| { received_name = u.name.clone(); },
UdpMetric => |_m: UdpMetric| { panic!("mauvais type"); }
);
assert_eq!(received_name, "Frank");
}
// ── Utilitaires ────────────────────────────────────────────────────────────
#[test]
fn test_topics_list() {
let bus = EventBus::new();
// on_raw enregistre le canal (get_or_create)
let _rx1 = bus.on_raw("user-connected");
let _rx2 = bus.on_raw("udp-metrics-updated");
let mut topics = bus.topics();
topics.sort();
assert_eq!(topics, vec!["udp-metrics-updated", "user-connected"]);
}
#[tokio::test]
async fn test_emit_multiple_types_same_bus() {
let bus = Arc::new(EventBus::new());
let user_ok = Arc::new(AtomicBool::new(false));
let metric_ok = Arc::new(AtomicBool::new(false));
let u = Arc::clone(&user_ok);
let m = Arc::clone(&metric_ok);
bus.on::<User, _>("user-connected", move |user| {
if user.name == "Bob" {
u.store(true, Ordering::SeqCst);
}
});
bus.on::<UdpMetric, _>("udp-metrics-updated", move |metric| {
if (metric.value - 3.14).abs() < 0.001 {
m.store(true, Ordering::SeqCst);
}
});
bus.emit("user-connected", User { name: "Bob".into() });
bus.emit("udp-metrics-updated", UdpMetric { value: 3.14 });
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(user_ok.load(Ordering::SeqCst));
assert!(metric_ok.load(Ordering::SeqCst));
}