Files
2026-05-08 00:17:03 +02:00

666 lines
18 KiB
Rust

use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::Instant;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use event_bus::EventBus;
use tokio::runtime::Runtime;
/// Topic name the benchmarks below emit their events on.
const TOPIC: &str = "bench-topic";
/// Pattern handed to `on_pattern` subscriptions. The pattern benchmarks wait for that handler to
/// observe events emitted on [`TOPIC`], so the pattern is expected to match that topic.
const PATTERN: &str = "bench-*";
/// Smallest struct payload used by the benchmarks: a single `u64`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SmallEvent {
    /// Arbitrary value; the benchmarks fill it with the iteration index or a constant.
    value: u64,
}
/// Payload holding two owned strings, so constructing or cloning it allocates.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StringEvent {
    /// Arbitrary identifier; the benchmarks fill it with the iteration index or a constant.
    id: u64,
    /// Short owned string.
    name: String,
    /// Second, slightly longer owned string.
    message: String,
}
/// Payload owning a byte buffer; cloning it copies the whole buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
struct VecEvent {
    /// Arbitrary identifier; the benchmarks fill it with the iteration index or a constant.
    id: u64,
    /// Owned bytes (the benchmarks use 1024 of them).
    payload: Vec<u8>,
}
/// Payload whose bytes live behind an `Arc`, so cloning the event only bumps a reference count
/// instead of copying the buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ArcPayloadEvent {
    /// Arbitrary identifier; the benchmarks fill it with the iteration index or a constant.
    id: u64,
    /// Shared, immutable bytes (the benchmarks use 1024 of them).
    payload: Arc<[u8]>,
}
/// Builds the Tokio runtime the async benchmarks run on.
///
/// # Panics
///
/// Panics if the runtime cannot be created.
fn runtime() -> Runtime {
    match Runtime::new() {
        Ok(rt) => rt,
        // Same text `expect` would produce: the message followed by the error's `Debug` form.
        Err(err) => panic!("failed to create tokio runtime: {err:?}"),
    }
}
/// Resolves once `received` has reached at least `expected`.
///
/// The counter is polled with a relaxed load; between polls the task yields to the Tokio
/// scheduler instead of blocking the thread.
async fn wait_until_received(received: &AtomicU64, expected: u64) {
    loop {
        if received.load(Ordering::Relaxed) >= expected {
            break;
        }
        tokio::task::yield_now().await;
    }
}
/// Benchmarks `EventBus::emit` on a bus on which this function registers no subscribers, once
/// per payload shape (`u64`, small struct, two-`String` struct, 1 KiB `Vec`, shared 1 KiB
/// `Arc<[u8]>`).
///
/// Every event is passed through `std::hint::black_box` before it reaches `emit`. Without a
/// subscriber the event is dropped almost immediately, so the optimiser could otherwise fold
/// away the construction of the payload (including the heap allocation of the `Vec` variant)
/// and the benchmark would no longer measure what its name says.
fn bench_emit_no_subscriber(c: &mut Criterion) {
    let mut group = c.benchmark_group("event_bus/no_subscriber");
    // One `emit` call per measured iteration.
    group.throughput(Throughput::Elements(1));
    let bus = EventBus::with_capacity(1024);

    group.bench_function("u64", |b| {
        b.iter(|| {
            bus.emit(TOPIC, std::hint::black_box(42_u64));
        });
    });

    group.bench_function("small_struct", |b| {
        b.iter(|| {
            bus.emit(TOPIC, std::hint::black_box(SmallEvent { value: 42 }));
        });
    });

    group.bench_function("string_struct", |b| {
        b.iter(|| {
            // The two `String` allocations are deliberately part of the measured work.
            bus.emit(
                TOPIC,
                std::hint::black_box(StringEvent {
                    id: 42,
                    name: "Alice".to_string(),
                    message: "hello from benchmark".to_string(),
                }),
            );
        });
    });

    group.bench_function("vec_payload_1kb", |b| {
        b.iter(|| {
            // A fresh 1 KiB buffer is allocated on every iteration.
            bus.emit(
                TOPIC,
                std::hint::black_box(VecEvent {
                    id: 42,
                    payload: vec![7_u8; 1024],
                }),
            );
        });
    });

    // Allocated once, outside the measured closure; each iteration only clones the `Arc`.
    let shared_payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
    group.bench_function("arc_payload_1kb", |b| {
        b.iter(|| {
            bus.emit(
                TOPIC,
                std::hint::black_box(ArcPayloadEvent {
                    id: 42,
                    payload: Arc::clone(&shared_payload),
                }),
            );
        });
    });

    group.finish();
}
/// Runs one timed round of the raw-subscriber scenario and returns how long it took.
///
/// A fresh bus is built with `EventBus::with_capacity(iters + 1024)`, a raw receiver for
/// [`TOPIC`] is drained on a spawned task, and `make_event(i)` is emitted for every `i` in
/// `0..iters`. The clock starts just before the first `emit` and stops once the receiver task
/// has counted `iters` successful `recv` calls; bus and task setup are not timed.
///
/// NOTE(review): the bounds on `E` are the traits shared by every payload type used below;
/// adjust them if `EventBus::emit` requires something different.
async fn run_raw_subscriber_round<E, F>(iters: u64, mut make_event: F) -> std::time::Duration
where
    E: Clone + Send + Sync + 'static,
    F: FnMut(u64) -> E,
{
    let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
    let mut rx = bus.on_raw(TOPIC);
    let received = Arc::new(AtomicU64::new(0));
    let receiver_count = Arc::clone(&received);
    // Counts only successful receives; a failed `recv` is retried.
    let receiver = tokio::spawn(async move {
        while receiver_count.load(Ordering::Relaxed) < iters {
            if rx.recv().await.is_ok() {
                receiver_count.fetch_add(1, Ordering::Relaxed);
            }
        }
    });

    let start = Instant::now();
    for i in 0..iters {
        bus.emit(TOPIC, make_event(i));
    }
    wait_until_received(&received, iters).await;
    let elapsed = start.elapsed();

    // Stop the task after the clock so the abort is never part of the measurement.
    receiver.abort();
    elapsed
}

/// Benchmarks emit-to-receive time through a raw (`on_raw`) subscription, once per payload
/// shape. Each benchmark delegates to `run_raw_subscriber_round` and differs only in how the
/// event for iteration `i` is built.
fn bench_raw_subscriber(c: &mut Criterion) {
    let rt = runtime();
    let mut group = c.benchmark_group("event_bus/raw_subscriber");
    // One emitted event per measured iteration.
    group.throughput(Throughput::Elements(1));

    group.bench_function("u64", |b| {
        b.to_async(&rt)
            .iter_custom(|iters| run_raw_subscriber_round(iters, |i| i));
    });

    group.bench_function("small_struct", |b| {
        b.to_async(&rt)
            .iter_custom(|iters| run_raw_subscriber_round(iters, |i| SmallEvent { value: i }));
    });

    group.bench_function("string_struct", |b| {
        b.to_async(&rt).iter_custom(|iters| {
            run_raw_subscriber_round(iters, |i| StringEvent {
                id: i,
                name: "Alice".to_string(),
                message: "hello from benchmark".to_string(),
            })
        });
    });

    group.bench_function("vec_payload_1kb", |b| {
        b.to_async(&rt).iter_custom(|iters| {
            run_raw_subscriber_round(iters, |i| VecEvent {
                id: i,
                payload: vec![7_u8; 1024],
            })
        });
    });

    group.bench_function("arc_payload_1kb", |b| {
        b.to_async(&rt).iter_custom(|iters| {
            // Allocated before the round starts, so only the `Arc` clone is timed.
            let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
            run_raw_subscriber_round(iters, move |i| ArcPayloadEvent {
                id: i,
                payload: Arc::clone(&payload),
            })
        });
    });

    group.finish();
}
/// Benchmarks emit-to-handler time for typed callbacks registered with `bus.on::<T, _>`, once
/// per payload shape.
///
/// Every round builds a fresh bus, registers one handler that bumps a shared counter, emits
/// `rounds` events on [`TOPIC`], and stops the clock when the counter reaches `rounds`. The
/// subscription is aborted only after the elapsed time has been captured.
fn bench_typed_callback(c: &mut Criterion) {
    let tokio_rt = runtime();
    let mut typed_group = c.benchmark_group("event_bus/typed_callback");
    // One emitted event per measured iteration.
    typed_group.throughput(Throughput::Elements(1));

    typed_group.bench_function("u64", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            let handle = event_bus.on::<u64, _>(TOPIC, move |event| {
                let _ = event;
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                event_bus.emit(TOPIC, n);
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    typed_group.bench_function("small_struct", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            let handle = event_bus.on::<SmallEvent, _>(TOPIC, move |event| {
                let _ = event.value;
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                event_bus.emit(TOPIC, SmallEvent { value: n });
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    typed_group.bench_function("string_struct", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            let handle = event_bus.on::<StringEvent, _>(TOPIC, move |event| {
                let _ = event.id;
                let _ = event.name.len();
                let _ = event.message.len();
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                // Both strings are allocated inside the timed region, once per event.
                let event = StringEvent {
                    id: n,
                    name: "Alice".to_string(),
                    message: "hello from benchmark".to_string(),
                };
                event_bus.emit(TOPIC, event);
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    typed_group.bench_function("vec_payload_1kb", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            let handle = event_bus.on::<VecEvent, _>(TOPIC, move |event| {
                let _ = event.id;
                let _ = event.payload.len();
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                // A fresh 1 KiB buffer per event, allocated inside the timed region.
                let event = VecEvent {
                    id: n,
                    payload: vec![7_u8; 1024],
                };
                event_bus.emit(TOPIC, event);
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    typed_group.bench_function("arc_payload_1kb", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            // Allocated before the clock starts; each event only clones the `Arc`.
            let shared_bytes: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
            let handle = event_bus.on::<ArcPayloadEvent, _>(TOPIC, move |event| {
                let _ = event.id;
                let _ = event.payload.len();
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                let event = ArcPayloadEvent {
                    id: n,
                    payload: shared_bytes.clone(),
                };
                event_bus.emit(TOPIC, event);
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    typed_group.finish();
}
/// Benchmarks emit-to-handler time for a callback registered with `bus.on_pattern` under
/// [`PATTERN`], while events are emitted on [`TOPIC`].
///
/// Each round uses a fresh bus and a single pattern handler that bumps a shared counter; the
/// clock stops once the counter has reached the number of emitted events.
fn bench_pattern_callback(c: &mut Criterion) {
    let tokio_rt = runtime();
    let mut pattern_group = c.benchmark_group("event_bus/pattern_callback");
    // One emitted event per measured iteration.
    pattern_group.throughput(Throughput::Elements(1));

    pattern_group.bench_function("small_struct", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            let handle = event_bus.on_pattern::<SmallEvent, _>(PATTERN, move |topic, event| {
                let _ = topic;
                let _ = event.value;
                counter.fetch_add(1, Ordering::Relaxed);
            });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                event_bus.emit(TOPIC, SmallEvent { value: n });
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    pattern_group.bench_function("arc_payload_1kb", |bencher| {
        bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
            let delivered = Arc::new(AtomicU64::new(0));
            let counter = delivered.clone();
            let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
            // Allocated before the clock starts; each event only clones the `Arc`.
            let shared_bytes: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
            let handle =
                event_bus.on_pattern::<ArcPayloadEvent, _>(PATTERN, move |topic, event| {
                    let _ = topic;
                    let _ = event.id;
                    let _ = event.payload.len();
                    counter.fetch_add(1, Ordering::Relaxed);
                });

            let started_at = Instant::now();
            (0..rounds).for_each(|n| {
                let event = ArcPayloadEvent {
                    id: n,
                    payload: shared_bytes.clone(),
                };
                event_bus.emit(TOPIC, event);
            });
            wait_until_received(&delivered, rounds).await;
            let took = started_at.elapsed();

            handle.abort();
            took
        });
    });

    pattern_group.finish();
}
/// Benchmarks fan-out: the same [`TOPIC`] is subscribed to by 1, 2, 4, 8, 16 or 32 typed
/// handlers, and a round finishes once all handlers together have been invoked
/// `rounds * subscriber_count` times.
///
/// Throughput stays at one element per iteration, i.e. it counts emitted events rather than
/// handler invocations.
fn bench_multiple_subscribers(c: &mut Criterion) {
    let tokio_rt = runtime();
    let mut fanout_group = c.benchmark_group("event_bus/multiple_subscribers");
    fanout_group.throughput(Throughput::Elements(1));

    for subscriber_count in [1_u64, 2, 4, 8, 16, 32] {
        let bench_name = format!("{subscriber_count}_subscribers");
        fanout_group.bench_function(bench_name, |bencher| {
            bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
                let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
                let total_deliveries = rounds * subscriber_count;
                // All handlers share one counter.
                let delivered = Arc::new(AtomicU64::new(0));
                let handles: Vec<_> = (0..subscriber_count)
                    .map(|_| {
                        let counter = delivered.clone();
                        event_bus.on::<SmallEvent, _>(TOPIC, move |event| {
                            let _ = event.value;
                            counter.fetch_add(1, Ordering::Relaxed);
                        })
                    })
                    .collect();

                let started_at = Instant::now();
                (0..rounds).for_each(|n| {
                    event_bus.emit(TOPIC, SmallEvent { value: n });
                });
                wait_until_received(&delivered, total_deliveries).await;
                let took = started_at.elapsed();

                // Tear the subscriptions down only after the time has been captured.
                handles.into_iter().for_each(|handle| {
                    handle.abort();
                });
                took
            });
        });
    }

    fanout_group.finish();
}
/// Benchmarks emit cost as the number of registered pattern subscriptions grows (1, 4, 16, 64,
/// 256).
///
/// Only the first subscription uses [`PATTERN`]; every other one is registered under
/// `unused-{index}-*`. A round finishes once the shared counter has reached the number of
/// emitted events, so exactly one delivery per event is what the benchmark waits for.
fn bench_multiple_patterns(c: &mut Criterion) {
    let tokio_rt = runtime();
    let mut patterns_group = c.benchmark_group("event_bus/multiple_patterns");
    // One emitted event per measured iteration.
    patterns_group.throughput(Throughput::Elements(1));

    for pattern_count in [1_u64, 4, 16, 64, 256] {
        let bench_name = format!("{pattern_count}_patterns_one_match");
        patterns_group.bench_function(bench_name, |bencher| {
            bencher.to_async(&tokio_rt).iter_custom(|rounds| async move {
                let event_bus = Arc::new(EventBus::with_capacity(rounds as usize + 1024));
                let delivered = Arc::new(AtomicU64::new(0));
                let handles: Vec<_> = (0..pattern_count)
                    .map(|index| {
                        let pattern = match index {
                            0 => PATTERN.to_string(),
                            other => format!("unused-{other}-*"),
                        };
                        let counter = delivered.clone();
                        event_bus.on_pattern::<SmallEvent, _>(&pattern, move |topic, event| {
                            let _ = topic;
                            let _ = event.value;
                            counter.fetch_add(1, Ordering::Relaxed);
                        })
                    })
                    .collect();

                let started_at = Instant::now();
                (0..rounds).for_each(|n| {
                    event_bus.emit(TOPIC, SmallEvent { value: n });
                });
                wait_until_received(&delivered, rounds).await;
                let took = started_at.elapsed();

                // Tear the subscriptions down only after the time has been captured.
                handles.into_iter().for_each(|handle| {
                    handle.abort();
                });
                took
            });
        });
    }

    patterns_group.finish();
}
// Collects the benchmark functions listed under `targets` into the `benches` group, using
// Criterion's default configuration, and hands that group to `criterion_main!`.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets =
        bench_emit_no_subscriber,
        bench_raw_subscriber,
        bench_typed_callback,
        bench_pattern_callback,
        bench_multiple_subscribers,
        bench_multiple_patterns
}
criterion_main!(benches);