From 49b37649e32fe4b95532fb0aa9eaf89189283606 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 20 Aug 2022 20:18:43 +0200 Subject: [PATCH] Return entire packet in ConnTx::recv --- examples/testbot.rs | 10 ++++++++-- src/conn.rs | 34 ++++++++++++++-------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/examples/testbot.rs b/examples/testbot.rs index 02e8f62..27cb7c2 100644 --- a/examples/testbot.rs +++ b/examples/testbot.rs @@ -46,8 +46,14 @@ async fn main() -> Result<(), Box> { let (ws, _) = tokio_tungstenite::connect_async(URI).await?; let (tx, mut rx) = euphoxide::conn::wrap(ws); - loop { - let data = rx.recv().await?; + while let Some(packet) = rx.recv().await { + let data = match packet.content { + Ok(data) => data, + Err(err) => { + println!("Error for {}: {err}", packet.r#type); + continue; + } + }; match data { Data::HelloEvent(event) => println!("Connected with id {}", event.session.id), Data::SnapshotEvent(event) => { diff --git a/src/conn.rs b/src/conn.rs index 1c33dc1..14444af 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -46,17 +46,14 @@ type InternalResult = Result>; #[derive(Debug)] enum Event { Message(tungstenite::Message), - SendCmd(Data, oneshot::Sender>>), + SendCmd(Data, oneshot::Sender>), SendRpl(Option, Data), Status(oneshot::Sender), DoPings, } impl Event { - fn send_cmd>( - cmd: C, - rpl: oneshot::Sender>>, - ) -> Self { + fn send_cmd>(cmd: C, rpl: oneshot::Sender>) -> Self { Self::SendCmd(cmd.into(), rpl) } @@ -166,9 +163,9 @@ pub enum Status { struct State { ws_tx: SplitSink, last_id: usize, - replies: Replies>, + replies: Replies, - packet_tx: mpsc::UnboundedSender, + packet_tx: mpsc::UnboundedSender, // The server may send a pong frame with arbitrary payload unprompted at any // time (see RFC 6455 5.5.3). Because of this, we can't just remember the @@ -190,7 +187,7 @@ impl State { rx_canary: oneshot::Receiver, event_tx: mpsc::UnboundedSender, mut event_rx: mpsc::UnboundedReceiver, - packet_tx: mpsc::UnboundedSender, + packet_tx: mpsc::UnboundedSender, ) { let (ws_tx, mut ws_rx) = ws.split(); let mut state = Self { @@ -279,7 +276,7 @@ impl State { // Complete pending replies if the packet has an id if let Some(id) = &packet.id { - self.replies.complete(id, packet.content.clone()); + self.replies.complete(id, packet.clone()); } // Play a game of table tennis @@ -306,12 +303,8 @@ impl State { } } - // Shovel events and successful replies into self.packet_tx. Assumes - // that no even ever errors and that erroring replies are not - // interesting. - if let Ok(data) = packet.content { - self.packet_tx.send(data)?; - } + // Shovel packets into self.packet_tx + self.packet_tx.send(packet)?; Ok(()) } @@ -319,7 +312,7 @@ impl State { async fn on_send_cmd( &mut self, data: Data, - reply_tx: oneshot::Sender>>, + reply_tx: oneshot::Sender>, ) -> InternalResult<()> { // Overkill of universe-heat-death-like proportions self.last_id = self.last_id.wrapping_add(1); @@ -399,7 +392,7 @@ pub struct ConnTx { impl ConnTx { async fn finish_send( - rx: oneshot::Receiver>>, + rx: oneshot::Receiver>, ) -> Result where C: Command, @@ -418,6 +411,7 @@ impl ConnTx { replies::Error::TimedOut => Error::TimedOut, replies::Error::Canceled => Error::ConnectionClosed, })? + .content .map_err(Error::Euph)?; data.try_into().map_err(|_| Error::IncorrectReplyType) } @@ -456,12 +450,12 @@ impl ConnTx { pub struct ConnRx { #[allow(dead_code)] canary: oneshot::Sender, - packet_rx: mpsc::UnboundedReceiver, + packet_rx: mpsc::UnboundedReceiver, } impl ConnRx { - pub async fn recv(&mut self) -> Result { - self.packet_rx.recv().await.ok_or(Error::ConnectionClosed) + pub async fn recv(&mut self) -> Option { + self.packet_rx.recv().await } }