Return entire packet in ConnTx::recv

This commit is contained in:
Joscha 2022-08-20 20:18:43 +02:00
parent 741d6c8548
commit 49b37649e3
2 changed files with 22 additions and 22 deletions

View file

@ -46,8 +46,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
let (ws, _) = tokio_tungstenite::connect_async(URI).await?; let (ws, _) = tokio_tungstenite::connect_async(URI).await?;
let (tx, mut rx) = euphoxide::conn::wrap(ws); let (tx, mut rx) = euphoxide::conn::wrap(ws);
loop { while let Some(packet) = rx.recv().await {
let data = rx.recv().await?; let data = match packet.content {
Ok(data) => data,
Err(err) => {
println!("Error for {}: {err}", packet.r#type);
continue;
}
};
match data { match data {
Data::HelloEvent(event) => println!("Connected with id {}", event.session.id), Data::HelloEvent(event) => println!("Connected with id {}", event.session.id),
Data::SnapshotEvent(event) => { Data::SnapshotEvent(event) => {

View file

@ -46,17 +46,14 @@ type InternalResult<T> = Result<T, Box<dyn error::Error>>;
#[derive(Debug)] #[derive(Debug)]
enum Event { enum Event {
Message(tungstenite::Message), Message(tungstenite::Message),
SendCmd(Data, oneshot::Sender<PendingReply<Result<Data, String>>>), SendCmd(Data, oneshot::Sender<PendingReply<ParsedPacket>>),
SendRpl(Option<String>, Data), SendRpl(Option<String>, Data),
Status(oneshot::Sender<Status>), Status(oneshot::Sender<Status>),
DoPings, DoPings,
} }
impl Event { impl Event {
fn send_cmd<C: Into<Data>>( fn send_cmd<C: Into<Data>>(cmd: C, rpl: oneshot::Sender<PendingReply<ParsedPacket>>) -> Self {
cmd: C,
rpl: oneshot::Sender<PendingReply<Result<Data, String>>>,
) -> Self {
Self::SendCmd(cmd.into(), rpl) Self::SendCmd(cmd.into(), rpl)
} }
@ -166,9 +163,9 @@ pub enum Status {
struct State { struct State {
ws_tx: SplitSink<WsStream, tungstenite::Message>, ws_tx: SplitSink<WsStream, tungstenite::Message>,
last_id: usize, last_id: usize,
replies: Replies<String, Result<Data, String>>, replies: Replies<String, ParsedPacket>,
packet_tx: mpsc::UnboundedSender<Data>, packet_tx: mpsc::UnboundedSender<ParsedPacket>,
// The server may send a pong frame with arbitrary payload unprompted at any // The server may send a pong frame with arbitrary payload unprompted at any
// time (see RFC 6455 5.5.3). Because of this, we can't just remember the // time (see RFC 6455 5.5.3). Because of this, we can't just remember the
@ -190,7 +187,7 @@ impl State {
rx_canary: oneshot::Receiver<Infallible>, rx_canary: oneshot::Receiver<Infallible>,
event_tx: mpsc::UnboundedSender<Event>, event_tx: mpsc::UnboundedSender<Event>,
mut event_rx: mpsc::UnboundedReceiver<Event>, mut event_rx: mpsc::UnboundedReceiver<Event>,
packet_tx: mpsc::UnboundedSender<Data>, packet_tx: mpsc::UnboundedSender<ParsedPacket>,
) { ) {
let (ws_tx, mut ws_rx) = ws.split(); let (ws_tx, mut ws_rx) = ws.split();
let mut state = Self { let mut state = Self {
@ -279,7 +276,7 @@ impl State {
// Complete pending replies if the packet has an id // Complete pending replies if the packet has an id
if let Some(id) = &packet.id { if let Some(id) = &packet.id {
self.replies.complete(id, packet.content.clone()); self.replies.complete(id, packet.clone());
} }
// Play a game of table tennis // Play a game of table tennis
@ -306,12 +303,8 @@ impl State {
} }
} }
// Shovel events and successful replies into self.packet_tx. Assumes // Shovel packets into self.packet_tx
// that no even ever errors and that erroring replies are not self.packet_tx.send(packet)?;
// interesting.
if let Ok(data) = packet.content {
self.packet_tx.send(data)?;
}
Ok(()) Ok(())
} }
@ -319,7 +312,7 @@ impl State {
async fn on_send_cmd( async fn on_send_cmd(
&mut self, &mut self,
data: Data, data: Data,
reply_tx: oneshot::Sender<PendingReply<Result<Data, String>>>, reply_tx: oneshot::Sender<PendingReply<ParsedPacket>>,
) -> InternalResult<()> { ) -> InternalResult<()> {
// Overkill of universe-heat-death-like proportions // Overkill of universe-heat-death-like proportions
self.last_id = self.last_id.wrapping_add(1); self.last_id = self.last_id.wrapping_add(1);
@ -399,7 +392,7 @@ pub struct ConnTx {
impl ConnTx { impl ConnTx {
async fn finish_send<C>( async fn finish_send<C>(
rx: oneshot::Receiver<PendingReply<Result<Data, String>>>, rx: oneshot::Receiver<PendingReply<ParsedPacket>>,
) -> Result<C::Reply, Error> ) -> Result<C::Reply, Error>
where where
C: Command, C: Command,
@ -418,6 +411,7 @@ impl ConnTx {
replies::Error::TimedOut => Error::TimedOut, replies::Error::TimedOut => Error::TimedOut,
replies::Error::Canceled => Error::ConnectionClosed, replies::Error::Canceled => Error::ConnectionClosed,
})? })?
.content
.map_err(Error::Euph)?; .map_err(Error::Euph)?;
data.try_into().map_err(|_| Error::IncorrectReplyType) data.try_into().map_err(|_| Error::IncorrectReplyType)
} }
@ -456,12 +450,12 @@ impl ConnTx {
pub struct ConnRx { pub struct ConnRx {
#[allow(dead_code)] #[allow(dead_code)]
canary: oneshot::Sender<Infallible>, canary: oneshot::Sender<Infallible>,
packet_rx: mpsc::UnboundedReceiver<Data>, packet_rx: mpsc::UnboundedReceiver<ParsedPacket>,
} }
impl ConnRx { impl ConnRx {
pub async fn recv(&mut self) -> Result<Data, Error> { pub async fn recv(&mut self) -> Option<ParsedPacket> {
self.packet_rx.recv().await.ok_or(Error::ConnectionClosed) self.packet_rx.recv().await
} }
} }