// Listing metadata: 666 lines, 18 KiB, Rust
use std::hint::black_box;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Instant;

use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use event_bus::EventBus;
use tokio::runtime::Runtime;
|
|
|
|
/// Topic name that every benchmark in this file emits on.
const TOPIC: &str = "bench-topic";
|
|
/// Pattern handed to `on_pattern` by the pattern benchmarks.
/// NOTE(review): presumably a glob that matches `TOPIC` — confirm against the
/// pattern syntax `EventBus` actually implements.
const PATTERN: &str = "bench-*";
|
|
|
|
/// Smallest benchmark payload: a single `u64`.
#[derive(Clone, Debug)]
struct SmallEvent {
    /// Value carried by the event (a fixed 42 or the iteration index in the benchmarks).
    value: u64,
}
|
|
|
|
/// Benchmark payload that owns two heap-allocated strings.
#[derive(Clone, Debug)]
struct StringEvent {
    /// Numeric identifier (a fixed 42 or the iteration index in the benchmarks).
    id: u64,
    /// Short owned string; the benchmarks fill it with "Alice".
    name: String,
    /// Longer owned string; the benchmarks fill it with a fixed greeting.
    message: String,
}
|
|
|
|
/// Benchmark payload that owns its bytes in a `Vec<u8>`, so cloning copies the buffer.
#[derive(Clone, Debug)]
struct VecEvent {
    /// Numeric identifier (a fixed 42 or the iteration index in the benchmarks).
    id: u64,
    /// Owned byte buffer; the benchmarks allocate 1024 bytes per event.
    payload: Vec<u8>,
}
|
|
|
|
/// Benchmark payload whose bytes live behind an `Arc<[u8]>`, so cloning the
/// event bumps a reference count instead of copying the buffer.
#[derive(Clone, Debug)]
struct ArcPayloadEvent {
    /// Numeric identifier (a fixed 42 or the iteration index in the benchmarks).
    id: u64,
    /// Shared, immutable byte buffer; the benchmarks share one 1024-byte slice.
    payload: Arc<[u8]>,
}
|
|
|
|
fn runtime() -> Runtime {
|
|
Runtime::new().expect("failed to create tokio runtime")
|
|
}
|
|
|
|
fn wait_until_received(
|
|
received: &AtomicU64,
|
|
expected: u64,
|
|
) -> impl std::future::Future<Output = ()> + '_ {
|
|
async move {
|
|
while received.load(Ordering::Relaxed) < expected {
|
|
tokio::task::yield_now().await;
|
|
}
|
|
}
|
|
}
|
|
|
|
fn bench_emit_no_subscriber(c: &mut Criterion) {
|
|
let mut group = c.benchmark_group("event_bus/no_subscriber");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
let bus = EventBus::with_capacity(1024);
|
|
|
|
group.bench_function("u64", |b| {
|
|
b.iter(|| {
|
|
bus.emit(TOPIC, 42_u64);
|
|
});
|
|
});
|
|
|
|
group.bench_function("small_struct", |b| {
|
|
b.iter(|| {
|
|
bus.emit(TOPIC, SmallEvent { value: 42 });
|
|
});
|
|
});
|
|
|
|
group.bench_function("string_struct", |b| {
|
|
b.iter(|| {
|
|
bus.emit(
|
|
TOPIC,
|
|
StringEvent {
|
|
id: 42,
|
|
name: "Alice".to_string(),
|
|
message: "hello from benchmark".to_string(),
|
|
},
|
|
);
|
|
});
|
|
});
|
|
|
|
group.bench_function("vec_payload_1kb", |b| {
|
|
b.iter(|| {
|
|
bus.emit(
|
|
TOPIC,
|
|
VecEvent {
|
|
id: 42,
|
|
payload: vec![7_u8; 1024],
|
|
},
|
|
);
|
|
});
|
|
});
|
|
|
|
let shared_payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
|
|
|
group.bench_function("arc_payload_1kb", |b| {
|
|
b.iter(|| {
|
|
bus.emit(
|
|
TOPIC,
|
|
ArcPayloadEvent {
|
|
id: 42,
|
|
payload: Arc::clone(&shared_payload),
|
|
},
|
|
);
|
|
});
|
|
});
|
|
|
|
group.finish();
|
|
}
|
|
|
|
fn bench_raw_subscriber(c: &mut Criterion) {
|
|
let rt = runtime();
|
|
|
|
let mut group = c.benchmark_group("event_bus/raw_subscriber");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
group.bench_function("u64", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let mut rx = bus.on_raw(TOPIC);
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let receiver_count = Arc::clone(&received);
|
|
|
|
let receiver = tokio::spawn(async move {
|
|
while receiver_count.load(Ordering::Relaxed) < iters {
|
|
if rx.recv().await.is_ok() {
|
|
receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, i);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
receiver.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("small_struct", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let mut rx = bus.on_raw(TOPIC);
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let receiver_count = Arc::clone(&received);
|
|
|
|
let receiver = tokio::spawn(async move {
|
|
while receiver_count.load(Ordering::Relaxed) < iters {
|
|
if rx.recv().await.is_ok() {
|
|
receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, SmallEvent { value: i });
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
receiver.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("string_struct", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let mut rx = bus.on_raw(TOPIC);
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let receiver_count = Arc::clone(&received);
|
|
|
|
let receiver = tokio::spawn(async move {
|
|
while receiver_count.load(Ordering::Relaxed) < iters {
|
|
if rx.recv().await.is_ok() {
|
|
receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
StringEvent {
|
|
id: i,
|
|
name: "Alice".to_string(),
|
|
message: "hello from benchmark".to_string(),
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
receiver.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("vec_payload_1kb", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let mut rx = bus.on_raw(TOPIC);
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let receiver_count = Arc::clone(&received);
|
|
|
|
let receiver = tokio::spawn(async move {
|
|
while receiver_count.load(Ordering::Relaxed) < iters {
|
|
if rx.recv().await.is_ok() {
|
|
receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
VecEvent {
|
|
id: i,
|
|
payload: vec![7_u8; 1024],
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
receiver.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("arc_payload_1kb", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let mut rx = bus.on_raw(TOPIC);
|
|
|
|
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let receiver_count = Arc::clone(&received);
|
|
|
|
let receiver = tokio::spawn(async move {
|
|
while receiver_count.load(Ordering::Relaxed) < iters {
|
|
if rx.recv().await.is_ok() {
|
|
receiver_count.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
ArcPayloadEvent {
|
|
id: i,
|
|
payload: Arc::clone(&payload),
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
receiver.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.finish();
|
|
}
|
|
|
|
fn bench_typed_callback(c: &mut Criterion) {
|
|
let rt = runtime();
|
|
|
|
let mut group = c.benchmark_group("event_bus/typed_callback");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
group.bench_function("u64", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<u64, _>(TOPIC, move |event| {
|
|
let _ = event;
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, i);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("small_struct", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<SmallEvent, _>(TOPIC, move |event| {
|
|
let _ = event.value;
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, SmallEvent { value: i });
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("string_struct", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<StringEvent, _>(TOPIC, move |event| {
|
|
let _ = event.id;
|
|
let _ = event.name.len();
|
|
let _ = event.message.len();
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
StringEvent {
|
|
id: i,
|
|
name: "Alice".to_string(),
|
|
message: "hello from benchmark".to_string(),
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("vec_payload_1kb", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<VecEvent, _>(TOPIC, move |event| {
|
|
let _ = event.id;
|
|
let _ = event.payload.len();
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
VecEvent {
|
|
id: i,
|
|
payload: vec![7_u8; 1024],
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("arc_payload_1kb", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<ArcPayloadEvent, _>(TOPIC, move |event| {
|
|
let _ = event.id;
|
|
let _ = event.payload.len();
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
ArcPayloadEvent {
|
|
id: i,
|
|
payload: Arc::clone(&payload),
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.finish();
|
|
}
|
|
|
|
fn bench_pattern_callback(c: &mut Criterion) {
|
|
let rt = runtime();
|
|
|
|
let mut group = c.benchmark_group("event_bus/pattern_callback");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
group.bench_function("small_struct", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on_pattern::<SmallEvent, _>(PATTERN, move |topic, event| {
|
|
let _ = topic;
|
|
let _ = event.value;
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, SmallEvent { value: i });
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.bench_function("arc_payload_1kb", |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription =
|
|
bus.on_pattern::<ArcPayloadEvent, _>(PATTERN, move |topic, event| {
|
|
let _ = topic;
|
|
let _ = event.id;
|
|
let _ = event.payload.len();
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(
|
|
TOPIC,
|
|
ArcPayloadEvent {
|
|
id: i,
|
|
payload: Arc::clone(&payload),
|
|
},
|
|
);
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
subscription.abort();
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
|
|
group.finish();
|
|
}
|
|
|
|
fn bench_multiple_subscribers(c: &mut Criterion) {
|
|
let rt = runtime();
|
|
|
|
let mut group = c.benchmark_group("event_bus/multiple_subscribers");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
for subscriber_count in [1_u64, 2, 4, 8, 16, 32] {
|
|
group.bench_function(format!("{subscriber_count}_subscribers"), |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let expected = iters * subscriber_count;
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let mut subscriptions = Vec::with_capacity(subscriber_count as usize);
|
|
|
|
for _ in 0..subscriber_count {
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription = bus.on::<SmallEvent, _>(TOPIC, move |event| {
|
|
let _ = event.value;
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
subscriptions.push(subscription);
|
|
}
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, SmallEvent { value: i });
|
|
}
|
|
|
|
wait_until_received(&received, expected).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
for subscription in subscriptions {
|
|
subscription.abort();
|
|
}
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
}
|
|
|
|
group.finish();
|
|
}
|
|
|
|
fn bench_multiple_patterns(c: &mut Criterion) {
|
|
let rt = runtime();
|
|
|
|
let mut group = c.benchmark_group("event_bus/multiple_patterns");
|
|
group.throughput(Throughput::Elements(1));
|
|
|
|
for pattern_count in [1_u64, 4, 16, 64, 256] {
|
|
group.bench_function(format!("{pattern_count}_patterns_one_match"), |b| {
|
|
b.to_async(&rt).iter_custom(|iters| async move {
|
|
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
|
|
|
let received = Arc::new(AtomicU64::new(0));
|
|
let mut subscriptions = Vec::with_capacity(pattern_count as usize);
|
|
|
|
for index in 0..pattern_count {
|
|
let pattern = if index == 0 {
|
|
PATTERN.to_string()
|
|
} else {
|
|
format!("unused-{index}-*")
|
|
};
|
|
|
|
let handler_count = Arc::clone(&received);
|
|
|
|
let subscription =
|
|
bus.on_pattern::<SmallEvent, _>(&pattern, move |topic, event| {
|
|
let _ = topic;
|
|
let _ = event.value;
|
|
handler_count.fetch_add(1, Ordering::Relaxed);
|
|
});
|
|
|
|
subscriptions.push(subscription);
|
|
}
|
|
|
|
let start = Instant::now();
|
|
|
|
for i in 0..iters {
|
|
bus.emit(TOPIC, SmallEvent { value: i });
|
|
}
|
|
|
|
wait_until_received(&received, iters).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
|
|
for subscription in subscriptions {
|
|
subscription.abort();
|
|
}
|
|
|
|
elapsed
|
|
});
|
|
});
|
|
}
|
|
|
|
group.finish();
|
|
}
|
|
|
|
// Registers every benchmark function in this file under the `benches` group,
// using Criterion's default configuration.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets =
        bench_emit_no_subscriber,
        bench_raw_subscriber,
        bench_typed_callback,
        bench_pattern_callback,
        bench_multiple_subscribers,
        bench_multiple_patterns
}
|
|
|
|
// Expands to the benchmark binary's `main`, which runs the `benches` group.
criterion_main!(benches);
|