Init
This commit is contained in:
@@ -0,0 +1,665 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::time::Instant;
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
use event_bus::EventBus;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
const TOPIC: &str = "bench-topic";
|
||||
const PATTERN: &str = "bench-*";
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SmallEvent {
|
||||
value: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct StringEvent {
|
||||
id: u64,
|
||||
name: String,
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct VecEvent {
|
||||
id: u64,
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ArcPayloadEvent {
|
||||
id: u64,
|
||||
payload: Arc<[u8]>,
|
||||
}
|
||||
|
||||
fn runtime() -> Runtime {
|
||||
Runtime::new().expect("failed to create tokio runtime")
|
||||
}
|
||||
|
||||
fn wait_until_received(
|
||||
received: &AtomicU64,
|
||||
expected: u64,
|
||||
) -> impl std::future::Future<Output = ()> + '_ {
|
||||
async move {
|
||||
while received.load(Ordering::Relaxed) < expected {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn bench_emit_no_subscriber(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("event_bus/no_subscriber");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
let bus = EventBus::with_capacity(1024);
|
||||
|
||||
group.bench_function("u64", |b| {
|
||||
b.iter(|| {
|
||||
bus.emit(TOPIC, 42_u64);
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("small_struct", |b| {
|
||||
b.iter(|| {
|
||||
bus.emit(TOPIC, SmallEvent { value: 42 });
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("string_struct", |b| {
|
||||
b.iter(|| {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
StringEvent {
|
||||
id: 42,
|
||||
name: "Alice".to_string(),
|
||||
message: "hello from benchmark".to_string(),
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("vec_payload_1kb", |b| {
|
||||
b.iter(|| {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
VecEvent {
|
||||
id: 42,
|
||||
payload: vec![7_u8; 1024],
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
let shared_payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
||||
|
||||
group.bench_function("arc_payload_1kb", |b| {
|
||||
b.iter(|| {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
ArcPayloadEvent {
|
||||
id: 42,
|
||||
payload: Arc::clone(&shared_payload),
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_raw_subscriber(c: &mut Criterion) {
|
||||
let rt = runtime();
|
||||
|
||||
let mut group = c.benchmark_group("event_bus/raw_subscriber");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
group.bench_function("u64", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let mut rx = bus.on_raw(TOPIC);
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let receiver_count = Arc::clone(&received);
|
||||
|
||||
let receiver = tokio::spawn(async move {
|
||||
while receiver_count.load(Ordering::Relaxed) < iters {
|
||||
if rx.recv().await.is_ok() {
|
||||
receiver_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, i);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
receiver.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("small_struct", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let mut rx = bus.on_raw(TOPIC);
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let receiver_count = Arc::clone(&received);
|
||||
|
||||
let receiver = tokio::spawn(async move {
|
||||
while receiver_count.load(Ordering::Relaxed) < iters {
|
||||
if rx.recv().await.is_ok() {
|
||||
receiver_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, SmallEvent { value: i });
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
receiver.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("string_struct", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let mut rx = bus.on_raw(TOPIC);
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let receiver_count = Arc::clone(&received);
|
||||
|
||||
let receiver = tokio::spawn(async move {
|
||||
while receiver_count.load(Ordering::Relaxed) < iters {
|
||||
if rx.recv().await.is_ok() {
|
||||
receiver_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
StringEvent {
|
||||
id: i,
|
||||
name: "Alice".to_string(),
|
||||
message: "hello from benchmark".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
receiver.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("vec_payload_1kb", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let mut rx = bus.on_raw(TOPIC);
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let receiver_count = Arc::clone(&received);
|
||||
|
||||
let receiver = tokio::spawn(async move {
|
||||
while receiver_count.load(Ordering::Relaxed) < iters {
|
||||
if rx.recv().await.is_ok() {
|
||||
receiver_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
VecEvent {
|
||||
id: i,
|
||||
payload: vec![7_u8; 1024],
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
receiver.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("arc_payload_1kb", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let mut rx = bus.on_raw(TOPIC);
|
||||
|
||||
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let receiver_count = Arc::clone(&received);
|
||||
|
||||
let receiver = tokio::spawn(async move {
|
||||
while receiver_count.load(Ordering::Relaxed) < iters {
|
||||
if rx.recv().await.is_ok() {
|
||||
receiver_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
ArcPayloadEvent {
|
||||
id: i,
|
||||
payload: Arc::clone(&payload),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
receiver.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_typed_callback(c: &mut Criterion) {
|
||||
let rt = runtime();
|
||||
|
||||
let mut group = c.benchmark_group("event_bus/typed_callback");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
group.bench_function("u64", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<u64, _>(TOPIC, move |event| {
|
||||
let _ = event;
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, i);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("small_struct", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<SmallEvent, _>(TOPIC, move |event| {
|
||||
let _ = event.value;
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, SmallEvent { value: i });
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("string_struct", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<StringEvent, _>(TOPIC, move |event| {
|
||||
let _ = event.id;
|
||||
let _ = event.name.len();
|
||||
let _ = event.message.len();
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
StringEvent {
|
||||
id: i,
|
||||
name: "Alice".to_string(),
|
||||
message: "hello from benchmark".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("vec_payload_1kb", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<VecEvent, _>(TOPIC, move |event| {
|
||||
let _ = event.id;
|
||||
let _ = event.payload.len();
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
VecEvent {
|
||||
id: i,
|
||||
payload: vec![7_u8; 1024],
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("arc_payload_1kb", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<ArcPayloadEvent, _>(TOPIC, move |event| {
|
||||
let _ = event.id;
|
||||
let _ = event.payload.len();
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
ArcPayloadEvent {
|
||||
id: i,
|
||||
payload: Arc::clone(&payload),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_pattern_callback(c: &mut Criterion) {
|
||||
let rt = runtime();
|
||||
|
||||
let mut group = c.benchmark_group("event_bus/pattern_callback");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
group.bench_function("small_struct", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on_pattern::<SmallEvent, _>(PATTERN, move |topic, event| {
|
||||
let _ = topic;
|
||||
let _ = event.value;
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, SmallEvent { value: i });
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.bench_function("arc_payload_1kb", |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
let payload: Arc<[u8]> = Arc::from(vec![7_u8; 1024].into_boxed_slice());
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription =
|
||||
bus.on_pattern::<ArcPayloadEvent, _>(PATTERN, move |topic, event| {
|
||||
let _ = topic;
|
||||
let _ = event.id;
|
||||
let _ = event.payload.len();
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(
|
||||
TOPIC,
|
||||
ArcPayloadEvent {
|
||||
id: i,
|
||||
payload: Arc::clone(&payload),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
subscription.abort();
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_multiple_subscribers(c: &mut Criterion) {
|
||||
let rt = runtime();
|
||||
|
||||
let mut group = c.benchmark_group("event_bus/multiple_subscribers");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
for subscriber_count in [1_u64, 2, 4, 8, 16, 32] {
|
||||
group.bench_function(format!("{subscriber_count}_subscribers"), |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let expected = iters * subscriber_count;
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let mut subscriptions = Vec::with_capacity(subscriber_count as usize);
|
||||
|
||||
for _ in 0..subscriber_count {
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription = bus.on::<SmallEvent, _>(TOPIC, move |event| {
|
||||
let _ = event.value;
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
subscriptions.push(subscription);
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, SmallEvent { value: i });
|
||||
}
|
||||
|
||||
wait_until_received(&received, expected).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
for subscription in subscriptions {
|
||||
subscription.abort();
|
||||
}
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_multiple_patterns(c: &mut Criterion) {
|
||||
let rt = runtime();
|
||||
|
||||
let mut group = c.benchmark_group("event_bus/multiple_patterns");
|
||||
group.throughput(Throughput::Elements(1));
|
||||
|
||||
for pattern_count in [1_u64, 4, 16, 64, 256] {
|
||||
group.bench_function(format!("{pattern_count}_patterns_one_match"), |b| {
|
||||
b.to_async(&rt).iter_custom(|iters| async move {
|
||||
let bus = Arc::new(EventBus::with_capacity(iters as usize + 1024));
|
||||
|
||||
let received = Arc::new(AtomicU64::new(0));
|
||||
let mut subscriptions = Vec::with_capacity(pattern_count as usize);
|
||||
|
||||
for index in 0..pattern_count {
|
||||
let pattern = if index == 0 {
|
||||
PATTERN.to_string()
|
||||
} else {
|
||||
format!("unused-{index}-*")
|
||||
};
|
||||
|
||||
let handler_count = Arc::clone(&received);
|
||||
|
||||
let subscription =
|
||||
bus.on_pattern::<SmallEvent, _>(&pattern, move |topic, event| {
|
||||
let _ = topic;
|
||||
let _ = event.value;
|
||||
handler_count.fetch_add(1, Ordering::Relaxed);
|
||||
});
|
||||
|
||||
subscriptions.push(subscription);
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
for i in 0..iters {
|
||||
bus.emit(TOPIC, SmallEvent { value: i });
|
||||
}
|
||||
|
||||
wait_until_received(&received, iters).await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
for subscription in subscriptions {
|
||||
subscription.abort();
|
||||
}
|
||||
|
||||
elapsed
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_emit_no_subscriber,
|
||||
bench_raw_subscriber,
|
||||
bench_typed_callback,
|
||||
bench_pattern_callback,
|
||||
bench_multiple_subscribers,
|
||||
bench_multiple_patterns,
|
||||
);
|
||||
|
||||
criterion_main!(benches);
|
||||
Reference in New Issue
Block a user