Implement connection in yet another way
This commit is contained in:
parent
49169a1b62
commit
03c1fe7f34
9 changed files with 573 additions and 87 deletions
|
|
@ -3,93 +3,84 @@ use std::time::Duration;
|
|||
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::StreamExt;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::{select, task, time};
|
||||
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
|
||||
use tokio_tungstenite::tungstenite;
|
||||
|
||||
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
|
||||
use super::conn::{State, Status, WsStream};
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
Connected(SplitSink<WsStream, tungstenite::Message>),
|
||||
Disconnected,
|
||||
Message(tungstenite::Message),
|
||||
Ping,
|
||||
WsMessage(tungstenite::Message),
|
||||
DoPings,
|
||||
GetStatus(oneshot::Sender<Option<Status>>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Connected {
|
||||
tx: SplitSink<WsStream, tungstenite::Message>,
|
||||
async fn run(
|
||||
canary: oneshot::Receiver<Infallible>,
|
||||
tx: mpsc::UnboundedSender<Event>,
|
||||
rx: mpsc::UnboundedReceiver<Event>,
|
||||
url: String,
|
||||
) {
|
||||
let state = State::default();
|
||||
select! {
|
||||
_ = canary => (),
|
||||
_ = respond_to_events(state, rx) => (),
|
||||
_ = maintain_connection(tx, url) => (),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum State {
|
||||
Connecting,
|
||||
Connected(Connected),
|
||||
async fn respond_to_events(
|
||||
mut state: State,
|
||||
mut rx: mpsc::UnboundedReceiver<Event>,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event {
|
||||
Event::Connected(tx) => state.on_connected(tx),
|
||||
Event::Disconnected => state.on_disconnected(),
|
||||
Event::WsMessage(msg) => state.on_ws_message(msg)?,
|
||||
Event::DoPings => state.on_do_pings()?,
|
||||
Event::GetStatus(tx) => {
|
||||
let _ = tx.send(state.status());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl State {
|
||||
async fn run(
|
||||
self,
|
||||
canary: oneshot::Receiver<Infallible>,
|
||||
tx: mpsc::UnboundedSender<Event>,
|
||||
rx: mpsc::UnboundedReceiver<Event>,
|
||||
url: String,
|
||||
) {
|
||||
async fn maintain_connection(tx: mpsc::UnboundedSender<Event>, url: String) -> anyhow::Result<()> {
|
||||
loop {
|
||||
// TODO Cookies
|
||||
let (ws, _) = tokio_tungstenite::connect_async(&url).await?;
|
||||
let (ws_tx, ws_rx) = ws.split();
|
||||
tx.send(Event::Connected(ws_tx))?;
|
||||
select! {
|
||||
_ = canary => (),
|
||||
_ = Self::maintain_connection(tx, url) => (),
|
||||
_ = self.respond_to_events(rx) => (),
|
||||
_ = receive_messages(&tx, ws_rx) => (),
|
||||
_ = prompt_pings(&tx) => ()
|
||||
}
|
||||
tx.send(Event::Disconnected)?;
|
||||
// TODO Make reconnect delay configurable
|
||||
time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn maintain_connection(
|
||||
tx: mpsc::UnboundedSender<Event>,
|
||||
url: String,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
// TODO Cookies
|
||||
let (ws, _) = tokio_tungstenite::connect_async(&url).await?;
|
||||
let (ws_tx, ws_rx) = ws.split();
|
||||
tx.send(Event::Connected(ws_tx))?;
|
||||
select! {
|
||||
_ = Self::receive_messages(&tx, ws_rx) => (),
|
||||
_ = Self::prompt_pings(&tx) => ()
|
||||
}
|
||||
tx.send(Event::Disconnected)?;
|
||||
// TODO Make reconnect delay configurable
|
||||
time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
async fn receive_messages(
|
||||
tx: &mpsc::UnboundedSender<Event>,
|
||||
mut rx: SplitStream<WsStream>,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Some(msg) = rx.next().await {
|
||||
tx.send(Event::WsMessage(msg?))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn receive_messages(
|
||||
tx: &mpsc::UnboundedSender<Event>,
|
||||
mut rx: SplitStream<WsStream>,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Some(msg) = rx.next().await {
|
||||
tx.send(Event::Message(msg?))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn prompt_pings(tx: &mpsc::UnboundedSender<Event>) -> anyhow::Result<()> {
|
||||
loop {
|
||||
// TODO Make ping delay configurable
|
||||
time::sleep(Duration::from_secs(10)).await;
|
||||
tx.send(Event::Ping)?;
|
||||
}
|
||||
}
|
||||
|
||||
async fn respond_to_events(mut self, mut rx: mpsc::UnboundedReceiver<Event>) {
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event {
|
||||
Event::Connected(tx) => self = State::Connected(Connected { tx }),
|
||||
Event::Disconnected => self = State::Connecting,
|
||||
Event::Message(_) => todo!(),
|
||||
Event::Ping => todo!(),
|
||||
}
|
||||
}
|
||||
async fn prompt_pings(tx: &mpsc::UnboundedSender<Event>) -> anyhow::Result<()> {
|
||||
loop {
|
||||
// TODO Make ping delay configurable
|
||||
time::sleep(Duration::from_secs(10)).await;
|
||||
tx.send(Event::DoPings)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -103,11 +94,17 @@ impl Room {
|
|||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (canary_tx, canary_rx) = oneshot::channel();
|
||||
|
||||
task::spawn(State::Connecting.run(canary_rx, event_tx.clone(), event_rx, url));
|
||||
task::spawn(run(canary_rx, event_tx.clone(), event_rx, url));
|
||||
|
||||
Self {
|
||||
canary: canary_tx,
|
||||
tx: event_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn status(&self) -> anyhow::Result<Option<Status>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send(Event::GetStatus(tx))?;
|
||||
Ok(rx.await?)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue