//! Connection state modeling. // TODO Catch errors differently when sending into mpsc/oneshot use std::collections::HashMap; use std::convert::Infallible; use std::future::Future; use std::time::Duration; use std::{error, fmt}; use futures::channel::oneshot; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::{select, task, time}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; use crate::api::packet::{Command, Packet, ParsedPacket}; use crate::api::{ BounceEvent, Data, HelloEvent, LoginReply, NickEvent, PersonalAccountView, Ping, PingReply, SessionId, SessionView, SnapshotEvent, Time, UserId, }; use crate::replies::{self, PendingReply, Replies}; pub type WsStream = WebSocketStream>; #[derive(Debug)] pub enum Error { ConnectionClosed, TimedOut, IncorrectReplyType, Euph(String), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ConnectionClosed => write!(f, "connection closed"), Self::TimedOut => write!(f, "packet timed out"), Self::IncorrectReplyType => write!(f, "incorrect reply type"), Self::Euph(error_msg) => write!(f, "{error_msg}"), } } } impl error::Error for Error {} type InternalResult = Result>; #[derive(Debug)] enum Event { Message(tungstenite::Message), SendCmd(Data, oneshot::Sender>), SendRpl(Option, Data), Status(oneshot::Sender), DoPings, } impl Event { fn send_cmd>(cmd: C, rpl: oneshot::Sender>) -> Self { Self::SendCmd(cmd.into(), rpl) } fn send_rpl>(id: Option, rpl: C) -> Self { Self::SendRpl(id, rpl.into()) } } #[derive(Debug, Clone, Default)] pub struct Joining { pub hello: Option, pub snapshot: Option, pub bounce: Option, } impl Joining { fn on_data(&mut self, data: &Data) -> InternalResult<()> { match data { Data::BounceEvent(p) => self.bounce = Some(p.clone()), Data::HelloEvent(p) => self.hello = Some(p.clone()), Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), Data::JoinEvent(_) | Data::NetworkEvent(_) | Data::NickEvent(_) | Data::EditMessageEvent(_) | Data::PartEvent(_) | Data::PmInitiateEvent(_) | Data::SendEvent(_) => return Err("unexpected packet type".into()), _ => {} } Ok(()) } fn joined(&self) -> Option { if let (Some(hello), Some(snapshot)) = (&self.hello, &self.snapshot) { let mut session = hello.session.clone(); if let Some(nick) = &snapshot.nick { session.name = nick.clone(); } let listing = snapshot .listing .iter() .cloned() .map(|s| (s.session_id.clone(), SessionInfo::Full(s))) .collect::>(); Some(Joined { session, account: hello.account.clone(), listing, }) } else { None } } } #[derive(Debug, Clone)] pub enum SessionInfo { Full(SessionView), Partial(NickEvent), } impl SessionInfo { pub fn id(&self) -> &UserId { match self { Self::Full(sess) => &sess.id, Self::Partial(nick) => &nick.id, } } pub fn session_id(&self) -> &SessionId { match self { Self::Full(sess) => &sess.session_id, Self::Partial(nick) => &nick.session_id, } } pub fn name(&self) -> &str { match self { Self::Full(sess) => &sess.name, Self::Partial(nick) => &nick.to, } } } #[derive(Debug, Clone)] pub struct Joined { pub session: SessionView, pub account: Option, pub listing: HashMap, } impl Joined { fn on_data(&mut self, data: &Data) { match data { Data::JoinEvent(p) => { self.listing .insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone())); } Data::SendEvent(p) => { self.listing.insert( p.0.sender.session_id.clone(), SessionInfo::Full(p.0.sender.clone()), ); } Data::PartEvent(p) => { self.listing.remove(&p.0.session_id); } Data::NetworkEvent(p) => { if p.r#type == "partition" { self.listing.retain(|_, s| match s { SessionInfo::Full(s) => { s.server_id != p.server_id && s.server_era != p.server_era } // We can't know if the session was disconnected by the // partition or not, so we're erring on the side of // caution and assuming they were kicked. If we're // wrong, we'll re-add the session as soon as it // performs another visible action. // // If we always kept such sessions, we might keep // disconnected ones indefinitely, thereby keeping them // from moving on, instead forever tethering them to the // digital realm. SessionInfo::Partial(_) => false, }); } } Data::NickEvent(p) => { self.listing .entry(p.session_id.clone()) .and_modify(|s| match s { SessionInfo::Full(session) => session.name = p.to.clone(), SessionInfo::Partial(_) => *s = SessionInfo::Partial(p.clone()), }) .or_insert_with(|| SessionInfo::Partial(p.clone())); } Data::NickReply(p) => { assert_eq!(self.session.id, p.id); self.session.name = p.to.clone(); } // The who reply is broken and can't be trusted right now, so we'll // not even look at it. _ => {} } } } #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] pub enum Status { Joining(Joining), Joined(Joined), } struct State { ws_tx: SplitSink, last_id: usize, replies: Replies, packet_tx: mpsc::UnboundedSender, // The server may send a pong frame with arbitrary payload unprompted at any // time (see RFC 6455 5.5.3). Because of this, we can't just remember the // last pong payload. ws_ping_counter: u64, last_ws_ping: Option>, last_ws_ping_replied_to: bool, last_euph_ping: Option