200 lines
7.5 KiB
Rust
200 lines
7.5 KiB
Rust
use std::any::Any;
|
|
use std::sync::Arc;
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
/// Type brut d'un événement : pointeur atomique vers n'importe quelle valeur.
|
|
pub type AnyEvent = Arc<dyn Any + Send + Sync>;
|
|
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
// Erreurs
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
/// Erreurs possibles lors d'un `recv().await`.
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum RecvError {
|
|
#[error("le canal est fermé")]
|
|
Closed,
|
|
#[error("messages perdus (lag) : {0} ignorés")]
|
|
Lagged(u64),
|
|
}
|
|
|
|
/// Erreurs possibles lors d'un `try_recv()`.
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum TryRecvError {
|
|
#[error("aucun message disponible")]
|
|
Empty,
|
|
#[error("le canal est fermé")]
|
|
Closed,
|
|
#[error("messages perdus (lag) : {0} ignorés")]
|
|
Lagged(u64),
|
|
}
|
|
|
|
impl From<broadcast::error::RecvError> for RecvError {
|
|
fn from(e: broadcast::error::RecvError) -> Self {
|
|
match e {
|
|
broadcast::error::RecvError::Closed => RecvError::Closed,
|
|
broadcast::error::RecvError::Lagged(n) => RecvError::Lagged(n),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<broadcast::error::TryRecvError> for TryRecvError {
|
|
fn from(e: broadcast::error::TryRecvError) -> Self {
|
|
match e {
|
|
broadcast::error::TryRecvError::Empty => TryRecvError::Empty,
|
|
broadcast::error::TryRecvError::Closed => TryRecvError::Closed,
|
|
broadcast::error::TryRecvError::Lagged(n) => TryRecvError::Lagged(n),
|
|
}
|
|
}
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
// TypedReceiver<T>
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
/// Receiver typé pour un topic précis.
|
|
///
|
|
/// `recv().await` retourne directement un `T` — le downcast est automatique.
|
|
/// Les événements d'un type différent sont ignorés silencieusement.
|
|
///
|
|
/// # Exemple
|
|
/// ```rust,no_run
|
|
/// # use std::sync::Arc;
|
|
/// # use oxspeak_server_lib::event_bus::EventBus;
|
|
/// # #[derive(Clone, Debug)] struct User { name: String }
|
|
/// # let bus = Arc::new(EventBus::new());
|
|
/// # tokio_test::block_on(async {
|
|
/// let mut rx = bus.on::<User>("user-connected");
|
|
/// bus.emit("user-connected", User { name: "Alice".into() });
|
|
/// let user = rx.recv().await.unwrap();
|
|
/// println!("{:?}", user);
|
|
/// # });
|
|
/// ```
|
|
pub struct TypedReceiver<T> {
|
|
pub(crate) inner: broadcast::Receiver<AnyEvent>,
|
|
pub(crate) _marker: std::marker::PhantomData<T>,
|
|
}
|
|
|
|
impl<T> TypedReceiver<T>
|
|
where
|
|
T: Any + Send + Sync + Clone + 'static,
|
|
{
|
|
pub(crate) fn new(inner: broadcast::Receiver<AnyEvent>) -> Self {
|
|
Self {
|
|
inner,
|
|
_marker: std::marker::PhantomData,
|
|
}
|
|
}
|
|
|
|
/// Attend le prochain événement de type `T` sur ce topic.
|
|
/// Les événements d'un autre type sont ignorés silencieusement.
|
|
pub async fn recv(&mut self) -> Result<T, RecvError> {
|
|
loop {
|
|
match self.inner.recv().await {
|
|
Ok(evt) => {
|
|
if let Some(val) = evt.downcast_ref::<T>() {
|
|
return Ok(val.clone());
|
|
}
|
|
// Mauvais type → on ignore et on attend le suivant
|
|
}
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Version non-bloquante.
|
|
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
|
|
loop {
|
|
match self.inner.try_recv() {
|
|
Ok(evt) => {
|
|
if let Some(val) = evt.downcast_ref::<T>() {
|
|
return Ok(val.clone());
|
|
}
|
|
}
|
|
Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Accès au receiver brut sous-jacent.
|
|
pub fn into_inner(self) -> broadcast::Receiver<AnyEvent> {
|
|
self.inner
|
|
}
|
|
}
|
|
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
// PatternReceiver<T>
|
|
// ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
/// Receiver unique pour un pattern wildcard (ex: `"user-*"`).
|
|
///
|
|
/// Un seul receiver reçoit les événements de **tous** les topics correspondants,
|
|
/// passés et futurs. `recv().await` retourne `(nom_du_topic, valeur)`.
|
|
///
|
|
/// # Exemple
|
|
/// ```rust,no_run
|
|
/// # use std::sync::Arc;
|
|
/// # use oxspeak_server_lib::event_bus::EventBus;
|
|
/// # #[derive(Clone, Debug)] struct User { name: String }
|
|
/// # let bus = Arc::new(EventBus::new());
|
|
/// # tokio_test::block_on(async {
|
|
/// let mut rx = bus.on_pattern::<User>("user-*");
|
|
///
|
|
/// bus.emit("user-created", User { name: "Alice".into() });
|
|
/// bus.emit("user-deleted", User { name: "Bob".into() });
|
|
///
|
|
/// let (topic, user) = rx.recv().await.unwrap();
|
|
/// println!("{}: {:?}", topic, user);
|
|
/// # });
|
|
/// ```
|
|
pub struct PatternReceiver<T> {
|
|
pub(crate) inner: broadcast::Receiver<(String, AnyEvent)>,
|
|
pub(crate) _marker: std::marker::PhantomData<T>,
|
|
}
|
|
|
|
impl<T> PatternReceiver<T>
|
|
where
|
|
T: Any + Send + Sync + Clone + 'static,
|
|
{
|
|
pub(crate) fn new(inner: broadcast::Receiver<(String, AnyEvent)>) -> Self {
|
|
Self {
|
|
inner,
|
|
_marker: std::marker::PhantomData,
|
|
}
|
|
}
|
|
|
|
/// Attend le prochain événement de type `T` sur n'importe quel topic du pattern.
|
|
/// Retourne `(nom_du_topic, valeur)`.
|
|
/// Les événements d'un type différent sont ignorés silencieusement.
|
|
pub async fn recv(&mut self) -> Result<(String, T), RecvError> {
|
|
loop {
|
|
match self.inner.recv().await {
|
|
Ok((topic, evt)) => {
|
|
if let Some(val) = evt.downcast_ref::<T>() {
|
|
return Ok((topic, val.clone()));
|
|
}
|
|
// Mauvais type → on ignore et on attend le suivant
|
|
}
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Version non-bloquante.
|
|
pub fn try_recv(&mut self) -> Result<(String, T), TryRecvError> {
|
|
loop {
|
|
match self.inner.try_recv() {
|
|
Ok((topic, evt)) => {
|
|
if let Some(val) = evt.downcast_ref::<T>() {
|
|
return Ok((topic, val.clone()));
|
|
}
|
|
}
|
|
Err(broadcast::error::TryRecvError::Empty) => return Err(TryRecvError::Empty),
|
|
Err(e) => return Err(e.into()),
|
|
}
|
|
}
|
|
}
|
|
}
|