Handle connection identification

This commit is contained in:
Joscha 2022-03-02 22:50:43 +01:00
parent d5394476c6
commit f08c251bd1

View file

@ -4,25 +4,32 @@ use std::time::Duration;
use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx}; use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx};
use cove_core::packets::{ use cove_core::packets::{
IdentifyRpl, JoinNtf, NickNtf, NickRpl, Ntf, Packet, PartNtf, RoomRpl, Rpl, SendNtf, SendRpl, Cmd, IdentifyCmd, IdentifyRpl, JoinNtf, NickNtf, NickRpl, Ntf, Packet, PartNtf, RoomCmd,
WhoRpl, RoomRpl, Rpl, SendNtf, SendRpl, WhoRpl,
}; };
use cove_core::{Session, SessionId}; use cove_core::{Session, SessionId};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::replies::Replies; use crate::replies::{self, Replies};
// TODO Split into "interacting" and "maintenance" parts?
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum Error {
#[error("{0}")] #[error("{0}")]
Conn(#[from] conn::Error), Conn(#[from] conn::Error),
#[error("{0}")]
Reply(#[from] replies::Error),
#[error("invalid room: {0}")] #[error("invalid room: {0}")]
InvalidRoom(String), InvalidRoom(String),
#[error("invalid identity: {0}")] #[error("invalid identity: {0}")]
InvalidIdentity(String), InvalidIdentity(String),
#[error("maintenance aborted")] #[error("maintenance aborted")]
MaintenanceAborted, MaintenanceAborted,
#[error("not connected")]
NotConnected,
#[error("incorrect reply type")]
IncorrectReplyType,
} }
pub enum Event { pub enum Event {
@ -97,6 +104,7 @@ impl Status {
pub struct Connected { pub struct Connected {
tx: ConnTx, tx: ConnTx,
next_id: u64,
replies: Replies<u64, Rpl>, replies: Replies<u64, Rpl>,
status: Status, status: Status,
} }
@ -105,37 +113,114 @@ impl Connected {
fn new(tx: ConnTx, timeout: Duration) -> Self { fn new(tx: ConnTx, timeout: Duration) -> Self {
Self { Self {
tx, tx,
next_id: 0,
replies: Replies::new(timeout), replies: Replies::new(timeout),
status: Status::ChoosingRoom, status: Status::ChoosingRoom,
} }
} }
} }
pub enum CoveConn { pub enum State {
Connecting, Connecting,
Connected(Connected), Connected(Connected),
Stopped, Stopped,
} }
impl CoveConn { impl State {
fn connected(&self) -> Option<&Connected> { pub fn connected(&self) -> Option<&Connected> {
match self { match self {
CoveConn::Connected(connected) => Some(connected), Self::Connected(connected) => Some(connected),
CoveConn::Connecting | CoveConn::Stopped => None, Self::Connecting | Self::Stopped => None,
} }
} }
fn connected_mut(&mut self) -> Option<&mut Connected> { pub fn connected_mut(&mut self) -> Option<&mut Connected> {
match self { match self {
CoveConn::Connected(connected) => Some(connected), Self::Connected(connected) => Some(connected),
CoveConn::Connecting | CoveConn::Stopped => None, Self::Connecting | Self::Stopped => None,
} }
} }
} }
pub struct CoveConn {
state: State,
ev_tx: UnboundedSender<Event>,
}
impl CoveConn {
pub fn state(&self) -> &State {
&self.state
}
pub fn state_mut(&mut self) -> &mut State {
&mut self.state
}
pub fn connected(&self) -> Option<&Connected> {
self.state.connected()
}
pub fn connected_mut(&mut self) -> Option<&mut Connected> {
self.state.connected_mut()
}
async fn cmd<C, R>(conn: &Mutex<Self>, cmd: C) -> Result<R, Error>
where
C: Into<Cmd>,
Rpl: TryInto<R>,
{
let pending_reply = {
let mut conn = conn.lock().await;
let mut connected = conn.connected_mut().ok_or(Error::NotConnected)?;
let id = connected.next_id;
connected.next_id += 1;
let pending_reply = connected.replies.wait_for(id);
connected.tx.send(&Packet::cmd(id, cmd.into()))?;
pending_reply
};
let rpl = pending_reply.get().await?;
let rpl_value = rpl.try_into().map_err(|_| Error::IncorrectReplyType)?;
Ok(rpl_value)
}
/// Attempt to identify with a nick and identity. Does nothing if the room
/// doesn't require verification.
///
/// This method is intended to be called whenever a CoveConn user suspects
/// identification to be necessary. It has little overhead.
pub async fn identify(conn: Arc<Mutex<Self>>, nick: &str, identity: &str) {
{
let mut conn = conn.lock().await;
if let Some(connected) = conn.connected_mut() {
if let Status::IdRequired(_) = connected.status {
connected.status = Status::Identifying;
conn.ev_tx.send(Event::StateChanged);
} else {
return;
}
} else {
return;
}
}
let nick = nick.to_string();
let identity = identity.to_string();
tokio::spawn(async move {
// There's no need for a second locking block, or for us to see the
// result of this command. CoveConnMt::run will set the connection's
// status as appropriate.
Self::cmd::<IdentifyCmd, IdentifyRpl>(&conn, IdentifyCmd { nick, identity }).await
});
}
}
/// Maintenance for a [`CoveConn`]. /// Maintenance for a [`CoveConn`].
pub struct CoveConnMt { pub struct CoveConnMt {
url: String, url: String,
room: String,
timeout: Duration, timeout: Duration,
conn: Arc<Mutex<CoveConn>>, conn: Arc<Mutex<CoveConn>>,
ev_tx: UnboundedSender<Event>, ev_tx: UnboundedSender<Event>,
@ -146,21 +231,22 @@ impl CoveConnMt {
let (tx, rx, mt) = match Self::connect(&self.url, self.timeout).await { let (tx, rx, mt) = match Self::connect(&self.url, self.timeout).await {
Ok(conn) => conn, Ok(conn) => conn,
Err(e) => { Err(e) => {
*self.conn.lock().await = CoveConn::Stopped; *self.conn.lock().await.state_mut() = State::Stopped;
self.ev_tx.send(Event::StateChanged);
return Err(Error::Conn(e)); return Err(Error::Conn(e));
} }
}; };
*self.conn.lock().await = CoveConn::Connected(Connected::new(tx, self.timeout)); *self.conn.lock().await.state_mut() = State::Connected(Connected::new(tx, self.timeout));
self.ev_tx.send(Event::StateChanged); self.ev_tx.send(Event::StateChanged);
// TODO Spawn task to join room tokio::spawn(Self::join_room(self.conn.clone(), self.room));
let result = tokio::select! { let result = tokio::select! {
result = Self::recv(&self.conn, &self.ev_tx, rx) => result, result = Self::recv(&self.conn, &self.ev_tx, rx) => result,
_ = mt.perform() => Err(Error::MaintenanceAborted), _ = mt.perform() => Err(Error::MaintenanceAborted),
}; };
*self.conn.lock().await = CoveConn::Stopped; *self.conn.lock().await.state_mut() = State::Stopped;
self.ev_tx.send(Event::StateChanged); self.ev_tx.send(Event::StateChanged);
result result
@ -175,6 +261,11 @@ impl CoveConnMt {
Ok(conn) Ok(conn)
} }
async fn join_room(conn: Arc<Mutex<CoveConn>>, name: String) -> Result<(), Error> {
let reply: RoomRpl = CoveConn::cmd(&conn, RoomCmd { name }).await?;
Ok(())
}
async fn recv( async fn recv(
conn: &Mutex<CoveConn>, conn: &Mutex<CoveConn>,
ev_tx: &UnboundedSender<Event>, ev_tx: &UnboundedSender<Event>,
@ -206,7 +297,8 @@ impl CoveConnMt {
match &rpl { match &rpl {
Rpl::Room(RoomRpl::Success) => { Rpl::Room(RoomRpl::Success) => {
// TODO Send event that joining room was successful? connected.status = Status::IdRequired(None);
ev_tx.send(Event::StateChanged);
} }
Rpl::Room(RoomRpl::InvalidRoom { reason }) => { Rpl::Room(RoomRpl::InvalidRoom { reason }) => {
return Err(Error::InvalidRoom(reason.clone())) return Err(Error::InvalidRoom(reason.clone()))
@ -215,7 +307,10 @@ impl CoveConnMt {
connected.status = Status::Present(Present::new(you, others)); connected.status = Status::Present(Present::new(you, others));
ev_tx.send(Event::StateChanged); ev_tx.send(Event::StateChanged);
} }
Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => {} Rpl::Identify(IdentifyRpl::InvalidNick { reason }) => {
connected.status = Status::IdRequired(Some(reason.clone()));
ev_tx.send(Event::StateChanged);
}
Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => { Rpl::Identify(IdentifyRpl::InvalidIdentity { reason }) => {
return Err(Error::InvalidIdentity(reason.clone())) return Err(Error::InvalidIdentity(reason.clone()))
} }
@ -284,12 +379,17 @@ impl CoveConnMt {
pub async fn new( pub async fn new(
url: String, url: String,
room: String,
timeout: Duration, timeout: Duration,
) -> (Arc<Mutex<CoveConn>>, CoveConnMt, UnboundedReceiver<Event>) { ) -> (Arc<Mutex<CoveConn>>, CoveConnMt, UnboundedReceiver<Event>) {
let conn = Arc::new(Mutex::new(CoveConn::Connecting));
let (ev_tx, ev_rx) = mpsc::unbounded_channel(); let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let conn = Arc::new(Mutex::new(CoveConn {
state: State::Connecting,
ev_tx: ev_tx.clone(),
}));
let mt = CoveConnMt { let mt = CoveConnMt {
url, url,
room,
timeout, timeout,
conn, conn,
ev_tx, ev_tx,