Structure room similar to conn

This commit is contained in:
Joscha 2022-06-23 17:15:35 +02:00
parent 1ae81899a6
commit 3e5f97c2cd
3 changed files with 93 additions and 75 deletions

View file

@ -1,5 +1,3 @@
pub mod api; pub mod api;
pub mod conn; pub mod conn;
// mod room; pub mod room;
// pub use room::Room;

View file

@ -1,5 +1,7 @@
//! Connection state modeling. //! Connection state modeling.
// TODO Catch errors differently when sending into mpsc/oneshot
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible; use std::convert::Infallible;
use std::time::Duration; use std::time::Duration;

View file

@ -1,110 +1,128 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::time::Duration; use std::time::Duration;
use futures::stream::{SplitSink, SplitStream}; use anyhow::bail;
use futures::StreamExt;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio::{select, task, time}; use tokio::{select, task, time};
use tokio_tungstenite::tungstenite; use tokio_tungstenite::tungstenite;
use super::conn::{State, Status, WsStream}; use super::api::Data;
use super::conn::{self, ConnRx, ConnTx, Status, WsStream};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("room stopped")]
Stopped,
}
#[derive(Debug)] #[derive(Debug)]
enum Event { enum Event {
Connected(SplitSink<WsStream, tungstenite::Message>), Connected(ConnTx),
Disconnected, Disconnected,
WsMessage(tungstenite::Message), Data(Data),
DoPings, Status(oneshot::Sender<Option<Status>>),
GetStatus(oneshot::Sender<Option<Status>>),
} }
async fn run( #[derive(Debug)]
canary: oneshot::Receiver<Infallible>, struct State {
tx: mpsc::UnboundedSender<Event>, conn_tx: Option<ConnTx>,
rx: mpsc::UnboundedReceiver<Event>, }
url: String,
) { impl State {
let state = State::default(); async fn run(
select! { name: String,
_ = canary => (), canary: oneshot::Receiver<Infallible>,
_ = respond_to_events(state, rx) => (), event_tx: mpsc::UnboundedSender<Event>,
_ = maintain_connection(tx, url) => (), mut event_rx: mpsc::UnboundedReceiver<Event>,
) {
let mut state = Self { conn_tx: None };
select! {
_ = canary => (),
_ = Self::reconnect(&name, &event_tx) => (),
_ = state.handle_events(&mut event_rx) => (),
}
} }
}
async fn respond_to_events( async fn reconnect(name: &str, event_tx: &mpsc::UnboundedSender<Event>) -> anyhow::Result<()> {
mut state: State, loop {
mut rx: mpsc::UnboundedReceiver<Event>, let (conn_tx, mut conn_rx) = match Self::connect(name).await? {
) -> anyhow::Result<()> { Some(conn) => conn,
while let Some(event) = rx.recv().await { None => continue,
match event { };
Event::Connected(tx) => state.on_connected(tx), event_tx.send(Event::Connected(conn_tx))?;
Event::Disconnected => state.on_disconnected(),
Event::WsMessage(msg) => state.on_ws_message(msg)?, while let Ok(data) = conn_rx.recv().await {
Event::DoPings => state.on_do_pings()?, event_tx.send(Event::Data(data))?;
Event::GetStatus(tx) => { }
let _ = tx.send(state.status());
event_tx.send(Event::Disconnected)?;
time::sleep(Duration::from_secs(5)).await; // TODO Make configurable
}
}
async fn connect(name: &str) -> anyhow::Result<Option<(ConnTx, ConnRx)>> {
// TODO Cookies
let url = format!("wss://euphoria.io/room/{name}/ws");
match tokio_tungstenite::connect_async(&url).await {
Ok((ws, _)) => Ok(Some(conn::wrap(ws))),
Err(tungstenite::Error::Http(resp)) if resp.status().is_client_error() => {
bail!("room {name} doesn't exist");
}
Err(_) => Ok(None),
}
}
async fn handle_events(&mut self, event_rx: &mut mpsc::UnboundedReceiver<Event>) {
while let Some(event) = event_rx.recv().await {
match event {
Event::Connected(conn_tx) => self.conn_tx = Some(conn_tx),
Event::Disconnected => self.conn_tx = None,
Event::Data(data) => self.on_data(data).await,
Event::Status(reply_tx) => self.on_status(reply_tx).await,
} }
} }
} }
Ok(())
}
async fn maintain_connection(tx: mpsc::UnboundedSender<Event>, url: String) -> anyhow::Result<()> { async fn on_data(&self, data: Data) {
loop { todo!()
// TODO Cookies }
let (ws, _) = tokio_tungstenite::connect_async(&url).await?;
let (ws_tx, ws_rx) = ws.split(); async fn on_status(&self, reply_tx: oneshot::Sender<Option<Status>>) {
tx.send(Event::Connected(ws_tx))?; let status = if let Some(conn_tx) = &self.conn_tx {
select! { conn_tx.status().await.ok()
_ = receive_messages(&tx, ws_rx) => (), } else {
_ = prompt_pings(&tx) => () None
} };
tx.send(Event::Disconnected)?;
// TODO Make reconnect delay configurable let _ = reply_tx.send(status);
time::sleep(Duration::from_secs(5)).await;
}
}
async fn receive_messages(
tx: &mpsc::UnboundedSender<Event>,
mut rx: SplitStream<WsStream>,
) -> anyhow::Result<()> {
while let Some(msg) = rx.next().await {
tx.send(Event::WsMessage(msg?))?;
}
Ok(())
}
async fn prompt_pings(tx: &mpsc::UnboundedSender<Event>) -> anyhow::Result<()> {
loop {
// TODO Make ping delay configurable
time::sleep(Duration::from_secs(10)).await;
tx.send(Event::DoPings)?;
} }
} }
#[derive(Debug)]
pub struct Room { pub struct Room {
canary: oneshot::Sender<Infallible>, canary: oneshot::Sender<Infallible>,
tx: mpsc::UnboundedSender<Event>, event_tx: mpsc::UnboundedSender<Event>,
} }
impl Room { impl Room {
pub fn start(url: String) -> Self { pub fn new(name: String) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (canary_tx, canary_rx) = oneshot::channel(); let (canary_tx, canary_rx) = oneshot::channel();
let (event_tx, event_rx) = mpsc::unbounded_channel();
task::spawn(run(canary_rx, event_tx.clone(), event_rx, url)); task::spawn(State::run(name, canary_rx, event_tx.clone(), event_rx));
Self { Self {
canary: canary_tx, canary: canary_tx,
tx: event_tx, event_tx,
} }
} }
pub async fn status(&self) -> anyhow::Result<Option<Status>> { pub async fn status(&self) -> Result<Option<Status>, Error> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.tx.send(Event::GetStatus(tx))?; self.event_tx
Ok(rx.await?) .send(Event::Status(tx))
.map_err(|_| Error::Stopped)?;
rx.await.map_err(|_| Error::Stopped)
} }
} }