Add debug logging to connection
This commit is contained in:
parent
bd74931ecd
commit
3f5ae51a41
3 changed files with 25 additions and 3 deletions
|
|
@ -19,6 +19,7 @@ Procedure when bumping the version number:
|
||||||
- Finding, replacing and removing emoji in text
|
- Finding, replacing and removing emoji in text
|
||||||
- `State` conversion utility methods
|
- `State` conversion utility methods
|
||||||
- `Time::new` constructor
|
- `Time::new` constructor
|
||||||
|
- Debug logging using the `log` crate
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Rewrite `conn` module (backwards-imcompatible)
|
- Rewrite `conn` module (backwards-imcompatible)
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ bot = []
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures-util = { version = "0.3.25", default-features = false, features = ["sink"] }
|
futures-util = { version = "0.3.25", default-features = false, features = ["sink"] }
|
||||||
|
log = "0.4.17"
|
||||||
serde = { version = "1.0.149", features = ["derive"] }
|
serde = { version = "1.0.149", features = ["derive"] }
|
||||||
serde_json = "1.0.89"
|
serde_json = "1.0.89"
|
||||||
time = { version = "0.3.17", features = ["serde"] }
|
time = { version = "0.3.17", features = ["serde"] }
|
||||||
|
|
|
||||||
26
src/conn.rs
26
src/conn.rs
|
|
@ -8,6 +8,7 @@ use std::{error, fmt, result};
|
||||||
|
|
||||||
use ::time::OffsetDateTime;
|
use ::time::OffsetDateTime;
|
||||||
use futures_util::SinkExt;
|
use futures_util::SinkExt;
|
||||||
|
use log::debug;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
@ -170,20 +171,24 @@ impl Joined {
|
||||||
fn on_data(&mut self, data: &Data) {
|
fn on_data(&mut self, data: &Data) {
|
||||||
match data {
|
match data {
|
||||||
Data::JoinEvent(p) => {
|
Data::JoinEvent(p) => {
|
||||||
|
debug!("Updating listing after join-event");
|
||||||
self.listing
|
self.listing
|
||||||
.insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone()));
|
.insert(p.0.session_id.clone(), SessionInfo::Full(p.0.clone()));
|
||||||
}
|
}
|
||||||
Data::SendEvent(p) => {
|
Data::SendEvent(p) => {
|
||||||
|
debug!("Updating listing after send-event");
|
||||||
self.listing.insert(
|
self.listing.insert(
|
||||||
p.0.sender.session_id.clone(),
|
p.0.sender.session_id.clone(),
|
||||||
SessionInfo::Full(p.0.sender.clone()),
|
SessionInfo::Full(p.0.sender.clone()),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Data::PartEvent(p) => {
|
Data::PartEvent(p) => {
|
||||||
|
debug!("Updating listing after part-event");
|
||||||
self.listing.remove(&p.0.session_id);
|
self.listing.remove(&p.0.session_id);
|
||||||
}
|
}
|
||||||
Data::NetworkEvent(p) => {
|
Data::NetworkEvent(p) => {
|
||||||
if p.r#type == "partition" {
|
if p.r#type == "partition" {
|
||||||
|
debug!("Updating listing after network-event with type partition");
|
||||||
self.listing.retain(|_, s| match s {
|
self.listing.retain(|_, s| match s {
|
||||||
SessionInfo::Full(s) => {
|
SessionInfo::Full(s) => {
|
||||||
s.server_id != p.server_id && s.server_era != p.server_era
|
s.server_id != p.server_id && s.server_era != p.server_era
|
||||||
|
|
@ -203,6 +208,7 @@ impl Joined {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Data::NickEvent(p) => {
|
Data::NickEvent(p) => {
|
||||||
|
debug!("Updating listing after nick-event");
|
||||||
self.listing
|
self.listing
|
||||||
.entry(p.session_id.clone())
|
.entry(p.session_id.clone())
|
||||||
.and_modify(|s| match s {
|
.and_modify(|s| match s {
|
||||||
|
|
@ -212,6 +218,7 @@ impl Joined {
|
||||||
.or_insert_with(|| SessionInfo::Partial(p.clone()));
|
.or_insert_with(|| SessionInfo::Partial(p.clone()));
|
||||||
}
|
}
|
||||||
Data::NickReply(p) => {
|
Data::NickReply(p) => {
|
||||||
|
debug!("Updating own session after nick-reply");
|
||||||
assert_eq!(self.session.id, p.id);
|
assert_eq!(self.session.id, p.id);
|
||||||
self.session.name = p.to.clone();
|
self.session.name = p.to.clone();
|
||||||
}
|
}
|
||||||
|
|
@ -400,7 +407,9 @@ impl Conn {
|
||||||
let msg = msg.ok_or(Error::ConnectionClosed)??;
|
let msg = msg.ok_or(Error::ConnectionClosed)??;
|
||||||
match msg {
|
match msg {
|
||||||
tungstenite::Message::Text(text) => {
|
tungstenite::Message::Text(text) => {
|
||||||
let packet = ParsedPacket::from_packet(serde_json::from_str(&text)?)?;
|
let packet = serde_json::from_str(&text)?;
|
||||||
|
debug!("Received {packet:?}");
|
||||||
|
let packet = ParsedPacket::from_packet(packet)?;
|
||||||
self.on_packet(&packet).await?;
|
self.on_packet(&packet).await?;
|
||||||
return Ok(Some(packet));
|
return Ok(Some(packet));
|
||||||
}
|
}
|
||||||
|
|
@ -422,6 +431,7 @@ impl Conn {
|
||||||
async fn on_packet(&mut self, packet: &ParsedPacket) -> Result<()> {
|
async fn on_packet(&mut self, packet: &ParsedPacket) -> Result<()> {
|
||||||
// Complete pending replies if the packet has an id
|
// Complete pending replies if the packet has an id
|
||||||
if let Some(id) = &packet.id {
|
if let Some(id) = &packet.id {
|
||||||
|
debug!("Resolving pending reply for id {id}");
|
||||||
self.replies.complete(id, packet.clone());
|
self.replies.complete(id, packet.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -490,11 +500,15 @@ impl Conn {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_ping(&mut self) -> Result<()> {
|
async fn on_ping(&mut self) -> Result<()> {
|
||||||
|
debug!("Checking ping replies and sending new pings");
|
||||||
|
|
||||||
// Check previous pings
|
// Check previous pings
|
||||||
if self.last_ws_ping_payload.is_some() && !self.last_ws_ping_replied_to {
|
if self.last_ws_ping_payload.is_some() && !self.last_ws_ping_replied_to {
|
||||||
|
debug!("Server did not respond to websocket ping, disconnecting");
|
||||||
self.disconnect().await?;
|
self.disconnect().await?;
|
||||||
}
|
}
|
||||||
if self.last_euph_ping_payload.is_some() && !self.last_euph_ping_replied_to {
|
if self.last_euph_ping_payload.is_some() && !self.last_euph_ping_replied_to {
|
||||||
|
debug!("Server did not respond to euph ping, disconnecting");
|
||||||
self.disconnect().await?;
|
self.disconnect().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -533,6 +547,7 @@ impl Conn {
|
||||||
throttled: None,
|
throttled: None,
|
||||||
}
|
}
|
||||||
.into_packet()?;
|
.into_packet()?;
|
||||||
|
debug!("Sending {packet:?}");
|
||||||
|
|
||||||
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
|
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
|
||||||
self.ws.send(msg).await?;
|
self.ws.send(msg).await?;
|
||||||
|
|
@ -550,6 +565,7 @@ impl Conn {
|
||||||
throttled: None,
|
throttled: None,
|
||||||
}
|
}
|
||||||
.into_packet()?;
|
.into_packet()?;
|
||||||
|
debug!("Sending {packet:?}");
|
||||||
|
|
||||||
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
|
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
|
||||||
self.ws.send(msg).await?;
|
self.ws.send(msg).await?;
|
||||||
|
|
@ -558,7 +574,9 @@ impl Conn {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn disconnect(&mut self) -> Result<Infallible> {
|
async fn disconnect(&mut self) -> Result<Infallible> {
|
||||||
|
// TODO Maybe timeout this
|
||||||
let _ = self.ws.close(None).await;
|
let _ = self.ws.close(None).await;
|
||||||
|
debug!("Closed connection gracefully");
|
||||||
Err(Error::ConnectionClosed)
|
Err(Error::ConnectionClosed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -591,6 +609,7 @@ impl Conn {
|
||||||
) -> tungstenite::Result<(Self, Vec<HeaderValue>)> {
|
) -> tungstenite::Result<(Self, Vec<HeaderValue>)> {
|
||||||
let human = if human { "?h=1" } else { "" };
|
let human = if human { "?h=1" } else { "" };
|
||||||
let uri = format!("wss://{domain}/room/{room}/ws{human}");
|
let uri = format!("wss://{domain}/room/{room}/ws{human}");
|
||||||
|
debug!("Connecting to {uri} with cookies: {cookies:?}");
|
||||||
let mut request = uri.into_client_request().expect("valid request");
|
let mut request = uri.into_client_request().expect("valid request");
|
||||||
if let Some(cookies) = cookies {
|
if let Some(cookies) = cookies {
|
||||||
request.headers_mut().append(header::COOKIE, cookies);
|
request.headers_mut().append(header::COOKIE, cookies);
|
||||||
|
|
@ -598,11 +617,12 @@ impl Conn {
|
||||||
|
|
||||||
let (ws, response) = tokio_tungstenite::connect_async(request).await?;
|
let (ws, response) = tokio_tungstenite::connect_async(request).await?;
|
||||||
let (mut parts, _) = response.into_parts();
|
let (mut parts, _) = response.into_parts();
|
||||||
let set_cookies = match parts.headers.entry(header::SET_COOKIE) {
|
let cookies_set = match parts.headers.entry(header::SET_COOKIE) {
|
||||||
header::Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(),
|
header::Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(),
|
||||||
header::Entry::Vacant(_) => vec![],
|
header::Entry::Vacant(_) => vec![],
|
||||||
};
|
};
|
||||||
|
debug!("Received cookies {cookies_set:?}");
|
||||||
let rx = Self::wrap(ws, timeout);
|
let rx = Self::wrap(ws, timeout);
|
||||||
Ok((rx, set_cookies))
|
Ok((rx, cookies_set))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue