From 3e5f97c2cd0e689b4f8cba38d0236609286650f9 Mon Sep 17 00:00:00 2001 From: Joscha Date: Thu, 23 Jun 2022 17:15:35 +0200 Subject: [PATCH] Structure room similar to conn --- src/euph.rs | 4 +- src/euph/conn.rs | 2 + src/euph/room.rs | 162 ++++++++++++++++++++++++++--------------------- 3 files changed, 93 insertions(+), 75 deletions(-) diff --git a/src/euph.rs b/src/euph.rs index 5972b6e..6fbdb28 100644 --- a/src/euph.rs +++ b/src/euph.rs @@ -1,5 +1,3 @@ pub mod api; pub mod conn; -// mod room; - -// pub use room::Room; +pub mod room; diff --git a/src/euph/conn.rs b/src/euph/conn.rs index dd99910..213b3a6 100644 --- a/src/euph/conn.rs +++ b/src/euph/conn.rs @@ -1,5 +1,7 @@ //! Connection state modeling. +// TODO Catch errors differently when sending into mpsc/oneshot + use std::collections::HashMap; use std::convert::Infallible; use std::time::Duration; diff --git a/src/euph/room.rs b/src/euph/room.rs index a3d6354..93a7f0e 100644 --- a/src/euph/room.rs +++ b/src/euph/room.rs @@ -1,110 +1,128 @@ use std::convert::Infallible; use std::time::Duration; -use futures::stream::{SplitSink, SplitStream}; -use futures::StreamExt; +use anyhow::bail; use tokio::sync::{mpsc, oneshot}; use tokio::{select, task, time}; use tokio_tungstenite::tungstenite; -use super::conn::{State, Status, WsStream}; +use super::api::Data; +use super::conn::{self, ConnRx, ConnTx, Status, WsStream}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("room stopped")] + Stopped, +} #[derive(Debug)] enum Event { - Connected(SplitSink), + Connected(ConnTx), Disconnected, - WsMessage(tungstenite::Message), - DoPings, - GetStatus(oneshot::Sender>), + Data(Data), + Status(oneshot::Sender>), } -async fn run( - canary: oneshot::Receiver, - tx: mpsc::UnboundedSender, - rx: mpsc::UnboundedReceiver, - url: String, -) { - let state = State::default(); - select! { - _ = canary => (), - _ = respond_to_events(state, rx) => (), - _ = maintain_connection(tx, url) => (), +#[derive(Debug)] +struct State { + conn_tx: Option, +} + +impl State { + async fn run( + name: String, + canary: oneshot::Receiver, + event_tx: mpsc::UnboundedSender, + mut event_rx: mpsc::UnboundedReceiver, + ) { + let mut state = Self { conn_tx: None }; + + select! { + _ = canary => (), + _ = Self::reconnect(&name, &event_tx) => (), + _ = state.handle_events(&mut event_rx) => (), + } } -} -async fn respond_to_events( - mut state: State, - mut rx: mpsc::UnboundedReceiver, -) -> anyhow::Result<()> { - while let Some(event) = rx.recv().await { - match event { - Event::Connected(tx) => state.on_connected(tx), - Event::Disconnected => state.on_disconnected(), - Event::WsMessage(msg) => state.on_ws_message(msg)?, - Event::DoPings => state.on_do_pings()?, - Event::GetStatus(tx) => { - let _ = tx.send(state.status()); + async fn reconnect(name: &str, event_tx: &mpsc::UnboundedSender) -> anyhow::Result<()> { + loop { + let (conn_tx, mut conn_rx) = match Self::connect(name).await? { + Some(conn) => conn, + None => continue, + }; + event_tx.send(Event::Connected(conn_tx))?; + + while let Ok(data) = conn_rx.recv().await { + event_tx.send(Event::Data(data))?; + } + + event_tx.send(Event::Disconnected)?; + time::sleep(Duration::from_secs(5)).await; // TODO Make configurable + } + } + + async fn connect(name: &str) -> anyhow::Result> { + // TODO Cookies + let url = format!("wss://euphoria.io/room/{name}/ws"); + match tokio_tungstenite::connect_async(&url).await { + Ok((ws, _)) => Ok(Some(conn::wrap(ws))), + Err(tungstenite::Error::Http(resp)) if resp.status().is_client_error() => { + bail!("room {name} doesn't exist"); + } + Err(_) => Ok(None), + } + } + + async fn handle_events(&mut self, event_rx: &mut mpsc::UnboundedReceiver) { + while let Some(event) = event_rx.recv().await { + match event { + Event::Connected(conn_tx) => self.conn_tx = Some(conn_tx), + Event::Disconnected => self.conn_tx = None, + Event::Data(data) => self.on_data(data).await, + Event::Status(reply_tx) => self.on_status(reply_tx).await, } } } - Ok(()) -} -async fn maintain_connection(tx: mpsc::UnboundedSender, url: String) -> anyhow::Result<()> { - loop { - // TODO Cookies - let (ws, _) = tokio_tungstenite::connect_async(&url).await?; - let (ws_tx, ws_rx) = ws.split(); - tx.send(Event::Connected(ws_tx))?; - select! { - _ = receive_messages(&tx, ws_rx) => (), - _ = prompt_pings(&tx) => () - } - tx.send(Event::Disconnected)?; - // TODO Make reconnect delay configurable - time::sleep(Duration::from_secs(5)).await; - } -} - -async fn receive_messages( - tx: &mpsc::UnboundedSender, - mut rx: SplitStream, -) -> anyhow::Result<()> { - while let Some(msg) = rx.next().await { - tx.send(Event::WsMessage(msg?))?; - } - Ok(()) -} - -async fn prompt_pings(tx: &mpsc::UnboundedSender) -> anyhow::Result<()> { - loop { - // TODO Make ping delay configurable - time::sleep(Duration::from_secs(10)).await; - tx.send(Event::DoPings)?; + async fn on_data(&self, data: Data) { + todo!() + } + + async fn on_status(&self, reply_tx: oneshot::Sender>) { + let status = if let Some(conn_tx) = &self.conn_tx { + conn_tx.status().await.ok() + } else { + None + }; + + let _ = reply_tx.send(status); } } +#[derive(Debug)] pub struct Room { canary: oneshot::Sender, - tx: mpsc::UnboundedSender, + event_tx: mpsc::UnboundedSender, } impl Room { - pub fn start(url: String) -> Self { - let (event_tx, event_rx) = mpsc::unbounded_channel(); + pub fn new(name: String) -> Self { let (canary_tx, canary_rx) = oneshot::channel(); + let (event_tx, event_rx) = mpsc::unbounded_channel(); - task::spawn(run(canary_rx, event_tx.clone(), event_rx, url)); + task::spawn(State::run(name, canary_rx, event_tx.clone(), event_rx)); Self { canary: canary_tx, - tx: event_tx, + event_tx, } } - pub async fn status(&self) -> anyhow::Result> { + pub async fn status(&self) -> Result, Error> { let (tx, rx) = oneshot::channel(); - self.tx.send(Event::GetStatus(tx))?; - Ok(rx.await?) + self.event_tx + .send(Event::Status(tx)) + .map_err(|_| Error::Stopped)?; + rx.await.map_err(|_| Error::Stopped) } }