diff --git a/Cargo.toml b/Cargo.toml index 19961bd..965e63e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,10 +5,13 @@ edition = "2021" [dependencies] caseless = "0.2.1" +futures-util = "0.3.31" jiff = { version = "0.1.15", default-features = false, features = ["std"] } +log = "0.4.22" serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" -tokio = { version = "1.42.0", features = ["sync", "time"] } +tokio = { version = "1.42.0", features = ["macros", "sync", "time"] } +tokio-tungstenite = "0.24.0" unicode-normalization = "0.1.24" [lints] diff --git a/src/api/packets.rs b/src/api/packets.rs index 6c07b6c..210ddc5 100644 --- a/src/api/packets.rs +++ b/src/api/packets.rs @@ -216,6 +216,17 @@ pub struct ParsedPacket { } impl ParsedPacket { + /// Convert a [`Data`]-compatible value into a [`ParsedPacket`]. + pub fn from_data(id: Option, data: impl Into) -> Self { + let data = data.into(); + Self { + id, + r#type: data.packet_type(), + content: Ok(data), + throttled: None, + } + } + /// Convert a [`Packet`] into a [`ParsedPacket`]. /// /// This method may fail if the packet data is invalid. diff --git a/src/conn.rs b/src/conn.rs index 7255d60..315468b 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,361 +1,101 @@ -//! Connection state modeling. +//! Basic connection between client and server. -use std::collections::HashMap; -use std::convert::Infallible; -use std::future::Future; -use std::time::{Duration, Instant}; -use std::{error, fmt, result}; +use std::{error, fmt, result, time::Duration}; -use futures_util::SinkExt; +use futures_util::{SinkExt, StreamExt}; use jiff::Timestamp; use log::debug; -use tokio::net::TcpStream; -use tokio::select; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; -use tokio_tungstenite::tungstenite::client::IntoClientRequest; -use tokio_tungstenite::tungstenite::http::{header, HeaderValue}; -use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; - -use crate::api::packet::{Command, ParsedPacket}; -use crate::api::{ - BounceEvent, Data, HelloEvent, LoginReply, NickEvent, PersonalAccountView, Ping, PingReply, - SessionId, SessionView, SnapshotEvent, Time, UserId, +use tokio::{ + net::TcpStream, + select, + time::{self, Instant}, +}; +use tokio_tungstenite::{ + tungstenite::{self, client::IntoClientRequest, handshake::client::Response, Message}, + MaybeTlsStream, WebSocketStream, }; -use crate::replies::{self, PendingReply, Replies}; -pub type WsStream = WebSocketStream>; +use crate::api::{Data, Packet, PacketType, ParsedPacket, Ping, PingEvent, PingReply, Time}; +/// An error that can occur while using an [`EuphConn`]. #[derive(Debug)] pub enum Error { - /// The connection is now closed. + /// The connection is closed. ConnectionClosed, - /// The connection was not opened in time. - ConnectionTimedOut, - /// The server didn't reply to one of our commands in time. - CommandTimedOut, - /// The server did something that violated the api specification. - ProtocolViolation(&'static str), - /// An error returned by the euphoria server. - Euph(String), + + /// A ping was not replied to in time. + PingTimeout, + + /// A packet was not sent because it was malformed. + MalformedPacket(serde_json::Error), + + /// A malformed packet was received. + ReceivedMalformedPacket(serde_json::Error), + + /// A binary message was received. + ReceivedBinaryMessage, Tungstenite(tungstenite::Error), - SerdeJson(serde_json::Error), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ConnectionClosed => write!(f, "connection closed"), - Self::ConnectionTimedOut => write!(f, "connection did not open in time"), - Self::CommandTimedOut => write!(f, "server did not reply to command in time"), - Self::ProtocolViolation(msg) => write!(f, "{msg}"), - Self::Euph(msg) => write!(f, "{msg}"), + Self::PingTimeout => write!(f, "ping timed out"), + Self::MalformedPacket(err) => write!(f, "malformed packet: {err}"), + Self::ReceivedMalformedPacket(err) => write!(f, "received malformed packet: {err}"), + Self::ReceivedBinaryMessage => write!(f, "received binary message"), Self::Tungstenite(err) => write!(f, "{err}"), - Self::SerdeJson(err) => write!(f, "{err}"), } } } +impl error::Error for Error {} + impl From for Error { fn from(err: tungstenite::Error) -> Self { Self::Tungstenite(err) } } -impl From for Error { - fn from(err: serde_json::Error) -> Self { - Self::SerdeJson(err) - } -} - -impl error::Error for Error {} - +/// An alias of [`Result`](result::Result) for [`Error`]. pub type Result = result::Result; -#[derive(Debug, Clone)] -pub struct Joining { - pub since: Timestamp, - pub hello: Option, - pub snapshot: Option, - pub bounce: Option, +/// Which side of the connection we're on. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Side { + /// We're the client and are talking to a server. + Client, + /// We're the server and are talking to a client. + Server, } -impl Joining { - fn new() -> Self { +/// Configuration options for a [`Conn`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConnConfig { + /// How long to wait in-between pings. + pub ping_interval: Duration, +} + +impl Default for ConnConfig { + fn default() -> Self { Self { - since: Timestamp::now(), - hello: None, - snapshot: None, - bounce: None, - } - } - - fn on_data(&mut self, data: &Data) -> Result<()> { - match data { - Data::BounceEvent(p) => self.bounce = Some(p.clone()), - Data::HelloEvent(p) => self.hello = Some(p.clone()), - Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), - // TODO Check and maybe expand list of unexpected packet types - Data::JoinEvent(_) - | Data::NetworkEvent(_) - | Data::NickEvent(_) - | Data::EditMessageEvent(_) - | Data::PartEvent(_) - | Data::PmInitiateEvent(_) - | Data::SendEvent(_) => return Err(Error::ProtocolViolation("unexpected packet type")), - _ => {} - } - Ok(()) - } - - fn joined(&self) -> Option { - if let (Some(hello), Some(snapshot)) = (&self.hello, &self.snapshot) { - let mut session = hello.session.clone(); - if let Some(nick) = &snapshot.nick { - session.name = nick.clone(); - } - let listing = snapshot - .listing - .iter() - .cloned() - .map(|s| (s.session_id.clone(), SessionInfo::Full(s))) - .collect::>(); - Some(Joined { - since: Timestamp::now(), - session, - account: hello.account.clone(), - listing, - }) - } else { - None + ping_interval: Duration::from_secs(30), } } } -#[derive(Debug, Clone)] -pub enum SessionInfo { - Full(SessionView), - Partial(NickEvent), -} - -impl SessionInfo { - pub fn id(&self) -> &UserId { - match self { - Self::Full(sess) => &sess.id, - Self::Partial(nick) => &nick.id, - } - } - - pub fn session_id(&self) -> &SessionId { - match self { - Self::Full(sess) => &sess.session_id, - Self::Partial(nick) => &nick.session_id, - } - } - - pub fn name(&self) -> &str { - match self { - Self::Full(sess) => &sess.name, - Self::Partial(nick) => &nick.to, - } - } -} - -#[derive(Debug, Clone)] -pub struct Joined { - pub since: Timestamp, - pub session: SessionView, - pub account: Option, - pub listing: HashMap, -} - -impl Joined { - fn on_data(&mut self, data: &Data) { - match data { - Data::JoinEvent(p) => { - debug!("Updating listing after join-event"); - self.listing - .insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone())); - } - Data::SendEvent(p) => { - debug!("Updating listing after send-event"); - self.listing.insert( - p.0.sender.session_id.clone(), - SessionInfo::Full(p.0.sender.clone()), - ); - } - Data::PartEvent(p) => { - debug!("Updating listing after part-event"); - self.listing.remove(&p.0.session_id); - } - Data::NetworkEvent(p) => { - if p.r#type == "partition" { - debug!("Updating listing after network-event with type partition"); - self.listing.retain(|_, s| match s { - SessionInfo::Full(s) => { - s.server_id != p.server_id && s.server_era != p.server_era - } - // We can't know if the session was disconnected by the - // partition or not, so we're erring on the side of - // caution and assuming they were kicked. If we're - // wrong, we'll re-add the session as soon as it - // performs another visible action. - // - // If we always kept such sessions, we might keep - // disconnected ones indefinitely, thereby keeping them - // from moving on, instead forever tethering them to the - // digital realm. - SessionInfo::Partial(_) => false, - }); - } - } - Data::NickEvent(p) => { - debug!("Updating listing after nick-event"); - self.listing - .entry(p.session_id.clone()) - .and_modify(|s| match s { - SessionInfo::Full(session) => session.name = p.to.clone(), - SessionInfo::Partial(_) => *s = SessionInfo::Partial(p.clone()), - }) - .or_insert_with(|| SessionInfo::Partial(p.clone())); - } - Data::NickReply(p) => { - debug!("Updating own session after nick-reply"); - assert_eq!(self.session.id, p.id); - self.session.name = p.to.clone(); - } - // The who reply is broken and can't be trusted right now, so we'll - // not even look at it. - _ => {} - } - } -} - -#[derive(Debug, Clone)] -#[allow(clippy::large_enum_variant)] -pub enum State { - Joining(Joining), - Joined(Joined), -} - -impl State { - pub fn into_joining(self) -> Option { - match self { - Self::Joining(joining) => Some(joining), - Self::Joined(_) => None, - } - } - - pub fn into_joined(self) -> Option { - match self { - Self::Joining(_) => None, - Self::Joined(joined) => Some(joined), - } - } - - pub fn joining(&self) -> Option<&Joining> { - match self { - Self::Joining(joining) => Some(joining), - Self::Joined(_) => None, - } - } - - pub fn joined(&self) -> Option<&Joined> { - match self { - Self::Joining(_) => None, - Self::Joined(joined) => Some(joined), - } - } -} - -#[allow(clippy::large_enum_variant)] -enum ConnCommand { - SendCmd(Data, oneshot::Sender>), - GetState(oneshot::Sender), -} - -#[derive(Debug, Clone)] -pub struct ConnTx { - cmd_tx: mpsc::UnboundedSender, -} - -impl ConnTx { - /// The async part of sending a command. - /// - /// This is split into a separate function so that [`Self::send`] can be - /// fully synchronous (you can safely throw away the returned future) while - /// still guaranteeing that the packet was sent. - async fn finish_send(rx: oneshot::Receiver>) -> Result - where - C: Command, - C::Reply: TryFrom, - { - let pending_reply = rx - .await - // This should only happen if something goes wrong during encoding - // of the packet or while sending it through the websocket. Assuming - // the first doesn't happen, the connection is probably closed. - .map_err(|_| Error::ConnectionClosed)?; - - let data = pending_reply - .get() - .await - .map_err(|e| match e { - replies::Error::TimedOut => Error::CommandTimedOut, - replies::Error::Canceled => Error::ConnectionClosed, - })? - .content - .map_err(Error::Euph)?; - - data.try_into() - .map_err(|_| Error::ProtocolViolation("incorrect command reply type")) - } - - /// Send a command to the server. - /// - /// Returns a future containing the server's reply. This future does not - /// have to be awaited and can be safely ignored if you are not interested - /// in the reply. - /// - /// This function may return before the command was sent. To ensure that it - /// was sent before doing something else, await the returned future first. - /// - /// When called multiple times, this function guarantees that the commands - /// are sent in the order that the function is called. - pub fn send(&self, cmd: C) -> impl Future> - where - C: Command + Into, - C::Reply: TryFrom, - { - let (tx, rx) = oneshot::channel(); - let _ = self.cmd_tx.send(ConnCommand::SendCmd(cmd.into(), tx)); - Self::finish_send::(rx) - } - - /// Like [`Self::send`] but ignoring the server's reply. - pub fn send_only>(&self, cmd: C) { - let (tx, _) = oneshot::channel(); - let _ = self.cmd_tx.send(ConnCommand::SendCmd(cmd.into(), tx)); - } - - pub async fn state(&self) -> Result { - let (tx, rx) = oneshot::channel(); - self.cmd_tx - .send(ConnCommand::GetState(tx)) - .map_err(|_| Error::ConnectionClosed)?; - rx.await.map_err(|_| Error::ConnectionClosed) - } -} - -#[derive(Debug)] +/// A basic connection between a client and a server. +/// +/// The connection can be used both from a server's and from a client's +/// perspective. In both cases, it performs regular websocket *and* euphoria +/// pings and terminates the connection if the other side does not reply before +/// the next ping is sent. pub struct Conn { - ws: WsStream, - last_id: usize, - replies: Replies, - - conn_tx: ConnTx, - cmd_rx: mpsc::UnboundedReceiver, + ws: WebSocketStream>, + side: Side, + config: ConnConfig, // The websocket server may send a pong frame with arbitrary payload // unprompted at any time (see RFC 6455 5.5.3). Because of this, we can't @@ -365,161 +105,136 @@ pub struct Conn { last_ws_ping_replied_to: bool, last_euph_ping_payload: Option