//! Connection state modeling. // TODO Catch errors differently when sending into mpsc/oneshot use std::collections::HashMap; use std::convert::Infallible; use std::error; use std::future::Future; use std::time::Duration; use futures::channel::oneshot; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::{select, task, time}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; use crate::api::packet::{Command, Packet, ParsedPacket}; use crate::api::{ BounceEvent, Data, HelloEvent, LoginReply, PersonalAccountView, Ping, PingReply, SessionId, SessionView, SnapshotEvent, Time, }; use crate::replies::{self, PendingReply, Replies}; pub type WsStream = WebSocketStream>; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("connection closed")] ConnectionClosed, #[error("packet timed out")] TimedOut, #[error("incorrect reply type")] IncorrectReplyType, #[error("{0}")] Euph(String), } type InternalResult = Result>; #[derive(Debug)] enum Event { Message(tungstenite::Message), SendCmd(Data, oneshot::Sender>), SendRpl(Option, Data), Status(oneshot::Sender), DoPings, } impl Event { fn send_cmd>(cmd: C, rpl: oneshot::Sender>) -> Self { Self::SendCmd(cmd.into(), rpl) } fn send_rpl>(id: Option, rpl: C) -> Self { Self::SendRpl(id, rpl.into()) } } #[derive(Debug, Clone, Default)] pub struct Joining { pub hello: Option, pub snapshot: Option, pub bounce: Option, } impl Joining { fn on_data(&mut self, data: &Data) -> InternalResult<()> { match data { Data::BounceEvent(p) => self.bounce = Some(p.clone()), Data::HelloEvent(p) => self.hello = Some(p.clone()), Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), Data::JoinEvent(_) | Data::NetworkEvent(_) | Data::NickEvent(_) | Data::EditMessageEvent(_) | Data::PartEvent(_) | Data::PmInitiateEvent(_) | Data::SendEvent(_) => return Err("unexpected packet type".into()), _ => {} } Ok(()) } fn joined(&self) -> Option { if let (Some(hello), Some(snapshot)) = (&self.hello, &self.snapshot) { let mut session = hello.session.clone(); if let Some(nick) = &snapshot.nick { session.name = nick.clone(); } let listing = snapshot .listing .iter() .cloned() .map(|s| (s.session_id.clone(), s)) .collect::>(); Some(Joined { session, account: hello.account.clone(), listing, }) } else { None } } } // TODO Track nick events for listing, add InferredSessionView #[derive(Debug, Clone)] pub struct Joined { pub session: SessionView, pub account: Option, pub listing: HashMap, } impl Joined { fn on_data(&mut self, data: &Data) { match data { Data::JoinEvent(p) => { self.listing.insert(p.0.session_id.clone(), p.0.clone()); } Data::SendEvent(p) => { self.listing .insert(p.0.sender.session_id.clone(), p.0.sender.clone()); } Data::PartEvent(p) => { self.listing.remove(&p.0.session_id); } Data::NetworkEvent(p) => { if p.r#type == "partition" { self.listing.retain(|_, s| { !(s.server_id == p.server_id && s.server_era == p.server_era) }); } } Data::NickEvent(p) => { if let Some(session) = self.listing.get_mut(&p.session_id) { session.name = p.to.clone(); } } Data::NickReply(p) => { assert_eq!(self.session.id, p.id); self.session.name = p.to.clone(); } // The who reply is broken and can't be trusted right now, so we'll // not even look at it. _ => {} } } } #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] pub enum Status { Joining(Joining), Joined(Joined), } struct State { ws_tx: SplitSink, last_id: usize, replies: Replies, packet_tx: mpsc::UnboundedSender, // The server may send a pong frame with arbitrary payload unprompted at any // time (see RFC 6455 5.5.3). Because of this, we can't just remember the // last pong payload. ws_ping_counter: u64, last_ws_ping: Option>, last_ws_ping_replied_to: bool, last_euph_ping: Option