Add draft for euph room connection

This commit is contained in:
Joscha 2022-06-21 00:51:41 +02:00
parent a57e15c9f3
commit 21010fc48a
5 changed files with 167 additions and 15 deletions

52
Cargo.lock generated
View file

@ -160,7 +160,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-tungstenite 0.16.1",
]
[[package]]
@ -177,7 +177,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-tungstenite 0.16.1",
]
[[package]]
@ -190,11 +190,13 @@ dependencies = [
"crossterm",
"directories",
"edit",
"futures",
"parking_lot",
"rusqlite",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite 0.17.1",
"toss",
]
@ -973,6 +975,17 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sha-1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.3",
]
[[package]]
name = "sha2"
version = "0.10.2"
@ -1187,10 +1200,22 @@ dependencies = [
"rustls-native-certs",
"tokio",
"tokio-rustls",
"tungstenite",
"tungstenite 0.16.0",
"webpki",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.17.2",
]
[[package]]
name = "toss"
version = "0.1.0"
@ -1216,13 +1241,32 @@ dependencies = [
"log",
"rand",
"rustls",
"sha-1",
"sha-1 0.9.8",
"thiserror",
"url",
"utf-8",
"webpki",
]
[[package]]
name = "tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1 0.10.0",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.15.0"

View file

@ -10,9 +10,11 @@ chrono = { version = "0.4.19", features = ["serde"] }
crossterm = "0.23.2"
directories = "4.0.1"
edit = "0.1.4"
futures = "0.3.21"
parking_lot = "0.12.1"
rusqlite = { version = "0.27.0", features = ["chrono"] }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
tokio = { version = "1.19.2", features = ["full"] }
tokio-tungstenite = "0.17.1"
toss = { git = "https://github.com/Garmelon/toss.git", rev = "761519c1a7cdc950eab70fd6539c71bf22919a50" }

View file

@ -1,14 +1,5 @@
mod api;
use std::convert::Infallible;
use tokio::sync::{mpsc, oneshot};
mod room;
pub use api::{Message, SessionView, Snowflake, Time, UserId};
enum Request {}
pub struct EuphRoom {
canary: oneshot::Sender<Infallible>,
tx: mpsc::Sender<Request>,
}
pub use room::Room;

View file

@ -1,5 +1,7 @@
//! Field types.
// TODO Add newtype wrappers for different kinds of IDs?
// Serde's derive macros generate this warning and I can't turn it off locally,
// so I'm turning it off for the entire module.
#![allow(clippy::use_self)]

113
cove-tui/src/euph/room.rs Normal file
View file

@ -0,0 +1,113 @@
use std::convert::Infallible;
use std::time::Duration;
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};
use tokio::{select, task, time};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
#[derive(Debug)]
enum Event {
Connected(SplitSink<WsStream, tungstenite::Message>),
Disconnected,
Message(tungstenite::Message),
Ping,
}
#[derive(Debug)]
struct Connected {
tx: SplitSink<WsStream, tungstenite::Message>,
}
#[derive(Debug)]
enum State {
Connecting,
Connected(Connected),
}
impl State {
async fn run(
self,
canary: oneshot::Receiver<Infallible>,
tx: mpsc::UnboundedSender<Event>,
rx: mpsc::UnboundedReceiver<Event>,
url: String,
) {
select! {
_ = canary => (),
_ = Self::maintain_connection(tx, url) => (),
_ = self.respond_to_events(rx) => (),
}
}
async fn maintain_connection(
tx: mpsc::UnboundedSender<Event>,
url: String,
) -> anyhow::Result<()> {
loop {
// TODO Cookies
let (ws, _) = tokio_tungstenite::connect_async(&url).await?;
let (ws_tx, ws_rx) = ws.split();
tx.send(Event::Connected(ws_tx))?;
select! {
_ = Self::receive_messages(&tx, ws_rx) => (),
_ = Self::prompt_pings(&tx) => ()
}
tx.send(Event::Disconnected)?;
// TODO Make reconnect delay configurable
time::sleep(Duration::from_secs(5)).await;
}
}
async fn receive_messages(
tx: &mpsc::UnboundedSender<Event>,
mut rx: SplitStream<WsStream>,
) -> anyhow::Result<()> {
while let Some(msg) = rx.next().await {
tx.send(Event::Message(msg?))?;
}
Ok(())
}
async fn prompt_pings(tx: &mpsc::UnboundedSender<Event>) -> anyhow::Result<()> {
loop {
// TODO Make ping delay configurable
time::sleep(Duration::from_secs(10)).await;
tx.send(Event::Ping)?;
}
}
async fn respond_to_events(mut self, mut rx: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = rx.recv().await {
match event {
Event::Connected(tx) => self = State::Connected(Connected { tx }),
Event::Disconnected => self = State::Connecting,
Event::Message(_) => todo!(),
Event::Ping => todo!(),
}
}
}
}
pub struct Room {
canary: oneshot::Sender<Infallible>,
tx: mpsc::UnboundedSender<Event>,
}
impl Room {
pub fn start(url: String) -> Self {
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (canary_tx, canary_rx) = oneshot::channel();
task::spawn(State::Connecting.run(canary_rx, event_tx.clone(), event_rx, url));
Self {
canary: canary_tx,
tx: event_tx,
}
}
}