//! Connection state modeling. use std::collections::HashMap; use std::convert::Infallible; use std::time::Duration; use anyhow::bail; use chrono::Utc; use futures::channel::oneshot; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use rand::Rng; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::{select, task, time}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; use crate::replies::{self, PendingReply, Replies}; use super::api::packet::{Command, Packet, ParsedPacket}; use super::api::{ BounceEvent, Data, HelloEvent, PersonalAccountView, Ping, PingReply, SessionView, SnapshotEvent, Time, UserId, }; pub type WsStream = WebSocketStream>; #[derive(Debug, thiserror::Error)] pub enum Error { #[error("connection closed")] ConnectionClosed, #[error("packet timed out")] TimedOut, #[error("incorrect reply type")] IncorrectReplyType, #[error("{0}")] Euph(String), } #[derive(Debug)] enum Event { Message(tungstenite::Message), SendCmd(Data, oneshot::Sender>>), SendRpl(Option, Data), Status(oneshot::Sender), DoPings, } impl Event { fn send_cmd>( cmd: C, rpl: oneshot::Sender>>, ) -> Self { Self::SendCmd(cmd.into(), rpl) } fn send_rpl>(id: Option, rpl: C) -> Self { Self::SendRpl(id, rpl.into()) } } #[derive(Debug, Clone, Default)] pub struct Joining { hello: Option, snapshot: Option, bounce: Option, } impl Joining { fn on_data(&mut self, data: Data) { match data { Data::BounceEvent(p) => self.bounce = Some(p), Data::HelloEvent(p) => self.hello = Some(p), Data::SnapshotEvent(p) => self.snapshot = Some(p), _ => {} } } fn joined(&self) -> Option { if let (Some(hello), Some(snapshot)) = (&self.hello, &self.snapshot) { let listing = snapshot .listing .iter() .cloned() .map(|s| (s.id.clone(), s)) .collect::>(); Some(Joined { session: hello.session.clone(), account: hello.account.clone(), listing, }) } else { None } } } #[derive(Debug, Clone)] pub struct Joined { session: SessionView, account: Option, listing: HashMap, } impl Joined { fn on_data(&mut self, data: Data) { match data { Data::JoinEvent(p) => { self.listing.insert(p.0.id.clone(), p.0); } Data::SendEvent(p) => { self.listing.insert(p.0.sender.id.clone(), p.0.sender); } Data::PartEvent(p) => { self.listing.remove(&p.0.id); } Data::NetworkEvent(p) => { if p.r#type == "partition" { self.listing.retain(|_, s| { !(s.server_id == p.server_id && s.server_era == p.server_era) }); } } Data::NickEvent(p) => { if let Some(session) = self.listing.get_mut(&p.id) { session.name = p.to; } } Data::NickReply(p) => { assert_eq!(self.session.id, p.id); self.session.name = p.to; } // The who reply is broken and can't be trusted right now, so we'll // not even look at it. _ => {} } } } #[derive(Debug, Clone)] pub enum Status { Joining(Joining), Joined(Joined), } struct State { ws_tx: SplitSink, last_id: usize, replies: Replies>, packet_tx: mpsc::UnboundedSender, last_ws_ping: Option>, last_ws_pong: Option>, last_euph_ping: Option