Continue implementing rooms

Changing lots of things along the way... But that's how it is: Make one
change, make more changes to fix the resulting errors and so on.
This commit is contained in:
Joscha 2022-02-19 01:01:52 +01:00
parent 31ffa5cd67
commit 992e84e67e
12 changed files with 791 additions and 141 deletions

View file

@ -5,12 +5,15 @@ edition = "2021"
[dependencies]
anyhow = "1.0.53"
clap = { version = "3.1.0", features = ["derive"] }
cove-core = { path = "../cove-core" }
crossterm = "0.22.1"
# futures = "0.3.21"
# serde_json = "1.0.78"
# thiserror = "1.0.30"
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] }
# tokio-stream = "0.1.8"
# tokio-tungstenite = "0.16.1"
tokio-tungstenite = { version = "0.16.1", features = [
"rustls-tls-native-roots",
] }
tui = "0.17.0"

24
cove-tui/src/config.rs Normal file
View file

@ -0,0 +1,24 @@
use std::time::Duration;
use clap::Parser;
#[derive(Debug, Parser)]
pub struct Args {
#[clap(long, default_value_t = String::from("wss://plugh.de/cove/"))]
cove_url: String,
}
pub struct Config {
pub cove_url: String,
pub timeout: Duration,
}
impl Config {
pub fn load() -> Self {
let args = Args::parse();
Self {
cove_url: args.cove_url,
timeout: Duration::from_secs(10),
}
}
}

View file

@ -1,9 +1,12 @@
mod replies;
mod room;
mod config;
mod never;
use std::io::{self, Stdout};
use std::time::Duration;
use config::Config;
use crossterm::event::{DisableMouseCapture, EnableMouseCapture};
use crossterm::execute;
use crossterm::terminal::{EnterAlternateScreen, LeaveAlternateScreen};
@ -23,6 +26,8 @@ async fn run(terminal: &mut Terminal<CrosstermBackend<Stdout>>) -> anyhow::Resul
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::load();
let mut terminal = Terminal::new(CrosstermBackend::new(io::stdout()))?;
crossterm::terminal::enable_raw_mode()?;

2
cove-tui/src/never.rs Normal file
View file

@ -0,0 +1,2 @@
// TODO Replace with `!` when it is stabilised
pub enum Never {}

View file

@ -6,8 +6,11 @@ use std::time::Duration;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::time;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("timed out")]
TimedOut,
#[error("canceled")]
Canceled,
}
@ -35,7 +38,14 @@ pub struct Replies<I, R> {
}
impl<I: Eq + Hash, R> Replies<I, R> {
pub async fn wait_for(&mut self, id: I) -> PendingReply<R> {
pub fn new(timeout: Duration) -> Self {
Self {
timeout,
pending: HashMap::new(),
}
}
pub fn wait_for(&mut self, id: I) -> PendingReply<R> {
let (tx, rx) = oneshot::channel();
self.pending.insert(id, tx);
PendingReply {

View file

@ -1,63 +1,280 @@
use std::any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use cove_core::conn::ConnTx;
use anyhow::bail;
use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx};
use cove_core::packets::{
Cmd, IdentifyCmd, IdentifyRpl, JoinNtf, NickRpl, Ntf, Packet, RoomCmd, RoomRpl, Rpl, SendRpl,
WhoRpl,
};
use cove_core::{Session, SessionId};
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
pub enum ConnectedState {
ChoosingNick,
Identifying,
Online,
use crate::config::Config;
use crate::never::Never;
use crate::replies::{self, Replies};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("not connected")]
NotConnected,
#[error("not present")]
NotPresent,
#[error("incorrect reply type")]
IncorrectReplyType,
#[error("{0}")]
Conn(#[from] conn::Error),
#[error("{0}")]
Replies(#[from] replies::Error),
}
pub enum RoomState {
Connecting,
Reconnecting,
Connected { state: ConnectedState, tx: ConnTx },
DoesNotExist,
/// State for when a websocket connection exists.
struct Connected {
tx: ConnTx,
next_id: u64,
replies: Replies<u64, Rpl>,
}
/// State for when a client has fully joined a room.
struct Present {
session: Session,
others: HashMap<SessionId, Session>,
}
enum Status {
/// No action required by the UI.
Nominal,
/// User must enter a nick.
NickRequired,
/// Identifying to the server. No action required by the UI.
Identifying,
CouldNotConnect,
InvalidRoom(String),
InvalidNick(String),
InvalidIdentity(String),
InvalidContent(String),
}
pub struct Room {
name: String,
state: RoomState,
nick: Option<String>,
others: HashMap<SessionId, Session>,
stop: Sender<()>,
identity: String,
initial_nick: Option<String>,
status: Status,
connected: Option<Connected>,
present: Option<Present>,
still_alive: Sender<Never>,
}
impl Room {
pub async fn create(name: String) -> Arc<Mutex<Self>> {
pub async fn new(
name: String,
identity: String,
initial_nick: Option<String>,
config: &'static Config,
) -> Arc<Mutex<Self>> {
let (tx, rx) = oneshot::channel();
let room = Self {
let room = Arc::new(Mutex::new(Self {
name,
state: RoomState::Connecting,
nick: None,
others: HashMap::new(),
stop: tx,
};
let room = Arc::new(Mutex::new(room));
identity,
initial_nick,
status: Status::Nominal,
connected: None,
present: None,
still_alive: tx,
}));
let room_clone = room.clone();
tokio::spawn(async {
tokio::spawn(async move {
tokio::select! {
_ = rx => {},
_ = Self::connect(room_clone) => {}
_ = rx => {}
_ = Self::bg_task(room_clone, config) => {}
}
});
room
}
async fn connect(room: Arc<Mutex<Self>>) {
todo!()
async fn bg_task(room: Arc<Mutex<Room>>, config: &'static Config) {
let mut room_verified = false;
loop {
if let Ok((tx, rx, mt)) = Self::connect(&config.cove_url, config.timeout).await {
{
let mut room = room.lock().await;
room.status = Status::Nominal;
room.connected = Some(Connected {
tx,
next_id: 0,
replies: Replies::new(config.timeout),
});
}
tokio::select! {
_ = mt.perform() => {}
_ = Self::receive(room.clone(), rx, &mut room_verified) => {}
}
}
if !room_verified {
room.lock().await.status = Status::CouldNotConnect;
return;
}
}
}
pub fn stop(self) {
// If the send goes wrong because the other end has hung up, it's
// already stopped and there's nothing to do.
let _ = self.stop.send(());
async fn connect(
url: &str,
timeout: Duration,
) -> anyhow::Result<(ConnTx, ConnRx, ConnMaintenance)> {
let stream = tokio_tungstenite::connect_async(url).await?.0;
let conn = conn::new(stream, timeout)?;
Ok(conn)
}
async fn receive(
room: Arc<Mutex<Room>>,
mut rx: ConnRx,
room_verified: &mut bool,
) -> anyhow::Result<()> {
while let Some(packet) = rx.recv().await? {
match packet {
Packet::Cmd { .. } => {} // Ignore, the server never sends commands
Packet::Rpl { id, rpl } => {
room.lock().await.on_rpl(&room, id, rpl, room_verified)?;
}
Packet::Ntf { ntf } => room.lock().await.on_ntf(ntf),
}
}
Ok(())
}
fn on_rpl(
&mut self,
room: &Arc<Mutex<Room>>,
id: u64,
rpl: Rpl,
room_verified: &mut bool,
) -> anyhow::Result<()> {
match &rpl {
Rpl::Room(RoomRpl::Success) => {
*room_verified = true;
if let Some(nick) = &self.initial_nick {
tokio::spawn(Self::identify(
room.clone(),
nick.clone(),
self.identity.clone(),
));
} else {
self.status = Status::NickRequired;
}
}
Rpl::Room(RoomRpl::InvalidRoom { reason }) => {
self.status = Status::InvalidRoom(reason.clone());
anyhow::bail!("invalid room");
}
Rpl::Identify(IdentifyRpl::Success {
you,
others,
last_message,
}) => {
let others = others
.iter()
.map(|session| (session.id, session.clone()))
.collect();
self.present = Some(Present {
session: you.clone(),
others,
});
// TODO Send last message to store
}
Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => {
self.status = Status::InvalidNick(reason.clone());
}
Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => {
self.status = Status::InvalidIdentity(reason.clone());
}
Rpl::Nick(NickRpl::Success { you }) => {
if let Some(present) = &mut self.present {
present.session = you.clone();
}
}
Rpl::Nick(NickRpl::InvalidNick { reason }) => {
self.status = Status::InvalidNick(reason.clone());
}
Rpl::Send(SendRpl::Success { message }) => {
// TODO Send message to store
}
Rpl::Send(SendRpl::InvalidContent { reason }) => {
self.status = Status::InvalidContent(reason.clone());
}
Rpl::Who(WhoRpl { you, others }) => {
if let Some(present) = &mut self.present {
present.session = you.clone();
present.others = others
.iter()
.map(|session| (session.id, session.clone()))
.collect();
}
}
}
if let Some(connected) = &mut self.connected {
connected.replies.complete(&id, rpl);
}
Ok(())
}
fn on_ntf(&mut self, ntf: Ntf) {
match ntf {
Ntf::Join(join) => {
if let Some(present) = &mut self.present {
present.others.insert(join.who.id, join.who);
}
}
Ntf::Nick(nick) => {
if let Some(present) = &mut self.present {
present.others.insert(nick.who.id, nick.who);
}
}
Ntf::Part(part) => {
if let Some(present) = &mut self.present {
present.others.remove(&part.who.id);
}
}
Ntf::Send(_) => {
// TODO Send message to store
}
}
}
async fn cmd<C, R>(room: &Mutex<Room>, cmd: C) -> Result<R, Error>
where
C: Into<Cmd>,
Rpl: TryInto<R>,
{
let token = {
let mut room = room.lock().await;
let connected = room.connected.as_mut().ok_or(Error::NotConnected)?;
let id = connected.next_id;
connected.next_id += 1;
let pending_reply = connected.replies.wait_for(id);
connected.tx.send(&Packet::cmd(id, cmd.into()))?;
pending_reply
};
let rpl = token.get().await?;
let rpl = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?;
Ok(rpl)
}
async fn identify(room: Arc<Mutex<Room>>, nick: String, identity: String) -> Result<(), Error> {
let result: IdentifyRpl = Self::cmd(&room, IdentifyCmd { nick, identity }).await?;
Ok(())
}
}