use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; use std::time::Instant; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use event_bus::EventBus; use tokio::runtime::Runtime; const TOPIC: &str = "bench-topic"; const PATTERN: &str = "bench-*"; #[derive(Clone)] struct SmallEvent { value: u64, } #[derive(Clone)] struct StringEvent { id: u64, name: String, message: String, } #[derive(Clone)] struct VecEvent { id: u64, payload: Vec, } #[derive(Clone)] struct ArcPayloadEvent { id: u64, payload: Arc<[u8]>, } fn runtime() -> Runtime { Runtime::new().expect("failed to create tokio runtime") } fn wait_until_received( received: &AtomicU64, expected: u64, ) -> impl std::future::Future + '_ { async move { while received.load(Ordering::Relaxed) < expected { tokio::task::yield_now().await; } } } fn bench_emit_no_subscriber(c: &mut Criterion) { let mut group = c.benchmark_group("event_bus/no_subscriber"); group.throughput(Throughput::Elements(1)); let bus = EventBus::with_capacity(1024); group.bench_function("u64", |b| { b.iter(|| { bus.emit(TOPIC, 42_u64); }); }); group.bench_function("small_struct", |b| { b.iter(|| { bus.emit(TOPIC, SmallEvent { value: 42 }); }); }); group.bench_function("string_struct", |b| { b.iter(|| { bus.emit( TOPIC, StringEvent { id: 42, name: "Alice".to_string(), message: "hello from benchmark".to_string(), }, ); }); }); group.bench_function("vec_payload_1kb", |b| { b.iter(|| { bus.emit( TOPIC, VecEvent { id: 42, payload: vec![7_u8; 1024], }, ); }); }); let shared_payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); group.bench_function("arc_payload_1kb", |b| { b.iter(|| { bus.emit( TOPIC, ArcPayloadEvent { id: 42, payload: Arc::clone(&shared_payload), }, ); }); }); group.finish(); } fn bench_raw_subscriber(c: &mut Criterion) { let rt = runtime(); let mut group = c.benchmark_group("event_bus/raw_subscriber"); group.throughput(Throughput::Elements(1)); group.bench_function("u64", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let mut rx = bus.on_raw(TOPIC); let received = Arc::new(AtomicU64::new(0)); let receiver_count = Arc::clone(&received); let receiver = tokio::spawn(async move { while receiver_count.load(Ordering::Relaxed) < iters { if rx.recv().await.is_ok() { receiver_count.fetch_add(1, Ordering::Relaxed); } } }); let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, i); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); receiver.abort(); elapsed }); }); group.bench_function("small_struct", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let mut rx = bus.on_raw(TOPIC); let received = Arc::new(AtomicU64::new(0)); let receiver_count = Arc::clone(&received); let receiver = tokio::spawn(async move { while receiver_count.load(Ordering::Relaxed) < iters { if rx.recv().await.is_ok() { receiver_count.fetch_add(1, Ordering::Relaxed); } } }); let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, SmallEvent { value: i }); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); receiver.abort(); elapsed }); }); group.bench_function("string_struct", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let mut rx = bus.on_raw(TOPIC); let received = Arc::new(AtomicU64::new(0)); let receiver_count = Arc::clone(&received); let receiver = tokio::spawn(async move { while receiver_count.load(Ordering::Relaxed) < iters { if rx.recv().await.is_ok() { receiver_count.fetch_add(1, Ordering::Relaxed); } } }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, StringEvent { id: i, name: "Alice".to_string(), message: "hello from benchmark".to_string(), }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); receiver.abort(); elapsed }); }); group.bench_function("vec_payload_1kb", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let mut rx = bus.on_raw(TOPIC); let received = Arc::new(AtomicU64::new(0)); let receiver_count = Arc::clone(&received); let receiver = tokio::spawn(async move { while receiver_count.load(Ordering::Relaxed) < iters { if rx.recv().await.is_ok() { receiver_count.fetch_add(1, Ordering::Relaxed); } } }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, VecEvent { id: i, payload: vec![7_u8; 1024], }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); receiver.abort(); elapsed }); }); group.bench_function("arc_payload_1kb", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let mut rx = bus.on_raw(TOPIC); let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); let received = Arc::new(AtomicU64::new(0)); let receiver_count = Arc::clone(&received); let receiver = tokio::spawn(async move { while receiver_count.load(Ordering::Relaxed) < iters { if rx.recv().await.is_ok() { receiver_count.fetch_add(1, Ordering::Relaxed); } } }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, ArcPayloadEvent { id: i, payload: Arc::clone(&payload), }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); receiver.abort(); elapsed }); }); group.finish(); } fn bench_typed_callback(c: &mut Criterion) { let rt = runtime(); let mut group = c.benchmark_group("event_bus/typed_callback"); group.throughput(Throughput::Elements(1)); group.bench_function("u64", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event; handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, i); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.bench_function("small_struct", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event.value; handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, SmallEvent { value: i }); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.bench_function("string_struct", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event.id; let _ = event.name.len(); let _ = event.message.len(); handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, StringEvent { id: i, name: "Alice".to_string(), message: "hello from benchmark".to_string(), }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.bench_function("vec_payload_1kb", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event.id; let _ = event.payload.len(); handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, VecEvent { id: i, payload: vec![7_u8; 1024], }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.bench_function("arc_payload_1kb", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event.id; let _ = event.payload.len(); handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, ArcPayloadEvent { id: i, payload: Arc::clone(&payload), }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.finish(); } fn bench_pattern_callback(c: &mut Criterion) { let rt = runtime(); let mut group = c.benchmark_group("event_bus/pattern_callback"); group.throughput(Throughput::Elements(1)); group.bench_function("small_struct", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on_pattern::(PATTERN, move |topic, event| { let _ = topic; let _ = event.value; handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, SmallEvent { value: i }); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.bench_function("arc_payload_1kb", |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice()); let received = Arc::new(AtomicU64::new(0)); let handler_count = Arc::clone(&received); let subscription = bus.on_pattern::(PATTERN, move |topic, event| { let _ = topic; let _ = event.id; let _ = event.payload.len(); handler_count.fetch_add(1, Ordering::Relaxed); }); let start = Instant::now(); for i in 0..iters { bus.emit( TOPIC, ArcPayloadEvent { id: i, payload: Arc::clone(&payload), }, ); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); subscription.abort(); elapsed }); }); group.finish(); } fn bench_multiple_subscribers(c: &mut Criterion) { let rt = runtime(); let mut group = c.benchmark_group("event_bus/multiple_subscribers"); group.throughput(Throughput::Elements(1)); for subscriber_count in [1_u64, 2, 4, 8, 16, 32] { group.bench_function(format!("{subscriber_count}_subscribers"), |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let expected = iters * subscriber_count; let received = Arc::new(AtomicU64::new(0)); let mut subscriptions = Vec::with_capacity(subscriber_count as usize); for _ in 0..subscriber_count { let handler_count = Arc::clone(&received); let subscription = bus.on::(TOPIC, move |event| { let _ = event.value; handler_count.fetch_add(1, Ordering::Relaxed); }); subscriptions.push(subscription); } let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, SmallEvent { value: i }); } wait_until_received(&received, expected).await; let elapsed = start.elapsed(); for subscription in subscriptions { subscription.abort(); } elapsed }); }); } group.finish(); } fn bench_multiple_patterns(c: &mut Criterion) { let rt = runtime(); let mut group = c.benchmark_group("event_bus/multiple_patterns"); group.throughput(Throughput::Elements(1)); for pattern_count in [1_u64, 4, 16, 64, 256] { group.bench_function(format!("{pattern_count}_patterns_one_match"), |b| { b.to_async(&rt).iter_custom(|iters| async move { let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024)); let received = Arc::new(AtomicU64::new(0)); let mut subscriptions = Vec::with_capacity(pattern_count as usize); for index in 0..pattern_count { let pattern = if index == 0 { PATTERN.to_string() } else { format!("unused-{index}-*") }; let handler_count = Arc::clone(&received); let subscription = bus.on_pattern::(&pattern, move |topic, event| { let _ = topic; let _ = event.value; handler_count.fetch_add(1, Ordering::Relaxed); }); subscriptions.push(subscription); } let start = Instant::now(); for i in 0..iters { bus.emit(TOPIC, SmallEvent { value: i }); } wait_until_received(&received, iters).await; let elapsed = start.elapsed(); for subscription in subscriptions { subscription.abort(); } elapsed }); }); } group.finish(); } criterion_group!( benches, bench_emit_no_subscriber, bench_raw_subscriber, bench_typed_callback, bench_pattern_callback, bench_multiple_subscribers, bench_multiple_patterns, ); criterion_main!(benches);