Add logging to instance
This commit is contained in:
parent
f243e2243d
commit
4365d14923
1 changed files with 29 additions and 7 deletions
|
|
@ -3,17 +3,17 @@
|
||||||
//! See [`Instance`] for more details.
|
//! See [`Instance`] for more details.
|
||||||
|
|
||||||
// TODO Cookies
|
// TODO Cookies
|
||||||
// TODO Logging
|
|
||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use log::{debug, warn};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::api::packet::ParsedPacket;
|
use crate::api::packet::ParsedPacket;
|
||||||
use crate::api::{Auth, AuthOption, Data, Nick};
|
use crate::api::{Auth, AuthOption, Data, Nick};
|
||||||
use crate::conn::{Conn, ConnTx, State};
|
use crate::conn::{self, Conn, ConnTx, State};
|
||||||
|
|
||||||
const EUPH_DOMAIN: &str = "euphoria.io";
|
const EUPH_DOMAIN: &str = "euphoria.io";
|
||||||
const TIMEOUT: Duration = Duration::from_secs(30);
|
const TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
@ -121,6 +121,7 @@ impl Instance {
|
||||||
F: FnMut(Event) -> Fut + Send + 'static,
|
F: FnMut(Event) -> Fut + Send + 'static,
|
||||||
Fut: Future<Output = Result<(), ()>> + Send + 'static,
|
Fut: Future<Output = Result<(), ()>> + Send + 'static,
|
||||||
{
|
{
|
||||||
|
debug!("{}: Created with config {config:?}", config.name);
|
||||||
let (request_tx, request_rx) = mpsc::unbounded_channel();
|
let (request_tx, request_rx) = mpsc::unbounded_channel();
|
||||||
tokio::spawn(Self::run::<F, Fut>(config.clone(), on_event, request_rx));
|
tokio::spawn(Self::run::<F, Fut>(config.clone(), on_event, request_rx));
|
||||||
Self { config, request_tx }
|
Self { config, request_tx }
|
||||||
|
|
@ -144,8 +145,14 @@ impl Instance {
|
||||||
F: FnMut(Event) -> Fut,
|
F: FnMut(Event) -> Fut,
|
||||||
Fut: Future<Output = Result<(), ()>>,
|
Fut: Future<Output = Result<(), ()>>,
|
||||||
{
|
{
|
||||||
|
// TODO Only delay reconnecting if previous reconnect attempt failed
|
||||||
loop {
|
loop {
|
||||||
Self::run_once::<F, Fut>(&config, &mut on_event, &mut request_rx).await;
|
Self::run_once::<F, Fut>(&config, &mut on_event, &mut request_rx).await;
|
||||||
|
debug!(
|
||||||
|
"{}: Waiting {} seconds before reconnecting",
|
||||||
|
config.name,
|
||||||
|
RECONNECT.as_secs()
|
||||||
|
);
|
||||||
tokio::time::sleep(RECONNECT).await;
|
tokio::time::sleep(RECONNECT).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -159,26 +166,35 @@ impl Instance {
|
||||||
F: FnMut(Event) -> Fut,
|
F: FnMut(Event) -> Fut,
|
||||||
Fut: Future<Output = Result<(), ()>>,
|
Fut: Future<Output = Result<(), ()>>,
|
||||||
{
|
{
|
||||||
|
debug!("{}: Connecting...", config.name);
|
||||||
let (mut conn, _) =
|
let (mut conn, _) =
|
||||||
Conn::connect(&config.domain, &config.room, config.human, None, TIMEOUT)
|
Conn::connect(&config.domain, &config.room, config.human, None, TIMEOUT)
|
||||||
.await
|
.await
|
||||||
.ok()?;
|
.ok()?;
|
||||||
let conn_tx = conn.tx().clone();
|
let conn_tx = conn.tx().clone();
|
||||||
|
|
||||||
select! {
|
let result = select! {
|
||||||
_ = Self::receive::<F, Fut>(config, &mut conn, on_event) => (),
|
r = Self::receive::<F, Fut>(config, &mut conn, on_event) => r,
|
||||||
_ = Self::handle_requests(request_rx, &conn_tx) => (),
|
_ = Self::handle_requests(request_rx, &conn_tx) => Ok(()),
|
||||||
|
};
|
||||||
|
if let Err(err) = result {
|
||||||
|
if matches!(err, conn::Error::ConnectionClosed) {
|
||||||
|
debug!("{}: Connection closed, reconnecting", config.name);
|
||||||
|
} else {
|
||||||
|
warn!("{}: An error occurred, reconnecting: {err}", config.name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(())
|
Some(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive<F, Fut>(config: &Config, conn: &mut Conn, on_event: &mut F)
|
async fn receive<F, Fut>(config: &Config, conn: &mut Conn, on_event: &mut F) -> conn::Result<()>
|
||||||
where
|
where
|
||||||
F: FnMut(Event) -> Fut,
|
F: FnMut(Event) -> Fut,
|
||||||
Fut: Future<Output = Result<(), ()>>,
|
Fut: Future<Output = Result<(), ()>>,
|
||||||
{
|
{
|
||||||
while let Ok(packet) = conn.recv().await {
|
loop {
|
||||||
|
let packet = conn.recv().await?;
|
||||||
let event = Event {
|
let event = Event {
|
||||||
packet,
|
packet,
|
||||||
snapshot: Snapshot {
|
snapshot: Snapshot {
|
||||||
|
|
@ -191,18 +207,21 @@ impl Instance {
|
||||||
match &event.packet.content {
|
match &event.packet.content {
|
||||||
Ok(Data::SnapshotEvent(_)) => {
|
Ok(Data::SnapshotEvent(_)) => {
|
||||||
if let Some(username) = &config.username {
|
if let Some(username) = &config.username {
|
||||||
|
debug!("{}: Setting nick to username {}", config.name, username);
|
||||||
let name = username.to_string();
|
let name = username.to_string();
|
||||||
let _ = conn.tx().send(Nick { name });
|
let _ = conn.tx().send(Nick { name });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Data::BounceEvent(_)) => {
|
Ok(Data::BounceEvent(_)) => {
|
||||||
if let Some(password) = &config.password {
|
if let Some(password) = &config.password {
|
||||||
|
debug!("{}: Authenticating with password", config.name);
|
||||||
let cmd = Auth {
|
let cmd = Auth {
|
||||||
r#type: AuthOption::Passcode,
|
r#type: AuthOption::Passcode,
|
||||||
passcode: Some(password.to_string()),
|
passcode: Some(password.to_string()),
|
||||||
};
|
};
|
||||||
let _ = conn.tx().send(cmd);
|
let _ = conn.tx().send(cmd);
|
||||||
} else {
|
} else {
|
||||||
|
warn!("{}: Auth required but no password configured", config.name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -210,9 +229,12 @@ impl Instance {
|
||||||
}
|
}
|
||||||
|
|
||||||
if on_event(event).await.is_err() {
|
if on_event(event).await.is_err() {
|
||||||
|
warn!("{}: on_event handler returned Err(())", config.name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_requests(
|
async fn handle_requests(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue