From 28c1ce243e66f4a2fbaf91d9d8006731e01460d2 Mon Sep 17 00:00:00 2001 From: Joscha Date: Fri, 6 Dec 2024 15:50:10 +0100 Subject: [PATCH] Add client connection --- src/client.rs | 3 +- src/client/conn.rs | 265 +++++++++++++++++++++++++++++++++++++++++++++ src/error.rs | 23 ++++ 3 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 src/client/conn.rs diff --git a/src/client.rs b/src/client.rs index ed10083..c59b1b9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,3 +1,2 @@ -//! Connecting to a server as a client. - +pub mod conn; pub mod state; diff --git a/src/client/conn.rs b/src/client/conn.rs new file mode 100644 index 0000000..a0fbf1e --- /dev/null +++ b/src/client/conn.rs @@ -0,0 +1,265 @@ +use std::{future::Future, time::Duration}; + +use log::debug; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; +use tokio_tungstenite::tungstenite::{ + client::IntoClientRequest, + http::{header, HeaderValue}, +}; + +use crate::{ + api::{Command, Data, ParsedPacket}, + conn::{Conn, ConnConfig, Side}, + error::{Error, Result}, + replies::{self, PendingReply, Replies}, +}; + +use super::state::State; + +enum ConnCommand { + SendCmd(Data, oneshot::Sender>>), + GetState(oneshot::Sender), +} + +#[derive(Debug, Clone)] +pub struct ClientConnConfig { + pub domain: String, + pub human: bool, + pub channel_bufsize: usize, + pub connect_timeout: Duration, + pub command_timeout: Duration, + pub ping_interval: Duration, +} + +impl Default for ClientConnConfig { + fn default() -> Self { + Self { + domain: "euphoria.leet.nu".to_string(), + human: false, + channel_bufsize: 10, + connect_timeout: Duration::from_secs(10), + command_timeout: Duration::from_secs(30), + ping_interval: Duration::from_secs(30), + } + } +} + +pub struct ClientConn { + rx: mpsc::Receiver, + tx: mpsc::Sender, + + conn: Conn, + state: State, + + last_id: usize, + replies: Replies, +} + +impl ClientConn { + pub fn state(&self) -> &State { + &self.state + } + + pub fn handle(&self) -> ClientConnHandle { + ClientConnHandle { + tx: self.tx.clone(), + } + } + + pub async fn close(&mut self) -> Result<()> { + self.conn.close().await + } + + pub async fn recv(&mut self) -> Result> { + loop { + self.replies.purge(); + + // There's always at least one tx end (self.tx), so self.rx.recv() + // should never return None. + let packet = select! { + packet = self.conn.recv() => packet?, + Some(cmd) = self.rx.recv() => { + self.on_cmd(cmd).await; + continue; + }, + }; + + if let Some(packet) = &packet { + self.on_packet(packet); + } + + break Ok(packet); + } + } + + pub async fn send(&mut self, data: impl Into) -> Result { + // Overkill of universe-heat-death-like proportions + self.last_id = self.last_id.wrapping_add(1); + let id = self.last_id.to_string(); + + self.conn + .send(ParsedPacket::from_data(Some(id.clone()), data.into())) + .await?; + + Ok(id) + } + + fn on_packet(&mut self, packet: &ParsedPacket) { + if let Ok(data) = &packet.content { + self.state.on_data(data); + } + + if let Some(id) = &packet.id { + let id = id.clone(); + self.replies.complete(&id, packet.clone()); + } + } + + async fn on_cmd(&mut self, cmd: ConnCommand) { + match cmd { + ConnCommand::SendCmd(data, sender) => { + let result = self.send(data).await.map(|id| self.replies.wait_for(id)); + let _ = sender.send(result); + } + ConnCommand::GetState(sender) => { + let _ = sender.send(self.state.clone()); + } + } + } + + pub async fn connect( + room: &str, + cookies: Option, + ) -> Result<(Self, Vec)> { + Self::connect_with_config(room, cookies, &ClientConnConfig::default()).await + } + + pub async fn connect_with_config( + room: &str, + cookies: Option, + config: &ClientConnConfig, + ) -> Result<(Self, Vec)> { + // Prepare URL + let human = if config.human { "?h=1" } else { "" }; + let uri = format!("wss://{}/room/{room}/ws{human}", config.domain); + debug!("Connecting to {uri} with cookies: {cookies:?}"); + + // Prepare request + let mut request = uri.into_client_request().expect("valid request"); + if let Some(cookies) = cookies { + request.headers_mut().append(header::COOKIE, cookies); + } + + // Connect to server + let (ws, response) = tokio::time::timeout( + config.connect_timeout, + tokio_tungstenite::connect_async(request), + ) + .await + .map_err(|_| Error::ConnectionTimeout)??; + + // Extract response cookies + let (mut parts, _) = response.into_parts(); + let cookies_set = match parts.headers.entry(header::SET_COOKIE) { + header::Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(), + header::Entry::Vacant(_) => vec![], + }; + debug!("Received cookies {cookies_set:?}"); + + // Prepare EuphConn + let conn_config = ConnConfig { + ping_interval: config.ping_interval, + }; + let conn = Conn::wrap_with_config(ws, Side::Client, conn_config); + + // Prepare client + let (tx, rx) = mpsc::channel(config.channel_bufsize); + let client = Self { + rx, + tx, + conn, + state: State::new(), + last_id: 0, + replies: Replies::new(config.command_timeout), + }; + + Ok((client, cookies_set)) + } +} + +#[derive(Debug, Clone)] +pub struct ClientConnHandle { + tx: mpsc::Sender, +} + +impl ClientConnHandle { + /// Send a command to the server. + /// + /// When awaited, returns either an error if something went wrong while + /// sending the command, or a second future with the server's reply (the + /// *reply future*). + /// + /// When awaited, the *reply future* returns either an error if something + /// was wrong with the reply, or the data returned by the server. The *reply + /// future* can be safely ignored and doesn't have to be awaited. + pub async fn send(&self, cmd: C) -> Result>> + where + C: Command + Into, + C::Reply: TryFrom, + { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::SendCmd(cmd.into(), tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + Ok(async { + let data = rx + .await + .map_err(|_| Error::ConnectionClosed)?? + .get() + .await + .map_err(|err| match err { + replies::Error::TimedOut => Error::CommandTimeout, + replies::Error::Canceled => Error::ConnectionClosed, + })? + .content + .map_err(Error::Euph)?; + + let ptype = data.packet_type(); + data.try_into() + .map_err(|_| Error::ReceivedUnexpectedPacket(ptype)) + }) + } + + /// Send a command to the server without waiting for a reply. + /// + /// This method is equivalent to calling and awaiting [`Self::send`] but + /// ignoring the *reply future*. The reason it exists is that clippy gets + /// really annoying when you try to ignore a future (which is usually the + /// right call). + pub async fn send_only(&self, cmd: C) -> Result<()> + where + C: Command + Into, + C::Reply: TryFrom, + { + let _ignore = self.send(cmd).await?; + Ok(()) + } + + /// Retrieve the current connection [`State`]. + pub async fn state(&self) -> Result { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(ConnCommand::GetState(tx)) + .await + .map_err(|_| Error::ConnectionClosed)?; + + rx.await.map_err(|_| Error::ConnectionClosed) + } +} diff --git a/src/error.rs b/src/error.rs index 130160d..9e3b412 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,6 +29,26 @@ pub enum Error { /// A tungstenite error. Tungstenite(tungstenite::Error), + + /// A timeout occurred while opening a connection. + /// + /// This is a higher-level error that only occurs in the + /// [`ClientConn`](crate::client::conn::ClientConn). + ConnectionTimeout, + + /// The server did not reply to a command in time. + /// + /// This is a higher-level error that only occurs with + /// [`Command`](crate::api::Command)-based APIs in + /// [`ClientConn`](crate::client::conn::ClientConn). + CommandTimeout, + + /// The server replied with an error string. + /// + /// This is a higher-level error that only occurs with + /// [`Command`](crate::api::Command)-based APIs in + /// [`ClientConn`](crate::client::conn::ClientConn). + Euph(String), } impl fmt::Display for Error { @@ -43,6 +63,9 @@ impl fmt::Display for Error { write!(f, "received packet of unexpected type: {ptype}") } Self::Tungstenite(err) => write!(f, "{err}"), + Self::ConnectionTimeout => write!(f, "connection timed out while connecting"), + Self::CommandTimeout => write!(f, "command timed out"), + Self::Euph(msg) => write!(f, "{msg}"), } } }