From 6916977a475d616f10e50c9910c7e5d641e0f8d1 Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 20 Jan 2023 17:12:52 +0100 Subject: [PATCH] Rewrite conn module --- CHANGELOG.md | 7 +- Cargo.toml | 13 +- examples/testbot.rs | 24 +- src/conn.rs | 613 +++++++++++++++++++++----------------------- src/lib.rs | 1 - src/replies.rs | 5 + 6 files changed, 328 insertions(+), 335 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cc4529..92fa409 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,16 +14,19 @@ Procedure when bumping the version number: ## Unreleased ### Added -- `Status` conversion utility methods +- `State` conversion utility methods - `Time::new` constructor +### Changed +- Rewrite `conn` module (backwards-imcompatible) + ## v0.2.0 - 2022-12-10 ### Added - `euphoxide::connect` ### Changed -- Updated dependencies in backwards-incompatible way +- Updated dependencies (backwards-incompatible) ## v0.1.0 - 2022-10-23 diff --git a/Cargo.toml b/Cargo.toml index 574d6d8..ec2b1a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,19 +4,14 @@ version = "0.2.0" edition = "2021" [dependencies] +futures-util = { version = "0.3.25", default-features = false, features = ["sink"] } serde = { version = "1.0.149", features = ["derive"] } serde_json = "1.0.89" time = { version = "0.3.17", features = ["serde"] } tokio = { version = "1.23.0", features = ["time", "sync", "macros", "rt"] } - -[dependencies.futures] -version = "0.3.25" -default-features = false -features = ["std"] - -[dependencies.tokio-tungstenite] -version = "0.18.0" -features = ["rustls-tls-native-roots"] +tokio-stream = "0.1.11" +tokio-tungstenite = "0.18.0" [dev-dependencies] # For example bot tokio = { version = "1.23.0", features = ["rt-multi-thread"] } +tokio-tungstenite = { version = "0.18.0", features = ["rustls-tls-native-roots"] } diff --git a/examples/testbot.rs b/examples/testbot.rs index 3e4f19a..f99d6f8 100644 --- a/examples/testbot.rs +++ b/examples/testbot.rs @@ -2,8 +2,11 @@ use std::error::Error; use std::time::{Duration, Instant}; use euphoxide::api::{Data, Nick, Send}; +use euphoxide::conn::Conn; -const URI: &str = "wss://euphoria.io/room/test/ws"; +const TIMEOUT: Duration = Duration::from_secs(10); +const DOMAIN: &str = "euphoria.io"; +const ROOM: &str = "test"; const NICK: &str = "TestBot"; const HELP: &str = "I'm an example bot for https://github.com/Garmelon/euphoxide"; @@ -44,9 +47,9 @@ fn format_delta(delta: Duration) -> String { async fn main() -> Result<(), Box> { let start = Instant::now(); - let (ws, _) = tokio_tungstenite::connect_async(URI).await?; - let (tx, mut rx) = euphoxide::conn::wrap(ws, Duration::from_secs(30)); - while let Some(packet) = rx.recv().await { + let (mut conn, _) = Conn::connect(DOMAIN, ROOM, false, None, TIMEOUT).await?; + + while let Ok(packet) = conn.recv().await { let data = match packet.content { Ok(data) => data, Err(err) => { @@ -64,11 +67,15 @@ async fn main() -> Result<(), Box> { // Here, a new task is spawned so the main event loop can // continue running immediately instead of waiting for a reply // from the server. - let tx_clone = tx.clone(); + // + // We only need to do this because we want to log the result of + // the nick command. Otherwise, we could've just called + // tx.send() synchronously and ignored the returned Future. + let tx = conn.tx().clone(); tokio::spawn(async move { // Awaiting the future returned by the send command lets you // (type-safely) access the server's reply. - let reply = tx_clone + let reply = tx .send(Nick { name: NICK.to_string(), }) @@ -118,7 +125,8 @@ async fn main() -> Result<(), Box> { // would be a race between sending the message and closing // the connection as the send function can return before the // message has actually been sent. - let _ = tx + let _ = conn + .tx() .send(Send { content: "/me dies".to_string(), parent: Some(event.0.id), @@ -131,7 +139,7 @@ async fn main() -> Result<(), Box> { // If you are not interested in the result, you can just // throw away the future returned by the send function. println!("Sending reply..."); - let _ = tx.send(Send { + let _ = conn.tx().send(Send { content: reply, parent: Some(event.0.id), }); diff --git a/src/conn.rs b/src/conn.rs index 7ee0fd0..962ff35 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,24 +1,22 @@ //! Connection state modeling. -// TODO Catch errors differently when sending into mpsc/oneshot - use std::collections::HashMap; use std::convert::Infallible; use std::future::Future; -use std::time::Duration; -use std::{error, fmt}; +use std::time::{Duration, Instant}; +use std::{error, fmt, result}; -use futures::channel::oneshot; -use futures::stream::{SplitSink, SplitStream}; -use futures::{SinkExt, StreamExt}; +use ::time::OffsetDateTime; +use futures_util::SinkExt; use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio::{select, task, time}; +use tokio::sync::{mpsc, oneshot}; +use tokio::{select, time}; +use tokio_stream::StreamExt; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::http::{header, HeaderValue}; use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream}; -use crate::api::packet::{Command, Packet, ParsedPacket}; +use crate::api::packet::{Command, ParsedPacket}; use crate::api::{ BounceEvent, Data, HelloEvent, LoginReply, NickEvent, PersonalAccountView, Ping, PingReply, SessionId, SessionView, SnapshotEvent, Time, UserId, @@ -29,45 +27,47 @@ pub type WsStream = WebSocketStream>; #[derive(Debug)] pub enum Error { + /// The connection is now closed. ConnectionClosed, - TimedOut, - IncorrectReplyType, + /// The server didn't reply to one of our commands in time. + CommandTimedOut, + /// The server did something that violated the api specification. + ProtocolViolation(&'static str), + /// An error returned by the euphoria server. Euph(String), + + Tungstenite(tungstenite::Error), + SerdeJson(serde_json::Error), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ConnectionClosed => write!(f, "connection closed"), - Self::TimedOut => write!(f, "packet timed out"), - Self::IncorrectReplyType => write!(f, "incorrect reply type"), - Self::Euph(error_msg) => write!(f, "{error_msg}"), + Self::CommandTimedOut => write!(f, "server did not reply to command in time"), + Self::ProtocolViolation(msg) => write!(f, "{msg}"), + Self::Euph(msg) => write!(f, "{msg}"), + Self::Tungstenite(err) => write!(f, "{err}"), + Self::SerdeJson(err) => write!(f, "{err}"), } } } +impl From for Error { + fn from(err: tungstenite::Error) -> Self { + Self::Tungstenite(err) + } +} + +impl From for Error { + fn from(err: serde_json::Error) -> Self { + Self::SerdeJson(err) + } +} + impl error::Error for Error {} -type InternalResult = Result>; - -#[derive(Debug)] -enum Event { - Message(tungstenite::Message), - SendCmd(Data, oneshot::Sender>), - SendRpl(Option, Data), - Status(oneshot::Sender), - DoPings, -} - -impl Event { - fn send_cmd>(cmd: C, rpl: oneshot::Sender>) -> Self { - Self::SendCmd(cmd.into(), rpl) - } - - fn send_rpl>(id: Option, rpl: C) -> Self { - Self::SendRpl(id, rpl.into()) - } -} +type Result = result::Result; #[derive(Debug, Clone, Default)] pub struct Joining { @@ -77,18 +77,19 @@ pub struct Joining { } impl Joining { - fn on_data(&mut self, data: &Data) -> InternalResult<()> { + fn on_data(&mut self, data: &Data) -> Result<()> { match data { Data::BounceEvent(p) => self.bounce = Some(p.clone()), Data::HelloEvent(p) => self.hello = Some(p.clone()), Data::SnapshotEvent(p) => self.snapshot = Some(p.clone()), + // TODO Check and maybe expand list of unexpected packet types Data::JoinEvent(_) | Data::NetworkEvent(_) | Data::NickEvent(_) | Data::EditMessageEvent(_) | Data::PartEvent(_) | Data::PmInitiateEvent(_) - | Data::SendEvent(_) => return Err("unexpected packet type".into()), + | Data::SendEvent(_) => return Err(Error::ProtocolViolation("unexpected packet type")), _ => {} } Ok(()) @@ -211,12 +212,12 @@ impl Joined { #[derive(Debug, Clone)] #[allow(clippy::large_enum_variant)] -pub enum Status { +pub enum State { Joining(Joining), Joined(Joined), } -impl Status { +impl State { pub fn into_joining(self) -> Option { match self { Self::Joining(joining) => Some(joining), @@ -246,178 +247,271 @@ impl Status { } } -struct State { - ws_tx: SplitSink, +#[allow(clippy::large_enum_variant)] +enum ConnCommand { + SendCmd(Data, oneshot::Sender>), + GetState(oneshot::Sender), +} + +#[derive(Debug, Clone)] +pub struct ConnTx { + cmd_tx: mpsc::UnboundedSender, +} + +impl ConnTx { + /// The async part of sending a command. + /// + /// This is split into a separate function so that [`Self::send`] can be + /// fully synchronous (you can safely throw away the returned future) while + /// still guaranteeing that the packet was sent. + async fn finish_send(rx: oneshot::Receiver>) -> Result + where + C: Command, + C::Reply: TryFrom, + { + let pending_reply = rx + .await + // This should only happen if something goes wrong during encoding + // of the packet or while sending it through the websocket. Assuming + // the first doesn't happen, the connection is probably closed. + .map_err(|_| Error::ConnectionClosed)?; + + let data = pending_reply + .get() + .await + .map_err(|e| match e { + replies::Error::TimedOut => Error::CommandTimedOut, + replies::Error::Canceled => Error::ConnectionClosed, + })? + .content + .map_err(Error::Euph)?; + + data.try_into() + .map_err(|_| Error::ProtocolViolation("incorrect command reply type")) + } + + /// Send a command to the server. + /// + /// Returns a future containing the server's reply. This future does not + /// have to be awaited and can be safely ignored if you are not interested + /// in the reply. + /// + /// This function may return before the command was sent. To ensure that it + /// was sent before doing something else, await the returned future first. + /// + /// When called multiple times, this function guarantees that the commands + /// are sent in the order that the function is called. + pub fn send(&self, cmd: C) -> impl Future> + where + C: Command + Into, + C::Reply: TryFrom, + { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ConnCommand::SendCmd(cmd.into(), tx)); + Self::finish_send::(rx) + } + + pub async fn state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + self.cmd_tx + .send(ConnCommand::GetState(tx)) + .map_err(|_| Error::ConnectionClosed)?; + rx.await.map_err(|_| Error::ConnectionClosed) + } +} + +#[derive(Debug)] +pub struct Conn { + ws: WsStream, last_id: usize, replies: Replies, - packet_tx: mpsc::UnboundedSender, + conn_tx: ConnTx, + cmd_rx: mpsc::UnboundedReceiver, - // The server may send a pong frame with arbitrary payload unprompted at any - // time (see RFC 6455 5.5.3). Because of this, we can't just remember the - // last pong payload. - ws_ping_counter: u64, - last_ws_ping: Option>, + // The websocket server may send a pong frame with arbitrary payload + // unprompted at any time (see RFC 6455 5.5.3). Because of this, we can't + // just remember the last pong payload. + last_ping: Instant, + last_ws_ping_payload: Option>, last_ws_ping_replied_to: bool, + last_euph_ping_payload: Option