diff --git a/examples/testbot.rs b/examples/testbot.rs index 27cb7c2..b2432b9 100644 --- a/examples/testbot.rs +++ b/examples/testbot.rs @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box> { let start = Instant::now(); let (ws, _) = tokio_tungstenite::connect_async(URI).await?; - let (tx, mut rx) = euphoxide::conn::wrap(ws); + let (tx, mut rx) = euphoxide::conn::wrap(ws, Duration::from_secs(30)); while let Some(packet) = rx.recv().await { let data = match packet.content { Ok(data) => data, diff --git a/src/conn.rs b/src/conn.rs index 2eb13b3..5824232 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -25,10 +25,6 @@ use crate::replies::{self, PendingReply, Replies}; pub type WsStream = WebSocketStream>; -/// Timeout used for any kind of reply from the server, including to ws and euph -/// pings. Also used as the time in-between pings. -const TIMEOUT: Duration = Duration::from_secs(30); // TODO Make configurable - #[derive(Debug, thiserror::Error)] pub enum Error { #[error("connection closed")] @@ -183,6 +179,7 @@ struct State { impl State { async fn run( ws: WsStream, + timeout: Duration, mut tx_canary: mpsc::UnboundedReceiver, rx_canary: oneshot::Receiver, event_tx: mpsc::UnboundedSender, @@ -193,7 +190,7 @@ impl State { let mut state = Self { ws_tx, last_id: 0, - replies: Replies::new(TIMEOUT), + replies: Replies::new(timeout), packet_tx, ws_ping_counter: 0, last_ws_ping: None, @@ -207,7 +204,7 @@ impl State { _ = tx_canary.recv() => (), _ = rx_canary => (), _ = Self::listen(&mut ws_rx, &event_tx) => (), - _ = Self::send_ping_events(&event_tx) => (), + _ = Self::send_ping_events(&event_tx, timeout) => (), _ = state.handle_events(&event_tx, &mut event_rx) => (), } } @@ -222,10 +219,13 @@ impl State { Ok(()) } - async fn send_ping_events(event_tx: &mpsc::UnboundedSender) -> InternalResult<()> { + async fn send_ping_events( + event_tx: &mpsc::UnboundedSender, + timeout: Duration, + ) -> InternalResult<()> { loop { event_tx.send(Event::DoPings)?; - time::sleep(TIMEOUT).await; + time::sleep(timeout).await; } } @@ -460,7 +460,7 @@ impl ConnRx { // TODO Combine ConnTx and ConnRx and implement Stream + Sink? -pub fn wrap(ws: WsStream) -> (ConnTx, ConnRx) { +pub fn wrap(ws: WsStream, timeout: Duration) -> (ConnTx, ConnRx) { let (tx_canary_tx, tx_canary_rx) = mpsc::unbounded_channel(); let (rx_canary_tx, rx_canary_rx) = oneshot::channel(); let (event_tx, event_rx) = mpsc::unbounded_channel(); @@ -468,6 +468,7 @@ pub fn wrap(ws: WsStream) -> (ConnTx, ConnRx) { task::spawn(State::run( ws, + timeout, tx_canary_rx, rx_canary_rx, event_tx.clone(),