Start rewriting room

This commit is contained in:
Joscha 2022-02-28 22:45:01 +01:00
parent 3cd027cd15
commit e5eefd8ada

View file

@ -2,13 +2,15 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use anyhow::bail;
use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx}; use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx};
use cove_core::packets::{ use cove_core::packets::{
Cmd, IdentifyCmd, IdentifyRpl, NickRpl, Ntf, Packet, RoomRpl, Rpl, SendRpl, WhoRpl, Cmd, IdentifyCmd, IdentifyRpl, NickRpl, Ntf, Packet, RoomCmd, RoomRpl, Rpl, SendRpl, WhoRpl,
}; };
use cove_core::{Session, SessionId}; use cove_core::{Session, SessionId};
use tokio::sync::oneshot::{self, Sender}; use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_tungstenite::connect_async;
use tui::widgets::StatefulWidget; use tui::widgets::StatefulWidget;
use crate::config::Config; use crate::config::Config;
@ -29,6 +31,34 @@ pub enum Error {
Replies(#[from] replies::Error), Replies(#[from] replies::Error),
} }
pub enum StopReason {
CouldNotConnect(conn::Error),
InvalidRoom(String),
InvalidIdentity(String),
/// Something went wrong but we don't know what.
SomethingWentWrong,
}
/// General state of the room connection.
pub enum Status {
/// Connecting to the room for the first time.
Connecting,
/// Reconnecting to the room after being connected at least once.
Reconnecting,
/// Identifying with the server after a connection has been established.
/// This occurs when an initial nick has been set on room creation.
Identifying,
/// User must enter a nick. May contain error message about previous nick.
NickRequired(Option<String>),
/// Fully connected.
Nominal,
/// Not connected and not attempting any reconnects. This is likely due to
/// factors out of the application's control (e. g. no internet connection,
/// room does not exist), meaning that retrying without user intervention
/// doesn't make sense.
Stopped(StopReason),
}
/// State for when a websocket connection exists. /// State for when a websocket connection exists.
struct Connected { struct Connected {
tx: ConnTx, tx: ConnTx,
@ -38,149 +68,31 @@ struct Connected {
/// State for when a client has fully joined a room. /// State for when a client has fully joined a room.
pub struct Present { pub struct Present {
pub session: Session, session: Session,
pub others: HashMap<SessionId, Session>, others: HashMap<SessionId, Session>,
} }
pub enum Status { pub struct RoomState {
/// No action required by the UI.
Nominal,
/// User must enter a nick.
NickRequired,
CouldNotConnect,
InvalidRoom(String),
InvalidNick(String),
InvalidIdentity(String),
}
pub struct Room {
name: String,
identity: String, identity: String,
initial_nick: Option<String>, initial_nick: Option<String>,
status: Status, status: Status,
connected: Option<Connected>, connected: Option<Connected>,
present: Option<Present>, present: Option<Present>,
still_alive: Sender<Never>,
}
impl Room {
pub async fn new(
name: String,
identity: String,
initial_nick: Option<String>,
config: &'static Config,
) -> Arc<Mutex<Self>> {
let (tx, rx) = oneshot::channel();
let room = Arc::new(Mutex::new(Self {
name,
identity,
initial_nick,
status: Status::Nominal,
connected: None,
present: None,
still_alive: tx,
}));
let room_clone = room.clone();
tokio::spawn(async move {
tokio::select! {
_ = rx => {}
_ = Self::bg_task(room_clone, config) => {}
}
});
room
}
pub fn status(&self) -> &Status {
&self.status
}
pub fn connected(&self) -> bool {
self.connected.is_some()
}
pub fn present(&self) -> Option<&Present> {
self.present.as_ref()
}
async fn bg_task(room: Arc<Mutex<Room>>, config: &'static Config) {
let mut room_verified = false;
loop {
if let Ok((tx, rx, mt)) = Self::connect(&config.cove_url, config.timeout).await {
{
let mut room = room.lock().await;
room.status = Status::Nominal;
room.connected = Some(Connected {
tx,
next_id: 0,
replies: Replies::new(config.timeout),
});
}
tokio::select! {
_ = mt.perform() => {}
_ = Self::receive(room.clone(), rx, &mut room_verified) => {}
}
}
if !room_verified {
room.lock().await.status = Status::CouldNotConnect;
return;
}
}
}
async fn connect(
url: &str,
timeout: Duration,
) -> anyhow::Result<(ConnTx, ConnRx, ConnMaintenance)> {
let stream = tokio_tungstenite::connect_async(url).await?.0;
let conn = conn::new(stream, timeout)?;
Ok(conn)
}
async fn receive(
room: Arc<Mutex<Room>>,
mut rx: ConnRx,
room_verified: &mut bool,
) -> anyhow::Result<()> {
while let Some(packet) = rx.recv().await? {
match packet {
Packet::Cmd { .. } => {} // Ignore, the server never sends commands
Packet::Rpl { id, rpl } => {
room.lock().await.on_rpl(&room, id, rpl, room_verified)?;
}
Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf),
}
}
Ok(())
} }
impl RoomState {
fn on_rpl( fn on_rpl(
&mut self, &mut self,
room: &Arc<Mutex<Room>>,
id: u64, id: u64,
rpl: Rpl, rpl: Rpl,
room_verified: &mut bool, room_verified: &mut Option<RoomVerified>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match &rpl { match &rpl {
Rpl::Room(RoomRpl::Success) => { Rpl::Room(RoomRpl::Success) => {
*room_verified = true; *room_verified = Some(RoomVerified::Yes);
if let Some(nick) = &self.initial_nick {
// TODO Use previous nick if there is one
tokio::spawn(Self::identify(
room.clone(),
nick.clone(),
self.identity.clone(),
));
} else {
self.status = Status::NickRequired;
}
} }
Rpl::Room(RoomRpl::InvalidRoom { reason }) => { Rpl::Room(RoomRpl::InvalidRoom { reason }) => {
self.status = Status::InvalidRoom(reason.clone()); self.status = Status::Stopped(StopReason::InvalidRoom(reason.clone()));
anyhow::bail!("invalid room"); anyhow::bail!("invalid room");
} }
Rpl::Identify(IdentifyRpl::Success { Rpl::Identify(IdentifyRpl::Success {
@ -188,34 +100,26 @@ impl Room {
others, others,
last_message, last_message,
}) => { }) => {
let session = you.clone();
let others = others let others = others
.iter() .iter()
.map(|session| (session.id, session.clone())) .map(|session| (session.id, session.clone()))
.collect(); .collect();
self.present = Some(Present { self.present = Some(Present { session, others });
session: you.clone(),
others,
});
// TODO Send last message to store // TODO Send last message to store
} }
Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => { Rpl::Identify(IdentifyRpl::InvalidNick { .. }) => {}
self.status = Status::InvalidNick(reason.clone()); Rpl::Identify(IdentifyRpl::InvalidIdentity { .. }) => {}
}
Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => {
self.status = Status::InvalidIdentity(reason.clone());
}
Rpl::Nick(NickRpl::Success { you }) => { Rpl::Nick(NickRpl::Success { you }) => {
if let Some(present) = &mut self.present { if let Some(present) = &mut self.present {
present.session = you.clone(); present.session = you.clone();
} }
} }
Rpl::Nick(NickRpl::InvalidNick { reason }) => { Rpl::Nick(NickRpl::InvalidNick { .. }) => {}
self.status = Status::InvalidNick(reason.clone());
}
Rpl::Send(SendRpl::Success { message }) => { Rpl::Send(SendRpl::Success { message }) => {
// TODO Send message to store // TODO Send message to store
} }
Rpl::Send(SendRpl::InvalidContent { reason }) => {} Rpl::Send(SendRpl::InvalidContent { .. }) => {}
Rpl::Who(WhoRpl { you, others }) => { Rpl::Who(WhoRpl { you, others }) => {
if let Some(present) = &mut self.present { if let Some(present) = &mut self.present {
present.session = you.clone(); present.session = you.clone();
@ -257,14 +161,14 @@ impl Room {
} }
} }
async fn cmd<C, R>(room: &Mutex<Room>, cmd: C) -> Result<R, Error> async fn cmd<C, R>(state: &Mutex<RoomState>, cmd: C) -> Result<R, Error>
where where
C: Into<Cmd>, C: Into<Cmd>,
Rpl: TryInto<R>, Rpl: TryInto<R>,
{ {
let token = { let pending_reply = {
let mut room = room.lock().await; let mut state = state.lock().await;
let connected = room.connected.as_mut().ok_or(Error::NotConnected)?; let connected = state.connected.as_mut().ok_or(Error::NotConnected)?;
let id = connected.next_id; let id = connected.next_id;
connected.next_id += 1; connected.next_id += 1;
@ -274,13 +178,156 @@ impl Room {
pending_reply pending_reply
}; };
let rpl = token.get().await?; let rpl = pending_reply.get().await?;
let rpl = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?; let rpl_value = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?;
Ok(rpl) Ok(rpl_value)
} }
async fn identify(room: Arc<Mutex<Room>>, nick: String, identity: String) -> Result<(), Error> { async fn select_room_and_identify(
let result: IdentifyRpl = Self::cmd(&room, IdentifyCmd { nick, identity }).await?; state: Arc<Mutex<RoomState>>,
name: String,
) -> Result<(), Error> {
let result: RoomRpl = Self::cmd(&state, RoomCmd { name }).await?;
match result {
RoomRpl::Success => {}
RoomRpl::InvalidRoom { reason } => {
let mut state = state.lock().await;
state.status = Status::Stopped(StopReason::InvalidRoom(reason));
// FIXME This does not actually stop the room
state.connected = None;
return Ok(());
}
}
let nick = {
if let Some(nick) = &(state.lock().await).initial_nick {
nick.clone()
} else {
return Ok(());
}
};
Self::identify(&state, nick).await
}
async fn identify(state: &Mutex<Self>, nick: String) -> Result<(), Error> {
let identity = state.lock().await.identity.clone();
let result: IdentifyRpl = Self::cmd(state, IdentifyCmd { nick, identity }).await?;
Ok(())
}
}
pub struct Room {
state: Arc<Mutex<RoomState>>,
/// Once this is dropped, all other room-related tasks, connections and
/// values are cleaned up.
dead_mans_switch: Sender<Never>,
}
enum RoomVerified {
Yes,
No(StopReason),
}
impl Room {
pub async fn new(
config: &'static Config,
name: String,
identity: String,
initial_nick: Option<String>,
) -> Self {
let (tx, rx) = oneshot::channel();
let room = Room {
state: Arc::new(Mutex::new(RoomState {
identity,
initial_nick,
status: Status::Connecting,
connected: None,
present: None,
})),
dead_mans_switch: tx,
};
let state_clone = room.state.clone();
tokio::spawn(async move {
tokio::select! {
_ = rx => {} // Watch dead man's switch
_ = Self::run(state_clone, config,name) => {}
}
});
room
}
/// Background task to connect to a room and stay connected.
async fn run(state: Arc<Mutex<RoomState>>, config: &'static Config, name: String) {
// The room exists and we have successfully connected to it before
let mut room_verified = None;
loop {
// Try to connect and run
match Self::connect(&config.cove_url, config.timeout).await {
Ok((tx, rx, mt)) => {
state.lock().await.connected = Some(Connected {
tx,
next_id: 0,
replies: Replies::new(config.timeout),
});
tokio::select! {
_ = mt.perform() => {}
_ = Self::receive(&state, rx, &mut room_verified) => {}
}
}
Err(e) if room_verified.is_none() => {
room_verified = Some(RoomVerified::No(StopReason::CouldNotConnect(e)))
}
Err(_) => {}
}
// Clean up and maybe reconnect
{
let mut state = state.lock().await;
match room_verified {
Some(RoomVerified::Yes) => state.status = Status::Reconnecting,
Some(RoomVerified::No(reason)) => {
state.status = Status::Stopped(reason);
break;
}
None => {
state.status = Status::Stopped(StopReason::SomethingWentWrong);
break;
}
}
}
}
}
async fn connect(
url: &str,
timeout: Duration,
) -> Result<(ConnTx, ConnRx, ConnMaintenance), conn::Error> {
// This function exists to funnel errors using `?` short-circuiting.
// Inlining it would be annoying and verbose.
let stream = tokio_tungstenite::connect_async(url).await?.0;
let conn = conn::new(stream, timeout)?;
Ok(conn)
}
async fn receive(
state: &Mutex<RoomState>,
mut rx: ConnRx,
room_verified: &mut Option<RoomVerified>,
) -> anyhow::Result<()> {
while let Some(packet) = rx.recv().await? {
match packet {
Packet::Cmd { .. } => {} // Ignore, the server never sends commands
Packet::Rpl { id, rpl } => {
state.lock().await.on_rpl(&room, id, rpl, room_verified)?;
}
Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf),
}
}
Ok(()) Ok(())
} }
} }