From 6cc86702914e22249e10d00ccff5ba6c61708210 Mon Sep 17 00:00:00 2001 From: Joscha Date: Sat, 12 Feb 2022 00:54:27 +0100 Subject: [PATCH] Start answering hello commands --- Cargo.lock | 19 +++ cove-core/src/macros.rs | 18 +++ cove-core/src/packets.rs | 20 +++ cove-server/Cargo.toml | 2 + cove-server/src/main.rs | 330 ++++++++++++++++++--------------------- 5 files changed, 207 insertions(+), 182 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 764f856..2443bd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "anyhow" +version = "1.0.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" + [[package]] name = "base64" version = "0.13.0" @@ -63,10 +69,12 @@ dependencies = [ name = "cove-server" version = "0.1.0" dependencies = [ + "anyhow", "cove-core", "futures", "serde_json", "tokio", + "tokio-stream", "tokio-tungstenite", ] @@ -667,6 +675,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.16.1" diff --git a/cove-core/src/macros.rs b/cove-core/src/macros.rs index cd21ea8..d1a58ad 100644 --- a/cove-core/src/macros.rs +++ b/cove-core/src/macros.rs @@ -43,6 +43,12 @@ macro_rules! packets { } } } + + impl From<$cmd> for Cmd { + fn from(cmd: $cmd) -> Self { + Self::$cmdName(cmd) + } + } )* #[derive(Debug, Deserialize, Serialize)] @@ -61,6 +67,12 @@ macro_rules! packets { } } } + + impl From<$rpl> for Rpl { + fn from(rpl: $rpl) -> Self { + Self::$cmdName(rpl) + } + } )* #[derive(Debug, Deserialize, Serialize)] @@ -79,6 +91,12 @@ macro_rules! packets { } } } + + impl From<$ntf> for Ntf { + fn from(ntf: $ntf) -> Self { + Self::$ntfName(ntf) + } + } )* }; } diff --git a/cove-core/src/packets.rs b/cove-core/src/packets.rs index 6afa502..7b27f08 100644 --- a/cove-core/src/packets.rs +++ b/cove-core/src/packets.rs @@ -115,3 +115,23 @@ pub enum Packet { ntf: Ntf, }, } + +impl Packet { + pub fn cmd>(id: u64, cmd: C) -> Self { + Self::Cmd { + id, + cmd: cmd.into(), + } + } + + pub fn rpl>(id: u64, rpl: R) -> Self { + Self::Rpl { + id, + rpl: rpl.into(), + } + } + + pub fn ntf>(ntf: N) -> Self { + Self::Ntf { ntf: ntf.into() } + } +} diff --git a/cove-server/Cargo.toml b/cove-server/Cargo.toml index 75798c0..70fe4c2 100644 --- a/cove-server/Cargo.toml +++ b/cove-server/Cargo.toml @@ -4,8 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.53" cove-core = { path = "../cove-core" } futures = "0.3.21" serde_json = "1.0.78" tokio = { version = "1.16.1", features = ["full"] } +tokio-stream = "0.1.8" tokio-tungstenite = "0.16.1" diff --git a/cove-server/src/main.rs b/cove-server/src/main.rs index fd548c0..1a2c814 100644 --- a/cove-server/src/main.rs +++ b/cove-server/src/main.rs @@ -1,187 +1,153 @@ -use cove_core::packets::{ - Cmd, HelloCmd, HelloRpl, JoinNtf, NickCmd, NickNtf, NickRpl, Ntf, Packet, PartNtf, Rpl, - SendCmd, SendNtf, SendRpl, WhoCmd, WhoRpl, -}; -use cove_core::{Identity, Message, MessageId, Session, SessionId}; -use futures::{future, StreamExt, TryStreamExt}; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +use anyhow::anyhow; +use cove_core::packets::{Cmd, HelloRpl, Packet, Rpl}; +use cove_core::{Identity, MessageId, Session, SessionId}; +use futures::stream::{SplitSink, SplitStream}; +use futures::{future, Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio::sync::{self, Mutex, RwLock}; +use tokio_tungstenite::tungstenite::Message as TkMessage; +use tokio_tungstenite::WebSocketStream; + +#[derive(Debug)] +struct Client { + session: Session, + packets: UnboundedSender, +} + +#[derive(Debug)] +struct Room { + clients: HashMap, + last_message: MessageId, + last_timestamp: u128, +} + +#[derive(Debug, Clone)] +struct Server { + rooms: Arc>>>>, +} + +impl Server { + fn new() -> Self { + Self { + rooms: Arc::new(RwLock::new(HashMap::new())), + } + } + + async fn recv(rx: &mut SplitStream>) -> anyhow::Result { + loop { + let msg = rx.next().await.ok_or(anyhow!("connection closed"))??; + let str = match msg { + TkMessage::Text(str) => str, + TkMessage::Ping(_) | TkMessage::Pong(_) => continue, + TkMessage::Binary(_) => return Err(anyhow!("invalid binary packet")), + TkMessage::Close(_) => return Err(anyhow!("connection closed")), + }; + break Ok(serde_json::from_str(&str)?); + } + } + + async fn send( + tx: &mut SplitSink, TkMessage>, + packet: &Packet, + ) -> anyhow::Result<()> { + let str = serde_json::to_string(packet).expect("serialisable packet"); + let msg = TkMessage::Text(str); + tx.feed(msg).await?; + tx.flush().await?; + Ok(()) + } + + fn check_room(room: &str) -> Option { + if !room.is_empty() { + return Some("is empty".to_string()); + } + if !room.is_ascii() { + return Some("contains non-ascii characters".to_string()); + } + if room.len() > 1024 { + return Some("contains more than 1024 characters".to_string()); + } + if !room + .chars() + .all(|c| c == '-' || c == '.' || ('a'..='z').contains(&c)) + { + return Some("must only contain a-z, '-' and '_'".to_string()); + } + None + } + + fn check_nick(nick: &str) -> Option { + if !nick.is_empty() { + return Some("is empty".to_string()); + } + if !nick.trim().is_empty() { + return Some("contains only whitespace".to_string()); + } + let nick = nick.trim(); + if nick.chars().count() > 1024 { + return Some("contains more than 1024 characters".to_string()); + } + None + } + + fn check_identity(identity: &str) -> Option { + if identity.chars().count() > 32768 { + return Some("contains more than 32768 characters".to_string()); + } + None + } + + async fn greet( + &self, + tx: &mut SplitSink, TkMessage>, + rx: &mut SplitStream>, + ) -> anyhow::Result<(String, String, Identity, u64)> { + let packet = Self::recv(rx).await?; + let (id, cmd) = match packet { + Packet::Cmd { + id, + cmd: Cmd::Hello(cmd), + } => (id, cmd), + _ => return Err(anyhow!("not a hello command")), + }; + if let Some(reason) = Self::check_room(&cmd.room) { + Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidRoom { reason })).await?; + return Err(anyhow!("invalid room")); + } + if let Some(reason) = Self::check_nick(&cmd.nick) { + Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidNick { reason })).await?; + return Err(anyhow!("invalid nick")); + } + if let Some(reason) = Self::check_identity(&cmd.identity) { + Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidNick { reason })).await?; + return Err(anyhow!("invalid identity")); + } + let identity = Identity::of(&cmd.identity); + Ok((cmd.room, cmd.nick, identity, id)) + } + + async fn on_conn(self, stream: TcpStream) { + println!("Connection from {}", stream.peer_addr().unwrap()); + let stream = tokio_tungstenite::accept_async(stream).await.unwrap(); + let (mut tx, mut rx) = stream.split(); + let (room, nick, identity, id) = match self.greet(&mut tx, &mut rx).await { + Ok(info) => info, + Err(_) => return, + }; + todo!() + } +} #[tokio::main] async fn main() { - let session = Session { - id: SessionId::of("12345"), - nick: "Garmy".to_string(), - identity: Identity::of("random garbage"), - }; - let message = Message { - pred: MessageId::of("pred"), - parent: None, - identity: Identity::of("asd"), - nick: "Foo".to_string(), - content: "Bar".to_string(), - }; - println!( - "{}", - serde_json::to_string_pretty(&Packet::Cmd { - id: 12345, - cmd: Cmd::Hello(HelloCmd { - room: "welcome".to_string(), - nick: "Garmy".to_string(), - identity: "random garbage".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Hello(HelloRpl::Success { - you: session.clone(), - others: vec![], - last_message: MessageId::of("Blarg") - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Hello(HelloRpl::InvalidNick { - reason: "foo".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Cmd { - id: 12345, - cmd: Cmd::Nick(NickCmd { - nick: "Garmelon".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Nick(NickRpl::Success) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Nick(NickRpl::InvalidNick { - reason: "foo".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Cmd { - id: 12345, - cmd: Cmd::Send(SendCmd { - parent: None, - // parent: Some(MessageId::of("Booh!")), - content: "Hello world!".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Send(SendRpl::Success { - message: message.clone() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Send(SendRpl::InvalidContent { - reason: "foo".to_string() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Cmd { - id: 12345, - cmd: Cmd::Who(WhoCmd {}) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Rpl { - id: 67890, - rpl: Rpl::Who(WhoRpl { - you: session.clone(), - others: vec![] - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Ntf { - ntf: Ntf::Join(JoinNtf { - who: session.clone() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Ntf { - ntf: Ntf::Nick(NickNtf { - who: session.clone() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Ntf { - ntf: Ntf::Part(PartNtf { - who: session.clone() - }) - }) - .unwrap() - ); - println!( - "{}", - serde_json::to_string_pretty(&Packet::Ntf { - ntf: Ntf::Send(SendNtf { - message: message.clone() - }) - }) - .unwrap() - ); - // let listener = TcpListener::bind(("::0", 40080)).await.unwrap(); - // while let Ok((stream, _)) = listener.accept().await { - // tokio::spawn(conn(stream)); - // } -} - -async fn conn(stream: TcpStream) { - println!("Connection from {}", stream.peer_addr().unwrap()); - let stream = tokio_tungstenite::accept_async(stream).await.unwrap(); - let (write, read) = stream.split(); - read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary())) - .forward(write) - .await - .unwrap(); + let server = Server::new(); + let listener = TcpListener::bind(("::0", 40080)).await.unwrap(); + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(server.clone().on_conn(stream)); + } }