diff --git a/Cargo.lock b/Cargo.lock index c8ff42b..84dde2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.16.1", ] [[package]] @@ -177,7 +177,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.16.1", ] [[package]] @@ -190,11 +190,13 @@ dependencies = [ "crossterm", "directories", "edit", + "futures", "parking_lot", "rusqlite", "serde", "serde_json", "tokio", + "tokio-tungstenite 0.17.1", "toss", ] @@ -973,6 +975,17 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.3", +] + [[package]] name = "sha2" version = "0.10.2" @@ -1187,10 +1200,22 @@ dependencies = [ "rustls-native-certs", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.16.0", "webpki", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.17.2", +] + [[package]] name = "toss" version = "0.1.0" @@ -1216,13 +1241,32 @@ dependencies = [ "log", "rand", "rustls", - "sha-1", + "sha-1 0.9.8", "thiserror", "url", "utf-8", "webpki", ] +[[package]] +name = "tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha-1 0.10.0", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.15.0" diff --git a/cove-tui/Cargo.toml b/cove-tui/Cargo.toml index 6b8684f..545f0fd 100644 --- a/cove-tui/Cargo.toml +++ b/cove-tui/Cargo.toml @@ -10,9 +10,11 @@ chrono = { version = "0.4.19", features = ["serde"] } crossterm = "0.23.2" directories = "4.0.1" edit = "0.1.4" +futures = "0.3.21" parking_lot = "0.12.1" rusqlite = { version = "0.27.0", features = ["chrono"] } serde = { version = "1.0.137", features = ["derive"] } serde_json = "1.0.81" tokio = { version = "1.19.2", features = ["full"] } +tokio-tungstenite = "0.17.1" toss = { git = "https://github.com/Garmelon/toss.git", rev = "761519c1a7cdc950eab70fd6539c71bf22919a50" } diff --git a/cove-tui/src/euph.rs b/cove-tui/src/euph.rs index 08d4a57..5cc6ad6 100644 --- a/cove-tui/src/euph.rs +++ b/cove-tui/src/euph.rs @@ -1,14 +1,5 @@ mod api; - -use std::convert::Infallible; - -use tokio::sync::{mpsc, oneshot}; +mod room; pub use api::{Message, SessionView, Snowflake, Time, UserId}; - -enum Request {} - -pub struct EuphRoom { - canary: oneshot::Sender, - tx: mpsc::Sender, -} +pub use room::Room; diff --git a/cove-tui/src/euph/api/types.rs b/cove-tui/src/euph/api/types.rs index 2d18bd6..876bed1 100644 --- a/cove-tui/src/euph/api/types.rs +++ b/cove-tui/src/euph/api/types.rs @@ -1,5 +1,7 @@ //! Field types. +// TODO Add newtype wrappers for different kinds of IDs? + // Serde's derive macros generate this warning and I can't turn it off locally, // so I'm turning it off for the entire module. #![allow(clippy::use_self)] diff --git a/cove-tui/src/euph/room.rs b/cove-tui/src/euph/room.rs new file mode 100644 index 0000000..95422fb --- /dev/null +++ b/cove-tui/src/euph/room.rs @@ -0,0 +1,113 @@ +use std::convert::Infallible; +use std::time::Duration; + +use futures::stream::{SplitSink, SplitStream}; +use futures::StreamExt; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot}; +use tokio::{select, task, time}; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; + +type WsStream = WebSocketStream>; + +#[derive(Debug)] +enum Event { + Connected(SplitSink), + Disconnected, + Message(tungstenite::Message), + Ping, +} + +#[derive(Debug)] +struct Connected { + tx: SplitSink, +} + +#[derive(Debug)] +enum State { + Connecting, + Connected(Connected), +} + +impl State { + async fn run( + self, + canary: oneshot::Receiver, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + url: String, + ) { + select! { + _ = canary => (), + _ = Self::maintain_connection(tx, url) => (), + _ = self.respond_to_events(rx) => (), + } + } + + async fn maintain_connection( + tx: mpsc::UnboundedSender, + url: String, + ) -> anyhow::Result<()> { + loop { + // TODO Cookies + let (ws, _) = tokio_tungstenite::connect_async(&url).await?; + let (ws_tx, ws_rx) = ws.split(); + tx.send(Event::Connected(ws_tx))?; + select! { + _ = Self::receive_messages(&tx, ws_rx) => (), + _ = Self::prompt_pings(&tx) => () + } + tx.send(Event::Disconnected)?; + // TODO Make reconnect delay configurable + time::sleep(Duration::from_secs(5)).await; + } + } + + async fn receive_messages( + tx: &mpsc::UnboundedSender, + mut rx: SplitStream, + ) -> anyhow::Result<()> { + while let Some(msg) = rx.next().await { + tx.send(Event::Message(msg?))?; + } + Ok(()) + } + + async fn prompt_pings(tx: &mpsc::UnboundedSender) -> anyhow::Result<()> { + loop { + // TODO Make ping delay configurable + time::sleep(Duration::from_secs(10)).await; + tx.send(Event::Ping)?; + } + } + + async fn respond_to_events(mut self, mut rx: mpsc::UnboundedReceiver) { + while let Some(event) = rx.recv().await { + match event { + Event::Connected(tx) => self = State::Connected(Connected { tx }), + Event::Disconnected => self = State::Connecting, + Event::Message(_) => todo!(), + Event::Ping => todo!(), + } + } + } +} + +pub struct Room { + canary: oneshot::Sender, + tx: mpsc::UnboundedSender, +} + +impl Room { + pub fn start(url: String) -> Self { + let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (canary_tx, canary_rx) = oneshot::channel(); + + task::spawn(State::Connecting.run(canary_rx, event_tx.clone(), event_rx, url)); + + Self { + canary: canary_tx, + tx: event_tx, + } + } +}