Add a few debug logs
This commit is contained in:
parent
56a4b8c362
commit
bd61530b5f
5 changed files with 122 additions and 6 deletions
|
|
@ -1,10 +1,12 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, result};
|
||||
use std::{fmt, io, result};
|
||||
|
||||
use cove_core::packets::Packet;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::StreamExt;
|
||||
use log::debug;
|
||||
use rand::Rng;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
|
||||
|
|
@ -15,6 +17,8 @@ use tokio_tungstenite::WebSocketStream;
|
|||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] io::Error),
|
||||
#[error("WS error: {0}")]
|
||||
Ws(#[from] tungstenite::Error),
|
||||
#[error("MPSC error: {0}")]
|
||||
|
|
@ -31,6 +35,7 @@ pub type Result<T> = result::Result<T, Error>;
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnTx {
|
||||
peer_addr: SocketAddr,
|
||||
tx: UnboundedSender<Message>,
|
||||
}
|
||||
|
||||
|
|
@ -43,12 +48,15 @@ impl fmt::Debug for ConnTx {
|
|||
impl ConnTx {
|
||||
pub fn send(&self, packet: &Packet) -> Result<()> {
|
||||
let str = serde_json::to_string(packet).expect("unserializable packet");
|
||||
// TODO Format somewhat nicer?
|
||||
debug!("<{}> ↑ {}", self.peer_addr, str.trim());
|
||||
self.tx.send(Message::Text(str))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnRx {
|
||||
peer_addr: SocketAddr,
|
||||
ws_rx: SplitStream<WebSocketStream<TcpStream>>,
|
||||
last_ping_payload: Arc<Mutex<Vec<u8>>>,
|
||||
}
|
||||
|
|
@ -66,6 +74,7 @@ impl ConnRx {
|
|||
None => return Ok(None),
|
||||
Some(msg) => msg?,
|
||||
};
|
||||
|
||||
let str = match msg {
|
||||
Message::Text(str) => str,
|
||||
Message::Pong(payload) => {
|
||||
|
|
@ -79,7 +88,12 @@ impl ConnRx {
|
|||
Message::Binary(_) => return Err(Error::IllegalBinaryPacket),
|
||||
Message::Close(_) => return Ok(None),
|
||||
};
|
||||
|
||||
let packet = serde_json::from_str(&str)?;
|
||||
|
||||
// TODO Format somewhat nicer?
|
||||
debug!("<{}> ↓ {}", self.peer_addr, str.trim());
|
||||
|
||||
return Ok(Some(packet));
|
||||
}
|
||||
}
|
||||
|
|
@ -142,6 +156,7 @@ impl ConnMaintenance {
|
|||
|
||||
rand::thread_rng().fill(&mut payload);
|
||||
tx.send(Message::Ping(payload.to_vec()))?;
|
||||
|
||||
tokio::time::sleep(ping_delay).await;
|
||||
}
|
||||
}
|
||||
|
|
@ -151,12 +166,18 @@ pub fn new(
|
|||
stream: WebSocketStream<TcpStream>,
|
||||
ping_delay: Duration,
|
||||
) -> Result<(ConnTx, ConnRx, ConnMaintenance)> {
|
||||
let peer_addr = stream.get_ref().peer_addr()?;
|
||||
|
||||
let (ws_tx, ws_rx) = stream.split();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let last_ping_payload = Arc::new(Mutex::new(vec![]));
|
||||
|
||||
let conn_tx = ConnTx { tx: tx.clone() };
|
||||
let conn_tx = ConnTx {
|
||||
peer_addr,
|
||||
tx: tx.clone(),
|
||||
};
|
||||
let conn_rx = ConnRx {
|
||||
peer_addr,
|
||||
ws_rx,
|
||||
last_ping_payload: last_ping_payload.clone(),
|
||||
};
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use cove_core::packets::{
|
|||
SendRpl, WhoCmd, WhoRpl,
|
||||
};
|
||||
use cove_core::{Identity, Message, MessageId, Session, SessionId};
|
||||
use log::{info, warn};
|
||||
use rand::Rng;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::Mutex;
|
||||
|
|
@ -255,7 +256,8 @@ impl Server {
|
|||
id,
|
||||
cmd: Cmd::Hello(cmd),
|
||||
}) => (id, cmd),
|
||||
_ => return Err(anyhow!("not a Hello command")),
|
||||
Some(_) => return Err(anyhow!("not a Hello packet")),
|
||||
None => return Err(anyhow!("connection closed during greeting")),
|
||||
};
|
||||
|
||||
if let Some((room, session)) = self.handle_hello(&tx, id, cmd).await? {
|
||||
|
|
@ -312,17 +314,30 @@ impl Server {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_conn(self, stream: TcpStream) -> anyhow::Result<()> {
|
||||
println!("Connection from {}", stream.peer_addr().unwrap());
|
||||
let stream = tokio_tungstenite::accept_async(stream).await.unwrap();
|
||||
async fn handle_conn(&self, stream: TcpStream) -> anyhow::Result<()> {
|
||||
let stream = tokio_tungstenite::accept_async(stream).await?;
|
||||
let (tx, rx, maintenance) = conn::new(stream, Duration::from_secs(10))?;
|
||||
tokio::try_join!(self.greet_and_run(tx, rx), Self::maintain(maintenance))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_conn(self, stream: TcpStream) -> anyhow::Result<()> {
|
||||
let peer_addr = stream.peer_addr()?;
|
||||
info!("<{peer_addr}> Connected");
|
||||
|
||||
if let Err(e) = self.handle_conn(stream).await {
|
||||
warn!("<{peer_addr}> Err: {e}");
|
||||
}
|
||||
|
||||
info!("<{peer_addr}> Disconnected");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::init();
|
||||
|
||||
let server = Server::new();
|
||||
let listener = TcpListener::bind(("::0", 40080)).await.unwrap();
|
||||
while let Ok((stream, _)) = listener.accept().await {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue