diff --git a/Cargo.lock b/Cargo.lock index 87a3098..434b761 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -42,6 +42,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "alloca" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7d05ea6aea7e9e64d25b9156ba2fee3fdd659e34e41063cd2fc7cd020d7f4" +dependencies = [ + "cc", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -57,6 +66,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "1.0.0" @@ -596,6 +611,12 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.2.60" @@ -641,6 +662,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "4.6.1" @@ -790,6 +838,61 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "criterion" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" +dependencies = [ + "alloca", + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools 0.13.0", + "num-traits", + "oorandom", + "page_size", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" +dependencies = [ + "cast", + "itertools 0.13.0", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -1023,9 +1126,11 @@ dependencies = [ name = "event_bus" version = "0.1.0" dependencies = [ + "criterion", "glob", "parking_lot", "tokio", + "tracing", ] [[package]] @@ -1514,6 +1619,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -1867,6 +1981,12 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "ordered-float" version = "4.6.0" @@ -1932,6 +2052,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parking" version = "2.2.1" @@ -2090,6 +2220,34 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "pluralizer" version = "0.5.0" @@ -2295,6 +2453,26 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" +[[package]] +name = "rayon" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -2475,6 +2653,15 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2506,7 +2693,7 @@ dependencies = [ "chrono", "derive_more", "futures-util", - "itertools", + "itertools 0.14.0", "log", "mac_address", "ouroboros", @@ -2563,7 +2750,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1374d83dd5b43f14dcc90fc726486c556f4db774b680b12b8c680af76e8233" dependencies = [ "heck 0.5.0", - "itertools", + "itertools 0.14.0", "pluralizer", "proc-macro2", "quote", @@ -3210,6 +3397,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -3227,9 +3424,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.1" +version = "1.52.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" +checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" dependencies = [ "bytes", "libc", @@ -3520,6 +3717,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -3640,6 +3847,16 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "whoami" version = "1.6.1" @@ -3666,6 +3883,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 87da2e1..6d0b55e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ crate-type = ["rlib"] members = [".", "migration", "event_bus"] [dependencies] -tokio = { version = "1.52.1", features = ["full"] } +tokio = { version = "1.52.2", features = ["full"] } config = "0.15.22" sea-orm = { version = "2.0.0-rc.38", features = ["sqlx-sqlite", "sqlx-postgres", "sqlx-mysql", "runtime-tokio", "with-chrono", "with-uuid", "with-json", "schema-sync"] } migration = { path = "migration" } diff --git a/event_bus/Cargo.toml b/event_bus/Cargo.toml index 58fa0c4..4546334 100644 --- a/event_bus/Cargo.toml +++ b/event_bus/Cargo.toml @@ -8,10 +8,16 @@ publish = false name = "event_bus" path = "src/lib.rs" +[[bench]] +name = "event_bus_throughput" +harness = false + [dependencies] tokio = { version = "1.52.1", default-features = false, features = ["rt", "sync"] } glob = "0.3.3" parking_lot = "0.12.5" +tracing = "0.1" [dev-dependencies] -tokio = { version = "1.52.1", default-features = false, features = ["rt", "rt-multi-thread", "macros", "time", "sync"] } \ No newline at end of file +tokio = { version = "1.52.1", default-features = false, features = ["rt", "rt-multi-thread", "macros", "time", "sync"] } +criterion = { version = "0.8.2", features = ["async_tokio"] } \ No newline at end of file diff --git a/event_bus/benches/event_bus_throughput.rs b/event_bus/benches/event_bus_throughput.rs new file mode 100644 index 0000000..92cc0e7 --- /dev/null +++ b/event_bus/benches/event_bus_throughput.rs @@ -0,0 +1,665 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use std::time::Instant; + +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use event_bus::EventBus; +use tokio::runtime::Runtime; + +const TOPIC: &str = "bench-topic"; +const PATTERN: &str = "bench-*"; + +#[derive(Clone)] +struct SmallEvent { + value: u64, +} + +#[derive(Clone)] +struct StringEvent { + id: u64, + name: String, + message: String, +} + +#[derive(Clone)] +struct VecEvent { + id: u64, + payload: Vec, +} + +#[derive(Clone)] +struct ArcPayloadEvent { + id: u64, + payload: Arc<[u8]>, +} + +fn runtime() -> Runtime { + Runtime::new().expect("failed to create tokio runtime") +} + +fn wait_until_received( + received: &AtomicU64, + expected: u64, +) -> impl std::future::Future + '_ { + async move { + while received.load(Ordering::Relaxed) < expected { + tokio::task::yield_now().await; + } + } +} + +fn bench_emit_no_subscriber(c: &mut Criterion) { + let mut group = c.benchmark_group("event_bus/no_subscriber"); + group.throughput(Throughput::Elements(1)); + + let bus = EventBus::with_capacity(1024); + + group.bench_function("u64", |b| { + b.iter(|| { + bus.emit(TOPIC, 42_u64); + }); + }); + + group.bench_function("small_struct", |b| { + b.iter(|| { + bus.emit(TOPIC, SmallEvent { value: 42 }); + }); + }); + + group.bench_function("string_struct", |b| { + b.iter(|| { + bus.emit( + TOPIC, + StringEvent { + id: 42, + name: "Alice".to_string(), + message: "hello from benchmark".to_string(), + }, + ); + }); + }); + + group.bench_function("vec_payload_1kb", |b| { + b.iter(|| { + bus.emit( + TOPIC, + VecEvent { + id: 42, + payload: vec![7_u8; 1024], + }, + ); + }); + }); + + let shared_payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); + + group.bench_function("arc_payload_1kb", |b| { + b.iter(|| { + bus.emit( + TOPIC, + ArcPayloadEvent { + id: 42, + payload: Arc::clone(&shared_payload), + }, + ); + }); + }); + + group.finish(); +} + +fn bench_raw_subscriber(c: &mut Criterion) { + let rt = runtime(); + + let mut group = c.benchmark_group("event_bus/raw_subscriber"); + group.throughput(Throughput::Elements(1)); + + group.bench_function("u64", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let mut rx = bus.on_raw(TOPIC); + + let received = Arc::new(AtomicU64::new(0)); + let receiver_count = Arc::clone(&received); + + let receiver = tokio::spawn(async move { + while receiver_count.load(Ordering::Relaxed) < iters { + if rx.recv().await.is_ok() { + receiver_count.fetch_add(1, Ordering::Relaxed); + } + } + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, i); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + receiver.abort(); + + elapsed + }); + }); + + group.bench_function("small_struct", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let mut rx = bus.on_raw(TOPIC); + + let received = Arc::new(AtomicU64::new(0)); + let receiver_count = Arc::clone(&received); + + let receiver = tokio::spawn(async move { + while receiver_count.load(Ordering::Relaxed) < iters { + if rx.recv().await.is_ok() { + receiver_count.fetch_add(1, Ordering::Relaxed); + } + } + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, SmallEvent { value: i }); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + receiver.abort(); + + elapsed + }); + }); + + group.bench_function("string_struct", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let mut rx = bus.on_raw(TOPIC); + + let received = Arc::new(AtomicU64::new(0)); + let receiver_count = Arc::clone(&received); + + let receiver = tokio::spawn(async move { + while receiver_count.load(Ordering::Relaxed) < iters { + if rx.recv().await.is_ok() { + receiver_count.fetch_add(1, Ordering::Relaxed); + } + } + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + StringEvent { + id: i, + name: "Alice".to_string(), + message: "hello from benchmark".to_string(), + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + receiver.abort(); + + elapsed + }); + }); + + group.bench_function("vec_payload_1kb", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let mut rx = bus.on_raw(TOPIC); + + let received = Arc::new(AtomicU64::new(0)); + let receiver_count = Arc::clone(&received); + + let receiver = tokio::spawn(async move { + while receiver_count.load(Ordering::Relaxed) < iters { + if rx.recv().await.is_ok() { + receiver_count.fetch_add(1, Ordering::Relaxed); + } + } + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + VecEvent { + id: i, + payload: vec![7_u8; 1024], + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + receiver.abort(); + + elapsed + }); + }); + + group.bench_function("arc_payload_1kb", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let mut rx = bus.on_raw(TOPIC); + + let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); + + let received = Arc::new(AtomicU64::new(0)); + let receiver_count = Arc::clone(&received); + + let receiver = tokio::spawn(async move { + while receiver_count.load(Ordering::Relaxed) < iters { + if rx.recv().await.is_ok() { + receiver_count.fetch_add(1, Ordering::Relaxed); + } + } + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + ArcPayloadEvent { + id: i, + payload: Arc::clone(&payload), + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + receiver.abort(); + + elapsed + }); + }); + + group.finish(); +} + +fn bench_typed_callback(c: &mut Criterion) { + let rt = runtime(); + + let mut group = c.benchmark_group("event_bus/typed_callback"); + group.throughput(Throughput::Elements(1)); + + group.bench_function("u64", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event; + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, i); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.bench_function("small_struct", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event.value; + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, SmallEvent { value: i }); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.bench_function("string_struct", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event.id; + let _ = event.name.len(); + let _ = event.message.len(); + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + StringEvent { + id: i, + name: "Alice".to_string(), + message: "hello from benchmark".to_string(), + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.bench_function("vec_payload_1kb", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event.id; + let _ = event.payload.len(); + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + VecEvent { + id: i, + payload: vec![7_u8; 1024], + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.bench_function("arc_payload_1kb", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event.id; + let _ = event.payload.len(); + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + ArcPayloadEvent { + id: i, + payload: Arc::clone(&payload), + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.finish(); +} + +fn bench_pattern_callback(c: &mut Criterion) { + let rt = runtime(); + + let mut group = c.benchmark_group("event_bus/pattern_callback"); + group.throughput(Throughput::Elements(1)); + + group.bench_function("small_struct", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = bus.on_pattern::(PATTERN, move |topic, event| { + let _ = topic; + let _ = event.value; + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, SmallEvent { value: i }); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.bench_function("arc_payload_1kb", |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); + + let received = Arc::new(AtomicU64::new(0)); + let handler_count = Arc::clone(&received); + + let subscription = + bus.on_pattern::(PATTERN, move |topic, event| { + let _ = topic; + let _ = event.id; + let _ = event.payload.len(); + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + let start = Instant::now(); + + for i in 0..iters { + bus.emit( + TOPIC, + ArcPayloadEvent { + id: i, + payload: Arc::clone(&payload), + }, + ); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + subscription.abort(); + + elapsed + }); + }); + + group.finish(); +} + +fn bench_multiple_subscribers(c: &mut Criterion) { + let rt = runtime(); + + let mut group = c.benchmark_group("event_bus/multiple_subscribers"); + group.throughput(Throughput::Elements(1)); + + for subscriber_count in [1_u64, 2, 4, 8, 16, 32] { + group.bench_function(format!("{subscriber_count}_subscribers"), |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let expected = iters * subscriber_count; + let received = Arc::new(AtomicU64::new(0)); + let mut subscriptions = Vec::with_capacity(subscriber_count as usize); + + for _ in 0..subscriber_count { + let handler_count = Arc::clone(&received); + + let subscription = bus.on::(TOPIC, move |event| { + let _ = event.value; + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + subscriptions.push(subscription); + } + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, SmallEvent { value: i }); + } + + wait_until_received(&received, expected).await; + + let elapsed = start.elapsed(); + + for subscription in subscriptions { + subscription.abort(); + } + + elapsed + }); + }); + } + + group.finish(); +} + +fn bench_multiple_patterns(c: &mut Criterion) { + let rt = runtime(); + + let mut group = c.benchmark_group("event_bus/multiple_patterns"); + group.throughput(Throughput::Elements(1)); + + for pattern_count in [1_u64, 4, 16, 64, 256] { + group.bench_function(format!("{pattern_count}_patterns_one_match"), |b| { + b.to_async(&rt).iter_custom(|iters| async move { + let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); + + let received = Arc::new(AtomicU64::new(0)); + let mut subscriptions = Vec::with_capacity(pattern_count as usize); + + for index in 0..pattern_count { + let pattern = if index == 0 { + PATTERN.to_string() + } else { + format!("unused-{index}-*") + }; + + let handler_count = Arc::clone(&received); + + let subscription = + bus.on_pattern::(&pattern, move |topic, event| { + let _ = topic; + let _ = event.value; + handler_count.fetch_add(1, Ordering::Relaxed); + }); + + subscriptions.push(subscription); + } + + let start = Instant::now(); + + for i in 0..iters { + bus.emit(TOPIC, SmallEvent { value: i }); + } + + wait_until_received(&received, iters).await; + + let elapsed = start.elapsed(); + + for subscription in subscriptions { + subscription.abort(); + } + + elapsed + }); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_emit_no_subscriber, + bench_raw_subscriber, + bench_typed_callback, + bench_pattern_callback, + bench_multiple_subscribers, + bench_multiple_patterns, +); + +criterion_main!(benches); diff --git a/event_bus/src/bus.rs b/event_bus/src/bus.rs index 13a21eb..2176f34 100644 --- a/event_bus/src/bus.rs +++ b/event_bus/src/bus.rs @@ -7,20 +7,21 @@ use parking_lot::RwLock; use std::collections::HashMap; use tokio::sync::broadcast; use tokio::task::JoinHandle; +use tracing::{debug, trace, warn}; -/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur. +/// Raw event type: an atomic reference-counted pointer to any value. pub type AnyEvent = Arc; -/// Capacité par défaut du buffer de chaque canal broadcast. +/// Default buffer capacity for each broadcast channel. const DEFAULT_CAPACITY: usize = 64; -/// Le bus d'événements central. +/// The central event bus. /// -/// Partagez-le via `Arc` entre les modules. -/// Chaque topic possède son propre canal broadcast : seuls les abonnés du bon -/// topic sont réveillés lors d'un `emit` (wake-up ciblé). +/// Share it via `Arc` across modules. +/// Each topic has its own broadcast channel: only subscribers of the matching +/// topic are woken up on `emit` (targeted wake-up). /// -/// # Exemple minimal — callback sync +/// # Minimal example — sync callback /// ```rust,no_run /// use std::sync::Arc; /// use oxspeak_server_lib::event_bus::EventBus; @@ -32,7 +33,7 @@ const DEFAULT_CAPACITY: usize = 64; /// let bus = Arc::new(EventBus::new()); /// /// bus.on::("user-connected", |user| { -/// println!("Connecté : {:?}", user); +/// println!("Connected: {:?}", user); /// }); /// /// bus.emit("user-connected", User { name: "Alice".into() }); @@ -40,7 +41,7 @@ const DEFAULT_CAPACITY: usize = 64; /// # }); /// ``` /// -/// # Exemple — callback async +/// # Example — async callback /// ```rust,no_run /// use std::sync::Arc; /// use oxspeak_server_lib::event_bus::EventBus; @@ -52,7 +53,7 @@ const DEFAULT_CAPACITY: usize = 64; /// let bus = Arc::new(EventBus::new()); /// /// bus.on_async::("user-connected", |user| async move { -/// println!("(async) Connecté : {:?}", user); +/// println!("(async) Connected: {:?}", user); /// }); /// /// bus.emit("user-connected", User { name: "Bob".into() }); @@ -60,7 +61,7 @@ const DEFAULT_CAPACITY: usize = 64; /// # }); /// ``` /// -/// # Exemple — pattern glob avec topic +/// # Example — glob pattern with topic /// ```rust,no_run /// use std::sync::Arc; /// use oxspeak_server_lib::event_bus::EventBus; @@ -73,8 +74,8 @@ const DEFAULT_CAPACITY: usize = 64; /// /// bus.on_pattern::("user-*", |topic, user| { /// match topic.as_str() { -/// "user-created" => println!("Créé : {:?}", user), -/// "user-deleted" => println!("Supprimé : {:?}", user), +/// "user-created" => println!("Created : {:?}", user), +/// "user-deleted" => println!("Deleted : {:?}", user), /// other => println!("{}: {:?}", other, user), /// } /// }); @@ -85,16 +86,20 @@ const DEFAULT_CAPACITY: usize = 64; /// # }); /// ``` pub struct EventBus { - /// Canaux par topic exact. + /// Channels indexed by exact topic. channels: RwLock>>, - /// Canaux pour les souscriptions par pattern glob. + /// Channels for glob-pattern subscriptions. patterns: RwLock)>>, capacity: usize, } impl EventBus { - /// Crée un bus avec la capacité par défaut (64 messages par canal). + /// Creates a bus with the default capacity (64 messages per channel). pub fn new() -> Self { + debug!( + "EventBus created with default capacity ({})", + DEFAULT_CAPACITY + ); Self { channels: RwLock::new(HashMap::new()), patterns: RwLock::new(Vec::new()), @@ -102,8 +107,9 @@ impl EventBus { } } - /// Crée un bus avec une capacité de buffer personnalisée. + /// Creates a bus with a custom buffer capacity. pub fn with_capacity(capacity: usize) -> Self { + debug!("EventBus created with capacity {}", capacity); Self { channels: RwLock::new(HashMap::new()), patterns: RwLock::new(Vec::new()), @@ -112,7 +118,7 @@ impl EventBus { } // ───────────────────────────────────────────────────────────────────────── - // Interne + // Internal // ───────────────────────────────────────────────────────────────────────── fn get_or_create_sender(&self, topic: &str) -> broadcast::Sender { @@ -123,26 +129,31 @@ impl EventBus { } } let mut channels = self.channels.write(); - channels + let created = !channels.contains_key(topic); + let tx = channels .entry(topic.to_string()) .or_insert_with(|| { let (tx, _) = broadcast::channel(self.capacity); tx }) - .clone() + .clone(); + if created { + debug!(topic, "New broadcast channel created"); + } + tx } // ───────────────────────────────────────────────────────────────────────── - // Émission + // Emission // ───────────────────────────────────────────────────────────────────────── - /// Émet un événement sur un topic. + /// Emits an event on a topic. /// - /// - Pousse l'event dans le canal du topic exact (si des abonnés existent). - /// - Pousse l'event dans tous les canaux de pattern qui matchent le topic. - /// - Si personne n'écoute, l'événement est ignoré silencieusement. + /// - Pushes the event into the exact-topic channel (if subscribers exist). + /// - Pushes the event into all glob-pattern channels that match the topic. + /// - If nobody is listening, the event is silently dropped. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; @@ -152,39 +163,53 @@ impl EventBus { /// bus.emit("user-deleted", uuid::Uuid::new_v4()); /// ``` pub fn emit(&self, topic: &str, event: T) { + trace!(topic, "Emitting event"); let event: AnyEvent = Arc::new(event); - // Abonnés au topic exact + // Exact-topic subscribers if let Some(tx) = self.channels.read().get(topic) { + let receiver_count = tx.receiver_count(); let _ = tx.send(Arc::clone(&event)); + trace!( + topic, + receiver_count, "Event delivered to exact-topic channel" + ); } - // Abonnés aux patterns glob correspondants - for (pattern, tx) in self.patterns.read().iter() { + // Glob-pattern subscribers + let patterns = self.patterns.read(); + for (pattern, tx) in patterns.iter() { if pattern.matches(topic) { + let receiver_count = tx.receiver_count(); let _ = tx.send((topic.to_string(), Arc::clone(&event))); + trace!( + topic, + pattern = pattern.as_str(), + receiver_count, + "Event delivered to pattern channel" + ); } } } // ───────────────────────────────────────────────────────────────────────── - // Abonnement — callbacks (API principale) + // Subscription — callbacks (main API) // ───────────────────────────────────────────────────────────────────────── - /// S'abonne à un topic et appelle `handler` à chaque événement de type `T`. + /// Subscribes to a topic and calls `handler` on each event of type `T`. /// - /// Le handler est exécuté dans une tâche Tokio dédiée (fire-and-forget). - /// Les événements d'un autre type sont ignorés silencieusement. - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// The handler runs in a dedicated Tokio task (fire-and-forget). + /// Events of a different type are silently ignored. + /// Returns a [`JoinHandle`] to cancel the subscription if needed. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; /// # #[derive(Clone, Debug)] struct User { name: String } /// # let bus = Arc::new(EventBus::new()); /// bus.on::("user-connected", |user| { - /// println!("Connecté : {:?}", user); + /// println!("Connected: {:?}", user); /// }); /// ``` pub fn on(&self, topic: &str, handler: F) -> JoinHandle<()> @@ -193,37 +218,53 @@ impl EventBus { F: Fn(T) + Send + Sync + 'static, { let mut rx = self.get_or_create_sender(topic).subscribe(); + let topic_owned = topic.to_string(); + + debug!(topic, "Sync subscriber registered"); tokio::spawn(async move { loop { match rx.recv().await { Ok(evt) => { if let Some(typed) = evt.downcast_ref::() { + trace!(topic = topic_owned, "Sync handler invoked"); handler(typed.clone()); } } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + topic = topic_owned, + skipped = n, + "Subscriber lagged, messages dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + debug!( + topic = topic_owned, + "Channel closed, sync subscriber exiting" + ); + break; + } } } }) } - /// S'abonne à un topic et appelle un handler **async** à chaque événement de type `T`. + /// Subscribes to a topic and calls an **async** handler on each event of type `T`. /// - /// Parfait pour effectuer des opérations async dans le handler - /// (requête DB, appel HTTP, broadcast WebSocket…). - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// Ideal for performing async operations in the handler + /// (DB query, HTTP call, WebSocket broadcast, …). + /// Returns a [`JoinHandle`] to cancel the subscription if needed. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; /// # #[derive(Clone, Debug)] struct User { name: String } /// # let bus = Arc::new(EventBus::new()); /// bus.on_async::("user-connected", |user| async move { - /// println!("(async) Connecté : {:?}", user); - /// // Ici tu peux faire du async : requête DB, HTTP, etc. + /// println!("(async) Connected: {:?}", user); + /// // async work here: DB query, HTTP, etc. /// }); /// ``` pub fn on_async(&self, topic: &str, handler: F) -> JoinHandle<()> @@ -233,34 +274,50 @@ impl EventBus { Fut: Future + Send + 'static, { let mut rx = self.get_or_create_sender(topic).subscribe(); + let topic_owned = topic.to_string(); + + debug!(topic, "Async subscriber registered"); tokio::spawn(async move { loop { match rx.recv().await { Ok(evt) => { if let Some(typed) = evt.downcast_ref::() { + trace!(topic = topic_owned, "Async handler invoked"); handler(typed.clone()).await; } } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + topic = topic_owned, + skipped = n, + "Subscriber lagged, messages dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + debug!( + topic = topic_owned, + "Channel closed, async subscriber exiting" + ); + break; + } } } }) } - /// S'abonne à tous les topics correspondant à un pattern glob. + /// Subscribes to all topics matching a glob pattern. /// - /// Le handler reçoit `(topic, valeur)` — le nom du topic est inclus pour - /// distinguer `user-created` de `user-deleted` par exemple. + /// The handler receives `(topic, value)` — the topic name is included to + /// distinguish `user-created` from `user-deleted`, for example. /// - /// **Aucun pré-enregistrement nécessaire** : les topics futurs sont couverts. - /// Supporte la syntaxe glob : `*` (toute séquence), `?` (un caractère), - /// `[abc]` (classe de caractères). + /// **No pre-registration required**: future topics are automatically covered. + /// Supports glob syntax: `*` (any sequence), `?` (one character), + /// `[abc]` (character class). /// - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// Returns a [`JoinHandle`] to cancel the subscription if needed. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; @@ -268,8 +325,8 @@ impl EventBus { /// # let bus = Arc::new(EventBus::new()); /// bus.on_pattern::("user-*", |topic, user| { /// match topic.as_str() { - /// "user-created" => println!("Créé : {:?}", user), - /// "user-deleted" => println!("Supprimé : {:?}", user), + /// "user-created" => println!("Created : {:?}", user), + /// "user-deleted" => println!("Deleted : {:?}", user), /// other => println!("{}: {:?}", other, user), /// } /// }); @@ -282,30 +339,50 @@ impl EventBus { T: Any + Send + Sync + Clone + 'static, F: Fn(String, T) + Send + Sync + 'static, { - let glob = Pattern::new(pattern).expect("pattern glob invalide"); + let glob = Pattern::new(pattern).expect("invalid glob pattern"); let (tx, mut rx) = broadcast::channel(self.capacity); self.patterns.write().push((glob, tx)); + let pattern_owned = pattern.to_string(); + debug!(pattern, "Sync pattern subscriber registered"); + tokio::spawn(async move { loop { match rx.recv().await { Ok((topic, evt)) => { if let Some(typed) = evt.downcast_ref::() { + trace!( + topic, + pattern = pattern_owned, + "Sync pattern handler invoked" + ); handler(topic, typed.clone()); } } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + pattern = pattern_owned, + skipped = n, + "Pattern subscriber lagged, messages dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + debug!( + pattern = pattern_owned, + "Channel closed, sync pattern subscriber exiting" + ); + break; + } } } }) } - /// S'abonne à tous les topics correspondant à un pattern glob, avec un handler **async**. + /// Subscribes to all topics matching a glob pattern, with an **async** handler. /// - /// Retourne un [`JoinHandle`] pour annuler l'abonnement si besoin. + /// Returns a [`JoinHandle`] to cancel the subscription if needed. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; @@ -313,8 +390,8 @@ impl EventBus { /// # let bus = Arc::new(EventBus::new()); /// bus.on_pattern_async::("user-*", |topic, user| async move { /// match topic.as_str() { - /// "user-created" => println!("(async) Créé : {:?}", user), - /// "user-deleted" => println!("(async) Supprimé : {:?}", user), + /// "user-created" => println!("(async) Created : {:?}", user), + /// "user-deleted" => println!("(async) Deleted : {:?}", user), /// other => println!("(async) {}: {:?}", other, user), /// } /// }); @@ -327,35 +404,55 @@ impl EventBus { F: Fn(String, T) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { - let glob = Pattern::new(pattern).expect("pattern glob invalide"); + let glob = Pattern::new(pattern).expect("invalid glob pattern"); let (tx, mut rx) = broadcast::channel(self.capacity); self.patterns.write().push((glob, tx)); + let pattern_owned = pattern.to_string(); + debug!(pattern, "Async pattern subscriber registered"); + tokio::spawn(async move { loop { match rx.recv().await { Ok((topic, evt)) => { if let Some(typed) = evt.downcast_ref::() { + trace!( + topic, + pattern = pattern_owned, + "Async pattern handler invoked" + ); handler(topic, typed.clone()).await; } } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + pattern = pattern_owned, + skipped = n, + "Pattern subscriber lagged, messages dropped" + ); + } + Err(broadcast::error::RecvError::Closed) => { + debug!( + pattern = pattern_owned, + "Channel closed, async pattern subscriber exiting" + ); + break; + } } } }) } // ───────────────────────────────────────────────────────────────────────── - // Abonnement — accès bas niveau (cas avancés) + // Subscription — low-level access (advanced use cases) // ───────────────────────────────────────────────────────────────────────── - /// Retourne un receiver brut ([`AnyEvent`]) pour gérer toi-même la boucle. + /// Returns a raw [`AnyEvent`] receiver to manage the loop yourself. /// - /// Utile avec la macro [`match_event!`][crate::match_event] pour gérer - /// plusieurs types différents sur un même topic. + /// Useful with the [`match_event!`][crate::match_event] macro to handle + /// multiple different types on the same topic. /// - /// # Exemple + /// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; @@ -376,14 +473,15 @@ impl EventBus { /// # }); /// ``` pub fn on_raw(&self, topic: &str) -> broadcast::Receiver { + debug!(topic, "Raw subscriber registered"); self.get_or_create_sender(topic).subscribe() } // ───────────────────────────────────────────────────────────────────────── - // Utilitaires + // Utilities // ───────────────────────────────────────────────────────────────────────── - /// Retourne la liste des topics actuellement enregistrés. + /// Returns the list of currently registered topics. pub fn topics(&self) -> Vec { self.channels.read().keys().cloned().collect() } diff --git a/event_bus/src/lib.rs b/event_bus/src/lib.rs index b225064..3501c15 100644 --- a/event_bus/src/lib.rs +++ b/event_bus/src/lib.rs @@ -1,18 +1,18 @@ //! # Event Bus //! -//! Un bus d'événements asynchrone permettant de faire transiter des messages typés -//! entre plusieurs modules, sans couplage direct. +//! An asynchronous event bus for routing typed messages between modules +//! without direct coupling. //! -//! ## Caractéristiques +//! ## Features //! -//! - **Association clé → événement** : chaque topic (`&str`) est indépendant -//! - **Sans restriction de type** : n'importe quel `T: Any + Send + Sync + Clone` -//! - **Wake-up ciblé** : seuls les abonnés du bon topic sont réveillés -//! - **API callback** : style JavaScript — `bus.on("topic", |payload| { ... })` -//! - **Pattern glob** : `on_pattern("user-*", |topic, payload| { ... })` -//! - **Handlers async** : `on_async` et `on_pattern_async` +//! - **Key → event mapping**: each topic (`&str`) is independent +//! - **Type-unrestricted**: any `T: Any + Send + Sync + Clone` +//! - **Targeted wake-up**: only subscribers of the matching topic are woken up +//! - **Callback API**: JavaScript-style — `bus.on("topic", |payload| { ... })` +//! - **Glob pattern**: `on_pattern("user-*", |topic, payload| { ... })` +//! - **Async handlers**: `on_async` and `on_pattern_async` //! -//! ## Exemple — callback sync +//! ## Example — sync callback //! //! ```rust,no_run //! use std::sync::Arc; @@ -25,7 +25,7 @@ //! let bus = Arc::new(EventBus::new()); //! //! bus.on::("user-connected", |user| { -//! println!("Connecté : {:?}", user); +//! println!("Connected: {:?}", user); //! }); //! //! bus.emit("user-connected", User { name: "Alice".into() }); @@ -33,7 +33,7 @@ //! # }); //! ``` //! -//! ## Exemple — callback async +//! ## Example — async callback //! //! ```rust,no_run //! use std::sync::Arc; @@ -46,7 +46,7 @@ //! let bus = Arc::new(EventBus::new()); //! //! bus.on_async::("user-connected", |user| async move { -//! println!("(async) Connecté : {:?}", user); +//! println!("(async) Connected: {:?}", user); //! }); //! //! bus.emit("user-connected", User { name: "Bob".into() }); @@ -54,7 +54,7 @@ //! # }); //! ``` //! -//! ## Exemple — pattern glob (topic inclus dans le callback) +//! ## Example — glob pattern (topic included in the callback) //! //! ```rust,no_run //! use std::sync::Arc; @@ -68,8 +68,8 @@ //! //! bus.on_pattern::("user-*", |topic, user| { //! match topic.as_str() { -//! "user-created" => println!("Créé : {:?}", user), -//! "user-deleted" => println!("Supprimé : {:?}", user), +//! "user-created" => println!("Created : {:?}", user), +//! "user-deleted" => println!("Deleted : {:?}", user), //! other => println!("{}: {:?}", other, user), //! } //! }); @@ -80,7 +80,7 @@ //! # }); //! ``` //! -//! ## Exemple — multi-types avec `match_event!` (cas avancé) +//! ## Example — multi-type with `match_event!` (advanced) //! //! ```rust,no_run //! use std::sync::Arc; @@ -107,20 +107,20 @@ mod bus; -// Réexports publics +// Public re-exports pub use bus::{AnyEvent, EventBus}; -/// Downcaste un [`AnyEvent`] vers un ou plusieurs types concrets et exécute -/// la closure correspondante si le type correspond. +/// Downcasts an [`AnyEvent`] to one or more concrete types and executes +/// the matching closure if the type matches. /// -/// Les branches non correspondantes sont ignorées silencieusement. +/// Non-matching branches are silently ignored. /// -/// # Syntaxe +/// # Syntax /// ```text /// match_event!(evt, Type1 => |val| { ... }, Type2 => |val| { ... }) /// ``` /// -/// # Exemple +/// # Example /// ```rust,no_run /// # use std::sync::Arc; /// # use oxspeak_server_lib::event_bus::EventBus; @@ -134,8 +134,8 @@ pub use bus::{AnyEvent, EventBus}; /// /// if let Ok(evt) = rx.recv().await { /// match_event!(evt, -/// User => |u| println!("Utilisateur : {:?}", u), -/// UdpMetric => |m| println!("Metric : {:?}", m), +/// User => |u| println!("User: {:?}", u), +/// UdpMetric => |m| println!("Metric: {:?}", m), /// ); /// } /// # }); @@ -149,7 +149,7 @@ macro_rules! match_event { } else )+ { - // Aucun type ne correspond → on ignore silencieusement + // No matching type → silently ignored } }; }