diff --git a/Cargo.lock b/Cargo.lock index fbf0dd2..77f1d0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,6 +192,7 @@ dependencies = [ "edit", "futures", "parking_lot", + "rand", "rusqlite", "serde", "serde_json", diff --git a/cove-tui/Cargo.toml b/cove-tui/Cargo.toml index 3d6be6e..4af78f4 100644 --- a/cove-tui/Cargo.toml +++ b/cove-tui/Cargo.toml @@ -12,6 +12,7 @@ directories = "4.0.1" edit = "0.1.4" futures = "0.3.21" parking_lot = "0.12.1" +rand = "0.8.5" rusqlite = { version = "0.27.0", features = ["chrono"] } serde = { version = "1.0.137", features = ["derive"] } serde_json = "1.0.81" diff --git a/cove-tui/src/euph.rs b/cove-tui/src/euph.rs index 5cc6ad6..2056485 100644 --- a/cove-tui/src/euph.rs +++ b/cove-tui/src/euph.rs @@ -1,5 +1,6 @@ mod api; -mod room; +mod conn; +// mod room; pub use api::{Message, SessionView, Snowflake, Time, UserId}; -pub use room::Room; +// pub use room::Room; diff --git a/cove-tui/src/euph/api/packet.rs b/cove-tui/src/euph/api/packet.rs index ddd4d73..60f4f17 100644 --- a/cove-tui/src/euph/api/packet.rs +++ b/cove-tui/src/euph/api/packet.rs @@ -184,6 +184,14 @@ impl fmt::Display for PacketType { } } +#[derive(Debug, thiserror::Error)] +pub enum PacketError { + #[error("throttled: {0}")] + Throttled(String), + #[error("error: {0}")] + Error(String), +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Packet { pub id: Option, @@ -195,6 +203,21 @@ pub struct Packet { pub throttled_reason: Option, } +impl Packet { + pub fn data(self) -> Result { + if self.throttled { + let reason = self + .throttled_reason + .unwrap_or_else(|| "no reason given".to_string()); + Err(PacketError::Throttled(reason)) + } else if let Some(error) = self.error { + Err(PacketError::Error(error)) + } else { + Ok(self.data.unwrap_or_default()) + } + } +} + pub trait HasPacketType { fn packet_type() -> PacketType; } @@ -234,14 +257,10 @@ pub enum DecodeError { expected: PacketType, actual: PacketType, }, - #[error("throttled: {0}")] - Throttled(String), - #[error("error: {0}")] - Error(String), - #[error("no data")] - NoData, #[error("{0}")] SerdeJson(#[from] serde_json::Error), + #[error("{0}")] + Packet(#[from] PacketError), } pub trait FromPacket: Sized { @@ -255,15 +274,8 @@ impl FromPacket for T { expected: Self::packet_type(), actual: packet.r#type, }) - } else if packet.throttled { - let reason = packet - .throttled_reason - .unwrap_or_else(|| "no reason given".to_string()); - Err(DecodeError::Throttled(reason)) - } else if let Some(error) = packet.error { - Err(DecodeError::Error(error)) } else { - let data = packet.data.unwrap_or_default(); + let data = packet.data()?; Ok(serde_json::from_value(data)?) } } diff --git a/cove-tui/src/euph/api/types.rs b/cove-tui/src/euph/api/types.rs index 32d4328..381e40f 100644 --- a/cove-tui/src/euph/api/types.rs +++ b/cove-tui/src/euph/api/types.rs @@ -10,7 +10,6 @@ use std::fmt; use chrono::{DateTime, Utc}; use serde::{de, ser, Deserialize, Serialize}; -use serde_json::Value; /// Describes an account and its preferred name. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -146,7 +145,7 @@ impl<'de> Deserialize<'de> for Snowflake { /// Time is specified as a signed 64-bit integer, giving the number of seconds /// since the Unix Epoch. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct Time(#[serde(with = "chrono::serde::ts_seconds")] pub DateTime); /// Identifies a user. @@ -156,5 +155,5 @@ pub struct Time(#[serde(with = "chrono::serde::ts_seconds")] pub DateTime); /// /// It is possible for this value to have no prefix and colon, and there is no /// fixed format for the unique value. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct UserId(pub String); diff --git a/cove-tui/src/euph/conn.rs b/cove-tui/src/euph/conn.rs new file mode 100644 index 0000000..1682626 --- /dev/null +++ b/cove-tui/src/euph/conn.rs @@ -0,0 +1,404 @@ +//! Connection state modeling. + +use std::collections::HashMap; +use std::convert::Infallible; +use std::time::Duration; + +use anyhow::bail; +use chrono::Utc; +use futures::channel::oneshot; +use futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt}; +use rand::Rng; +use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::{select, task, time}; +use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; + +use crate::replies::{self, Replies}; + +use super::api::{ + BounceEvent, FromPacket, HelloEvent, JoinEvent, NetworkEvent, NickEvent, NickReply, Packet, + PacketType, PartEvent, PersonalAccountView, Ping, PingEvent, PingReply, SendEvent, + SnapshotEvent, ToPacket, +}; +use super::{SessionView, Time, UserId}; + +pub type WsStream = WebSocketStream>; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("connection closed")] + ConnectionClosed, + #[error("packet timed out")] + TimedOut, +} + +#[derive(Debug)] +enum Event { + Message(tungstenite::Message), + Send(Packet, oneshot::Sender>), + Status(oneshot::Sender), + DoPings, +} + +#[derive(Debug, Clone, Default)] +pub struct Joining { + hello: Option, + snapshot: Option, + bounce: Option, +} + +impl Joining { + fn on_packet(&mut self, packet: Packet) -> anyhow::Result<()> { + match packet.r#type { + PacketType::BounceEvent => self.bounce = Some(BounceEvent::from_packet(packet)?), + PacketType::HelloEvent => self.hello = Some(HelloEvent::from_packet(packet)?), + PacketType::SnapshotEvent => self.snapshot = Some(SnapshotEvent::from_packet(packet)?), + _ => {} + } + Ok(()) + } + + fn joined(&self) -> Option { + if let (Some(hello), Some(snapshot)) = (&self.hello, &self.snapshot) { + let listing = snapshot + .listing + .iter() + .cloned() + .map(|s| (s.id.clone(), s)) + .collect::>(); + Some(Joined { + session: hello.session.clone(), + account: hello.account.clone(), + listing, + }) + } else { + None + } + } +} + +#[derive(Debug, Clone)] +pub struct Joined { + session: SessionView, + account: Option, + listing: HashMap, +} + +impl Joined { + fn on_packet(&mut self, packet: Packet) -> anyhow::Result<()> { + match packet.r#type { + PacketType::JoinEvent => { + let packet = JoinEvent::from_packet(packet)?; + self.listing.insert(packet.0.id.clone(), packet.0); + } + PacketType::SendEvent => { + let packet = SendEvent::from_packet(packet)?; + self.listing + .insert(packet.0.sender.id.clone(), packet.0.sender); + } + PacketType::PartEvent => { + let packet = PartEvent::from_packet(packet)?; + self.listing.remove(&packet.0.id); + } + PacketType::NetworkEvent => { + let p = NetworkEvent::from_packet(packet)?; + if p.r#type == "partition" { + self.listing.retain(|_, s| { + !(s.server_id == p.server_id && s.server_era == p.server_era) + }); + } + } + PacketType::NickEvent => { + let packet = NickEvent::from_packet(packet)?; + if let Some(session) = self.listing.get_mut(&packet.id) { + session.name = packet.to; + } + } + PacketType::NickReply => { + // Since this is a reply, it may contain errors, for example if + // the user specified an invalid nick. We can't just die if that + // happens, so we ignore the error case. + if let Ok(packet) = NickReply::from_packet(packet) { + assert_eq!(self.session.id, packet.id); + self.session.name = packet.to; + } + } + // The who reply is broken and can't be trusted right now, so we'll + // not even look at it. + _ => {} + } + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum Status { + Joining(Joining), + Joined(Joined), +} + +struct State { + ws_tx: SplitSink, + last_id: usize, + replies: Replies, + + packet_tx: mpsc::UnboundedSender, + + last_ws_ping: Option>, + last_ws_pong: Option>, + last_euph_ping: Option