Switch euph::Room to use euphoxide's Instance
This commit is contained in:
parent
b94dfbdc31
commit
8dd5db5888
5 changed files with 408 additions and 640 deletions
669
src/euph/room.rs
669
src/euph/room.rs
|
|
@ -1,187 +1,143 @@
|
|||
use std::convert::Infallible;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use cookie::{Cookie, CookieJar};
|
||||
use euphoxide::api::packet::ParsedPacket;
|
||||
use euphoxide::api::{
|
||||
Auth, AuthOption, Data, Log, Login, Logout, MessageId, Nick, Send, Time, UserId,
|
||||
Auth, AuthOption, Data, Log, Login, Logout, MessageId, Nick, Send, SendEvent, SendReply, Time,
|
||||
UserId,
|
||||
};
|
||||
use euphoxide::conn::{Conn, ConnTx, Joining, State as ConnState};
|
||||
use log::{error, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::{select, task};
|
||||
use tokio_tungstenite::tungstenite;
|
||||
use tokio_tungstenite::tungstenite::http::HeaderValue;
|
||||
use euphoxide::bot::instance::{Event, Instance, InstanceConfig, Snapshot};
|
||||
use euphoxide::conn::{self, ConnTx};
|
||||
use log::{debug, error, info, warn};
|
||||
use tokio::select;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::macros::ok_or_return;
|
||||
use crate::vault::{EuphRoomVault, EuphVault};
|
||||
use crate::vault::EuphRoomVault;
|
||||
|
||||
const TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const RECONNECT_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const LOG_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("room stopped")]
|
||||
Stopped,
|
||||
}
|
||||
|
||||
pub enum EuphRoomEvent {
|
||||
Connected,
|
||||
Disconnected,
|
||||
Packet(Box<ParsedPacket>),
|
||||
Stopped,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
// Events
|
||||
Connected(ConnTx),
|
||||
pub enum State {
|
||||
Disconnected,
|
||||
Packet(Box<ParsedPacket>),
|
||||
// Commands
|
||||
State(oneshot::Sender<Option<ConnState>>),
|
||||
RequestLogs,
|
||||
Auth(String),
|
||||
Nick(String),
|
||||
Send(Option<MessageId>, String, oneshot::Sender<MessageId>),
|
||||
Login { email: String, password: String },
|
||||
Logout,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct State {
|
||||
name: String,
|
||||
username: Option<String>,
|
||||
force_username: bool,
|
||||
password: Option<String>,
|
||||
vault: EuphRoomVault,
|
||||
|
||||
conn_tx: Option<ConnTx>,
|
||||
/// `None` before any `snapshot-event`, then either `Some(None)` or
|
||||
/// `Some(Some(id))`.
|
||||
last_msg_id: Option<Option<MessageId>>,
|
||||
requesting_logs: Arc<Mutex<bool>>,
|
||||
Connecting,
|
||||
Connected(ConnTx, conn::State),
|
||||
Stopped,
|
||||
}
|
||||
|
||||
impl State {
|
||||
async fn run(
|
||||
mut self,
|
||||
canary: oneshot::Receiver<Infallible>,
|
||||
event_tx: mpsc::UnboundedSender<Event>,
|
||||
mut event_rx: mpsc::UnboundedReceiver<Event>,
|
||||
euph_room_event_tx: mpsc::UnboundedSender<EuphRoomEvent>,
|
||||
ephemeral: bool,
|
||||
) {
|
||||
let vault = self.vault.clone();
|
||||
let name = self.name.clone();
|
||||
let result = if ephemeral {
|
||||
select! {
|
||||
_ = canary => Ok(()),
|
||||
_ = Self::reconnect(&vault, &name, &event_tx) => Ok(()),
|
||||
e = self.handle_events(&mut event_rx, &euph_room_event_tx) => e,
|
||||
}
|
||||
pub fn conn_tx(&self) -> Option<&ConnTx> {
|
||||
if let Self::Connected(conn_tx, _) = self {
|
||||
Some(conn_tx)
|
||||
} else {
|
||||
select! {
|
||||
_ = canary => Ok(()),
|
||||
_ = Self::reconnect(&vault, &name, &event_tx) => Ok(()),
|
||||
e = self.handle_events(&mut event_rx, &euph_room_event_tx) => e,
|
||||
_ = Self::regularly_request_logs(&event_tx) => Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("e&{name}: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that whoever is using this room knows that it's gone.
|
||||
// Otherwise, the users of the Room may be left in an inconsistent or
|
||||
// outdated state, and the UI may not update correctly.
|
||||
let _ = euph_room_event_tx.send(EuphRoomEvent::Stopped);
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("not connected to room")]
|
||||
NotConnected,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Room {
|
||||
vault: EuphRoomVault,
|
||||
ephemeral: bool,
|
||||
|
||||
instance: Instance,
|
||||
state: State,
|
||||
|
||||
/// `None` before any `snapshot-event`, then either `Some(None)` or
|
||||
/// `Some(Some(id))`. Reset whenever connection is lost.
|
||||
last_msg_id: Option<Option<MessageId>>,
|
||||
|
||||
/// `Some` while `Self::regularly_request_logs` is running. Set to `None` to
|
||||
/// drop the sender and stop the task.
|
||||
log_request_canary: Option<oneshot::Sender<Infallible>>,
|
||||
}
|
||||
|
||||
impl Room {
|
||||
pub fn new<F>(vault: EuphRoomVault, instance_config: InstanceConfig, on_event: F) -> Self
|
||||
where
|
||||
F: Fn(Event) + std::marker::Send + Sync + 'static,
|
||||
{
|
||||
// &rl2dev's message history is broken and requesting old messages past
|
||||
// a certain point results in errors. Cove should not keep retrying log
|
||||
// requests when hitting that limit, so &rl2dev is always opened in
|
||||
// ephemeral mode.
|
||||
let ephemeral = vault.vault().vault().ephemeral() || vault.room() == "rl2dev";
|
||||
|
||||
Self {
|
||||
vault,
|
||||
ephemeral,
|
||||
instance: instance_config.build(on_event),
|
||||
state: State::Disconnected,
|
||||
last_msg_id: None,
|
||||
log_request_canary: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn reconnect(
|
||||
vault: &EuphRoomVault,
|
||||
name: &str,
|
||||
event_tx: &mpsc::UnboundedSender<Event>,
|
||||
) -> anyhow::Result<()> {
|
||||
loop {
|
||||
info!("e&{}: connecting", name);
|
||||
let connected = if let Some(mut conn) = Self::connect(vault, name).await? {
|
||||
info!("e&{}: connected", name);
|
||||
event_tx.send(Event::Connected(conn.tx().clone()))?;
|
||||
pub fn stopped(&self) -> bool {
|
||||
self.instance.stopped()
|
||||
}
|
||||
|
||||
while let Ok(packet) = conn.recv().await {
|
||||
event_tx.send(Event::Packet(Box::new(packet)))?;
|
||||
pub fn state(&self) -> &State {
|
||||
&self.state
|
||||
}
|
||||
|
||||
fn conn_tx(&self) -> Result<&ConnTx, Error> {
|
||||
self.state.conn_tx().ok_or(Error::NotConnected)
|
||||
}
|
||||
|
||||
pub fn handle_event(&mut self, event: Event) {
|
||||
match event {
|
||||
Event::Connecting(_) => {
|
||||
self.state = State::Connecting;
|
||||
|
||||
// Juuust to make sure
|
||||
self.last_msg_id = None;
|
||||
self.log_request_canary = None;
|
||||
}
|
||||
Event::Connected(_, Snapshot { conn_tx, state }) => {
|
||||
if !self.ephemeral {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.log_request_canary = Some(tx);
|
||||
let vault_clone = self.vault.clone();
|
||||
let conn_tx_clone = conn_tx.clone();
|
||||
debug!("{}: spawning log request task", self.instance.config().name);
|
||||
tokio::task::spawn(async move {
|
||||
select! {
|
||||
_ = rx => {},
|
||||
_ = Self::regularly_request_logs(vault_clone, conn_tx_clone) => {},
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
info!("e&{}: disconnected", name);
|
||||
event_tx.send(Event::Disconnected)?;
|
||||
self.state = State::Connected(conn_tx, state);
|
||||
|
||||
true
|
||||
} else {
|
||||
info!("e&{}: could not connect", name);
|
||||
event_tx.send(Event::Disconnected)?;
|
||||
false
|
||||
};
|
||||
|
||||
// Only delay reconnecting if the previous attempt failed. This way,
|
||||
// we'll reconnect immediately if we login or logout.
|
||||
if !connected {
|
||||
tokio::time::sleep(RECONNECT_INTERVAL).await;
|
||||
let cookies = &*self.instance.config().server.cookies;
|
||||
let cookies = cookies.lock().unwrap().clone();
|
||||
self.vault.vault().set_cookies(cookies);
|
||||
}
|
||||
Event::Packet(_, packet, Snapshot { conn_tx, state }) => {
|
||||
self.state = State::Connected(conn_tx, state);
|
||||
self.on_packet(packet);
|
||||
}
|
||||
Event::Disconnected(_) => {
|
||||
self.state = State::Disconnected;
|
||||
self.last_msg_id = None;
|
||||
self.log_request_canary = None;
|
||||
}
|
||||
Event::Stopped(_) => {
|
||||
// TODO Remove room somewhere if this happens? If it doesn't already happen during stabilization
|
||||
self.state = State::Stopped;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_cookies(vault: &EuphVault) -> String {
|
||||
let cookie_jar = vault.cookies().await;
|
||||
let cookies = cookie_jar
|
||||
.iter()
|
||||
.map(|c| format!("{}", c.stripped()))
|
||||
.collect::<Vec<_>>();
|
||||
cookies.join("; ")
|
||||
}
|
||||
|
||||
fn update_cookies(vault: &EuphVault, set_cookies: &[HeaderValue]) {
|
||||
let mut cookie_jar = CookieJar::new();
|
||||
|
||||
for cookie in set_cookies {
|
||||
if let Ok(cookie) = cookie.to_str() {
|
||||
if let Ok(cookie) = Cookie::from_str(cookie) {
|
||||
cookie_jar.add(cookie)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
vault.set_cookies(cookie_jar);
|
||||
}
|
||||
|
||||
async fn connect(vault: &EuphRoomVault, name: &str) -> anyhow::Result<Option<Conn>> {
|
||||
// TODO Set user agent?
|
||||
|
||||
let cookies = Self::get_cookies(vault.vault()).await;
|
||||
let cookies = HeaderValue::from_str(&cookies).expect("valid cookies");
|
||||
|
||||
match Conn::connect("euphoria.io", name, true, Some(cookies), TIMEOUT).await {
|
||||
Ok((rx, set_cookies)) => {
|
||||
Self::update_cookies(vault.vault(), &set_cookies);
|
||||
Ok(Some(rx))
|
||||
}
|
||||
Err(tungstenite::Error::Http(resp)) if resp.status().is_client_error() => {
|
||||
bail!("room {name} doesn't exist");
|
||||
}
|
||||
Err(tungstenite::Error::Url(_) | tungstenite::Error::HttpFormat(_)) => {
|
||||
bail!("format error for room {name}");
|
||||
}
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn regularly_request_logs(event_tx: &mpsc::UnboundedSender<Event>) {
|
||||
async fn regularly_request_logs(vault: EuphRoomVault, conn_tx: ConnTx) {
|
||||
// TODO Make log downloading smarter
|
||||
|
||||
// Possible log-related mechanics. Some of these could also run in some
|
||||
|
|
@ -210,322 +166,130 @@ impl State {
|
|||
|
||||
loop {
|
||||
tokio::time::sleep(LOG_INTERVAL).await;
|
||||
let _ = event_tx.send(Event::RequestLogs);
|
||||
Self::request_logs(&vault, &conn_tx).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_events(
|
||||
&mut self,
|
||||
event_rx: &mut mpsc::UnboundedReceiver<Event>,
|
||||
euph_room_event_tx: &mpsc::UnboundedSender<EuphRoomEvent>,
|
||||
) -> anyhow::Result<()> {
|
||||
while let Some(event) = event_rx.recv().await {
|
||||
match event {
|
||||
Event::Connected(conn_tx) => {
|
||||
self.conn_tx = Some(conn_tx);
|
||||
let _ = euph_room_event_tx.send(EuphRoomEvent::Connected);
|
||||
}
|
||||
Event::Disconnected => {
|
||||
self.conn_tx = None;
|
||||
self.last_msg_id = None;
|
||||
let _ = euph_room_event_tx.send(EuphRoomEvent::Disconnected);
|
||||
}
|
||||
Event::Packet(packet) => {
|
||||
self.on_packet(&packet).await?;
|
||||
let _ = euph_room_event_tx.send(EuphRoomEvent::Packet(packet));
|
||||
}
|
||||
Event::State(reply_tx) => self.on_state(reply_tx).await,
|
||||
Event::RequestLogs => self.on_request_logs(),
|
||||
Event::Auth(password) => self.on_auth(password),
|
||||
Event::Nick(name) => self.on_nick(name),
|
||||
Event::Send(parent, content, id_tx) => self.on_send(parent, content, id_tx),
|
||||
Event::Login { email, password } => self.on_login(email, password),
|
||||
Event::Logout => self.on_logout(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn own_user_id(&self) -> Option<UserId> {
|
||||
Some(match self.conn_tx.as_ref()?.state().await.ok()? {
|
||||
ConnState::Joining(Joining { hello, .. }) => hello?.session.id,
|
||||
ConnState::Joined(joined) => joined.session.id,
|
||||
})
|
||||
}
|
||||
|
||||
async fn on_packet(&mut self, packet: &ParsedPacket) -> anyhow::Result<()> {
|
||||
let data = ok_or_return!(&packet.content, Ok(()));
|
||||
match data {
|
||||
Data::BounceEvent(_) => {
|
||||
if let Some(password) = &self.password {
|
||||
// Try to authenticate with the configured password, but no
|
||||
// promises if it doesn't work. In particular, we only ever
|
||||
// try this password once.
|
||||
self.on_auth(password.clone());
|
||||
}
|
||||
}
|
||||
Data::DisconnectEvent(d) => {
|
||||
warn!("e&{}: disconnected for reason {:?}", self.name, d.reason);
|
||||
}
|
||||
Data::HelloEvent(_) => {}
|
||||
Data::JoinEvent(d) => {
|
||||
info!("e&{}: {:?} joined", self.name, d.0.name);
|
||||
}
|
||||
Data::LoginEvent(_) => {}
|
||||
Data::LogoutEvent(_) => {}
|
||||
Data::NetworkEvent(d) => {
|
||||
info!("e&{}: network event ({})", self.name, d.r#type);
|
||||
}
|
||||
Data::NickEvent(d) => {
|
||||
info!("e&{}: {:?} renamed to {:?}", self.name, d.from, d.to);
|
||||
}
|
||||
Data::EditMessageEvent(_) => {
|
||||
info!("e&{}: a message was edited", self.name);
|
||||
}
|
||||
Data::PartEvent(d) => {
|
||||
info!("e&{}: {:?} left", self.name, d.0.name);
|
||||
}
|
||||
Data::PingEvent(_) => {}
|
||||
Data::PmInitiateEvent(d) => {
|
||||
// TODO Show info popup and automatically join PM room
|
||||
info!(
|
||||
"e&{}: {:?} initiated a pm from &{}",
|
||||
self.name, d.from_nick, d.from_room
|
||||
);
|
||||
}
|
||||
Data::SendEvent(d) => {
|
||||
let own_user_id = self.own_user_id().await;
|
||||
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||
let id = d.0.id;
|
||||
self.vault
|
||||
.add_msg(Box::new(d.0.clone()), *last_msg_id, own_user_id);
|
||||
*last_msg_id = Some(id);
|
||||
} else {
|
||||
bail!("send event before snapshot event");
|
||||
}
|
||||
}
|
||||
Data::SnapshotEvent(d) => {
|
||||
info!("e&{}: successfully joined", self.name);
|
||||
self.vault.join(Time::now());
|
||||
self.last_msg_id = Some(d.log.last().map(|m| m.id));
|
||||
let own_user_id = self.own_user_id().await;
|
||||
self.vault.add_msgs(d.log.clone(), None, own_user_id);
|
||||
|
||||
if let Some(username) = &self.username {
|
||||
if self.force_username || d.nick.is_none() {
|
||||
self.on_nick(username.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
Data::LogReply(d) => {
|
||||
let own_user_id = self.own_user_id().await;
|
||||
self.vault.add_msgs(d.log.clone(), d.before, own_user_id);
|
||||
}
|
||||
Data::SendReply(d) => {
|
||||
let own_user_id = self.own_user_id().await;
|
||||
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||
let id = d.0.id;
|
||||
self.vault
|
||||
.add_msg(Box::new(d.0.clone()), *last_msg_id, own_user_id);
|
||||
*last_msg_id = Some(id);
|
||||
} else {
|
||||
bail!("send reply before snapshot event");
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_state(&self, reply_tx: oneshot::Sender<Option<ConnState>>) {
|
||||
let state = if let Some(conn_tx) = &self.conn_tx {
|
||||
conn_tx.state().await.ok()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let _ = reply_tx.send(state);
|
||||
}
|
||||
|
||||
fn on_request_logs(&self) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
// Check whether logs are already being requested
|
||||
let mut guard = self.requesting_logs.lock();
|
||||
if *guard {
|
||||
return;
|
||||
} else {
|
||||
*guard = true;
|
||||
}
|
||||
drop(guard);
|
||||
|
||||
// No logs are being requested and we've reserved our spot, so let's
|
||||
// request some logs!
|
||||
let vault = self.vault.clone();
|
||||
let conn_tx = conn_tx.clone();
|
||||
let requesting_logs = self.requesting_logs.clone();
|
||||
task::spawn(async move {
|
||||
let result = Self::request_logs(vault, conn_tx).await;
|
||||
*requesting_logs.lock() = false;
|
||||
result
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_logs(vault: EuphRoomVault, conn_tx: ConnTx) -> anyhow::Result<()> {
|
||||
async fn request_logs(vault: &EuphRoomVault, conn_tx: &ConnTx) {
|
||||
let before = match vault.last_span().await {
|
||||
Some((None, _)) => return Ok(()), // Already at top of room history
|
||||
Some((None, _)) => return, // Already at top of room history
|
||||
Some((Some(before), _)) => Some(before),
|
||||
None => None,
|
||||
};
|
||||
|
||||
debug!("{}: requesting logs", vault.room());
|
||||
|
||||
// &rl2dev's message history is broken and requesting old messages past
|
||||
// a certain point results in errors. By reducing the amount of messages
|
||||
// in each log request, we can get closer to this point. Since &rl2dev
|
||||
// is fairly low in activity, this should be fine.
|
||||
let n = if vault.room() == "rl2dev" { 50 } else { 1000 };
|
||||
|
||||
let _ = conn_tx.send(Log { n, before }).await?;
|
||||
let _ = conn_tx.send(Log { n, before }).await;
|
||||
// The code handling incoming events and replies also handles
|
||||
// `LogReply`s, so we don't need to do anything special here.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_auth(&self, password: String) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
let conn_tx = conn_tx.clone();
|
||||
task::spawn(async move {
|
||||
let _ = conn_tx
|
||||
.send(Auth {
|
||||
r#type: AuthOption::Passcode,
|
||||
passcode: Some(password),
|
||||
})
|
||||
.await;
|
||||
});
|
||||
fn own_user_id(&self) -> Option<UserId> {
|
||||
if let State::Connected(_, state) = &self.state {
|
||||
Some(match state {
|
||||
conn::State::Joining(joining) => joining.hello.as_ref()?.session.id.clone(),
|
||||
conn::State::Joined(joined) => joined.session.id.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn on_nick(&self, name: String) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
let conn_tx = conn_tx.clone();
|
||||
task::spawn(async move {
|
||||
let _ = conn_tx.send(Nick { name }).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn on_send(
|
||||
&self,
|
||||
parent: Option<MessageId>,
|
||||
content: String,
|
||||
id_tx: oneshot::Sender<MessageId>,
|
||||
) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
let conn_tx = conn_tx.clone();
|
||||
task::spawn(async move {
|
||||
if let Ok(reply) = conn_tx.send(Send { content, parent }).await {
|
||||
let _ = id_tx.send(reply.0.id);
|
||||
fn on_packet(&mut self, packet: ParsedPacket) {
|
||||
let instance_name = &self.instance.config().name;
|
||||
let data = ok_or_return!(&packet.content);
|
||||
match data {
|
||||
Data::BounceEvent(_) => {
|
||||
if let Some(password) = &self.instance.config().password {
|
||||
// Try to authenticate with the configured password, but no
|
||||
// promises if it doesn't work. In particular, we only ever
|
||||
// try this password once.
|
||||
let _ = self.auth(password.clone());
|
||||
}
|
||||
});
|
||||
}
|
||||
Data::DisconnectEvent(d) => {
|
||||
warn!("{instance_name}: disconnected for reason {:?}", d.reason);
|
||||
}
|
||||
Data::HelloEvent(_) => {}
|
||||
Data::JoinEvent(d) => {
|
||||
info!("{instance_name}: {:?} joined", d.0.name);
|
||||
}
|
||||
Data::LoginEvent(_) => {}
|
||||
Data::LogoutEvent(_) => {}
|
||||
Data::NetworkEvent(d) => {
|
||||
info!("{instance_name}: network event ({})", d.r#type);
|
||||
}
|
||||
Data::NickEvent(d) => {
|
||||
info!("{instance_name}: {:?} renamed to {:?}", d.from, d.to);
|
||||
}
|
||||
Data::EditMessageEvent(_) => {
|
||||
info!("{instance_name}: a message was edited");
|
||||
}
|
||||
Data::PartEvent(d) => {
|
||||
info!("{instance_name}: {:?} left", d.0.name);
|
||||
}
|
||||
Data::PingEvent(_) => {}
|
||||
Data::PmInitiateEvent(d) => {
|
||||
// TODO Show info popup and automatically join PM room
|
||||
info!(
|
||||
"{instance_name}: {:?} initiated a pm from &{}",
|
||||
d.from_nick, d.from_room
|
||||
);
|
||||
}
|
||||
Data::SendEvent(SendEvent(msg)) => {
|
||||
let own_user_id = self.own_user_id();
|
||||
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||
self.vault
|
||||
.add_msg(Box::new(msg.clone()), *last_msg_id, own_user_id);
|
||||
*last_msg_id = Some(msg.id);
|
||||
}
|
||||
}
|
||||
Data::SnapshotEvent(d) => {
|
||||
info!("{instance_name}: successfully joined");
|
||||
self.vault.join(Time::now());
|
||||
self.last_msg_id = Some(d.log.last().map(|m| m.id));
|
||||
self.vault.add_msgs(d.log.clone(), None, self.own_user_id());
|
||||
}
|
||||
Data::LogReply(d) => {
|
||||
self.vault
|
||||
.add_msgs(d.log.clone(), d.before, self.own_user_id());
|
||||
}
|
||||
Data::SendReply(SendReply(msg)) => {
|
||||
let own_user_id = self.own_user_id();
|
||||
if let Some(last_msg_id) = &mut self.last_msg_id {
|
||||
self.vault
|
||||
.add_msg(Box::new(msg.clone()), *last_msg_id, own_user_id);
|
||||
*last_msg_id = Some(msg.id);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_login(&self, email: String, password: String) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
let _ = conn_tx.send(Login {
|
||||
namespace: "email".to_string(),
|
||||
id: email,
|
||||
password,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn on_logout(&self) {
|
||||
if let Some(conn_tx) = &self.conn_tx {
|
||||
let _ = conn_tx.send(Logout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Room {
|
||||
#[allow(dead_code)]
|
||||
canary: oneshot::Sender<Infallible>,
|
||||
event_tx: mpsc::UnboundedSender<Event>,
|
||||
}
|
||||
|
||||
impl Room {
|
||||
pub fn new(
|
||||
vault: EuphRoomVault,
|
||||
username: Option<String>,
|
||||
force_username: bool,
|
||||
password: Option<String>,
|
||||
) -> (Self, mpsc::UnboundedReceiver<EuphRoomEvent>) {
|
||||
let (canary_tx, canary_rx) = oneshot::channel();
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (euph_room_event_tx, euph_room_event_rx) = mpsc::unbounded_channel();
|
||||
|
||||
// &rl2dev's message history is broken and requesting old messages past
|
||||
// a certain point results in errors. Cove should not keep retrying log
|
||||
// requests when hitting that limit, so &rl2dev is always opened in
|
||||
// ephemeral mode.
|
||||
let room_name = vault.room().to_string();
|
||||
let ephemeral = vault.vault().vault().ephemeral() || room_name == "rl2dev";
|
||||
|
||||
let state = State {
|
||||
name: vault.room().to_string(),
|
||||
username,
|
||||
force_username,
|
||||
password,
|
||||
vault,
|
||||
conn_tx: None,
|
||||
last_msg_id: None,
|
||||
requesting_logs: Arc::new(Mutex::new(false)),
|
||||
};
|
||||
|
||||
task::spawn(state.run(
|
||||
canary_rx,
|
||||
event_tx.clone(),
|
||||
event_rx,
|
||||
euph_room_event_tx,
|
||||
ephemeral,
|
||||
));
|
||||
|
||||
let new_room = Self {
|
||||
canary: canary_tx,
|
||||
event_tx,
|
||||
};
|
||||
(new_room, euph_room_event_rx)
|
||||
}
|
||||
|
||||
pub fn stopped(&self) -> bool {
|
||||
self.event_tx.is_closed()
|
||||
}
|
||||
|
||||
pub async fn state(&self) -> Result<Option<ConnState>, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.event_tx
|
||||
.send(Event::State(tx))
|
||||
.map_err(|_| Error::Stopped)?;
|
||||
rx.await.map_err(|_| Error::Stopped)
|
||||
}
|
||||
|
||||
pub fn auth(&self, password: String) -> Result<(), Error> {
|
||||
self.event_tx
|
||||
.send(Event::Auth(password))
|
||||
.map_err(|_| Error::Stopped)
|
||||
let _ = self.conn_tx()?.send(Auth {
|
||||
r#type: AuthOption::Passcode,
|
||||
passcode: Some(password),
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn log(&self) -> Result<(), Error> {
|
||||
self.event_tx
|
||||
.send(Event::RequestLogs)
|
||||
.map_err(|_| Error::Stopped)
|
||||
let conn_tx_clone = self.conn_tx()?.clone();
|
||||
let vault_clone = self.vault.clone();
|
||||
tokio::task::spawn(async move { Self::request_logs(&vault_clone, &conn_tx_clone).await });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn nick(&self, name: String) -> Result<(), Error> {
|
||||
self.event_tx
|
||||
.send(Event::Nick(name))
|
||||
.map_err(|_| Error::Stopped)
|
||||
let _ = self.conn_tx()?.send(Nick { name });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn send(
|
||||
|
|
@ -533,22 +297,27 @@ impl Room {
|
|||
parent: Option<MessageId>,
|
||||
content: String,
|
||||
) -> Result<oneshot::Receiver<MessageId>, Error> {
|
||||
let (id_tx, id_rx) = oneshot::channel();
|
||||
self.event_tx
|
||||
.send(Event::Send(parent, content, id_tx))
|
||||
.map(|_| id_rx)
|
||||
.map_err(|_| Error::Stopped)
|
||||
let reply = self.conn_tx()?.send(Send { content, parent });
|
||||
let (tx, rx) = oneshot::channel();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(reply) = reply.await {
|
||||
let _ = tx.send(reply.0.id);
|
||||
}
|
||||
});
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub fn login(&self, email: String, password: String) -> Result<(), Error> {
|
||||
self.event_tx
|
||||
.send(Event::Login { email, password })
|
||||
.map_err(|_| Error::Stopped)
|
||||
let _ = self.conn_tx()?.send(Login {
|
||||
namespace: "email".to_string(),
|
||||
id: email,
|
||||
password,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn logout(&self) -> Result<(), Error> {
|
||||
self.event_tx
|
||||
.send(Event::Logout)
|
||||
.map_err(|_| Error::Stopped)
|
||||
let _ = self.conn_tx()?.send(Logout);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue