Make timeout configurable

This commit is contained in:
Joscha 2022-08-20 23:50:00 +02:00
parent 2579a9860d
commit 5ac16db3fc
2 changed files with 11 additions and 10 deletions

View file

@ -45,7 +45,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let start = Instant::now(); let start = Instant::now();
let (ws, _) = tokio_tungstenite::connect_async(URI).await?; let (ws, _) = tokio_tungstenite::connect_async(URI).await?;
let (tx, mut rx) = euphoxide::conn::wrap(ws); let (tx, mut rx) = euphoxide::conn::wrap(ws, Duration::from_secs(30));
while let Some(packet) = rx.recv().await { while let Some(packet) = rx.recv().await {
let data = match packet.content { let data = match packet.content {
Ok(data) => data, Ok(data) => data,

View file

@ -25,10 +25,6 @@ use crate::replies::{self, PendingReply, Replies};
pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; pub type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Timeout used for any kind of reply from the server, including to ws and euph
/// pings. Also used as the time in-between pings.
const TIMEOUT: Duration = Duration::from_secs(30); // TODO Make configurable
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
#[error("connection closed")] #[error("connection closed")]
@ -183,6 +179,7 @@ struct State {
impl State { impl State {
async fn run( async fn run(
ws: WsStream, ws: WsStream,
timeout: Duration,
mut tx_canary: mpsc::UnboundedReceiver<Infallible>, mut tx_canary: mpsc::UnboundedReceiver<Infallible>,
rx_canary: oneshot::Receiver<Infallible>, rx_canary: oneshot::Receiver<Infallible>,
event_tx: mpsc::UnboundedSender<Event>, event_tx: mpsc::UnboundedSender<Event>,
@ -193,7 +190,7 @@ impl State {
let mut state = Self { let mut state = Self {
ws_tx, ws_tx,
last_id: 0, last_id: 0,
replies: Replies::new(TIMEOUT), replies: Replies::new(timeout),
packet_tx, packet_tx,
ws_ping_counter: 0, ws_ping_counter: 0,
last_ws_ping: None, last_ws_ping: None,
@ -207,7 +204,7 @@ impl State {
_ = tx_canary.recv() => (), _ = tx_canary.recv() => (),
_ = rx_canary => (), _ = rx_canary => (),
_ = Self::listen(&mut ws_rx, &event_tx) => (), _ = Self::listen(&mut ws_rx, &event_tx) => (),
_ = Self::send_ping_events(&event_tx) => (), _ = Self::send_ping_events(&event_tx, timeout) => (),
_ = state.handle_events(&event_tx, &mut event_rx) => (), _ = state.handle_events(&event_tx, &mut event_rx) => (),
} }
} }
@ -222,10 +219,13 @@ impl State {
Ok(()) Ok(())
} }
async fn send_ping_events(event_tx: &mpsc::UnboundedSender<Event>) -> InternalResult<()> { async fn send_ping_events(
event_tx: &mpsc::UnboundedSender<Event>,
timeout: Duration,
) -> InternalResult<()> {
loop { loop {
event_tx.send(Event::DoPings)?; event_tx.send(Event::DoPings)?;
time::sleep(TIMEOUT).await; time::sleep(timeout).await;
} }
} }
@ -460,7 +460,7 @@ impl ConnRx {
// TODO Combine ConnTx and ConnRx and implement Stream + Sink? // TODO Combine ConnTx and ConnRx and implement Stream + Sink?
pub fn wrap(ws: WsStream) -> (ConnTx, ConnRx) { pub fn wrap(ws: WsStream, timeout: Duration) -> (ConnTx, ConnRx) {
let (tx_canary_tx, tx_canary_rx) = mpsc::unbounded_channel(); let (tx_canary_tx, tx_canary_rx) = mpsc::unbounded_channel();
let (rx_canary_tx, rx_canary_rx) = oneshot::channel(); let (rx_canary_tx, rx_canary_rx) = oneshot::channel();
let (event_tx, event_rx) = mpsc::unbounded_channel(); let (event_tx, event_rx) = mpsc::unbounded_channel();
@ -468,6 +468,7 @@ pub fn wrap(ws: WsStream) -> (ConnTx, ConnRx) {
task::spawn(State::run( task::spawn(State::run(
ws, ws,
timeout,
tx_canary_rx, tx_canary_rx,
rx_canary_rx, rx_canary_rx,
event_tx.clone(), event_tx.clone(),