Start answering hello commands

This commit is contained in:
Joscha 2022-02-12 00:54:27 +01:00
parent fec541b7aa
commit 6cc8670291
5 changed files with 207 additions and 182 deletions

View file

@ -4,8 +4,10 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.53"
cove-core = { path = "../cove-core" }
futures = "0.3.21"
serde_json = "1.0.78"
tokio = { version = "1.16.1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-tungstenite = "0.16.1"

View file

@ -1,187 +1,153 @@
use cove_core::packets::{
Cmd, HelloCmd, HelloRpl, JoinNtf, NickCmd, NickNtf, NickRpl, Ntf, Packet, PartNtf, Rpl,
SendCmd, SendNtf, SendRpl, WhoCmd, WhoRpl,
};
use cove_core::{Identity, Message, MessageId, Session, SessionId};
use futures::{future, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use anyhow::anyhow;
use cove_core::packets::{Cmd, HelloRpl, Packet, Rpl};
use cove_core::{Identity, MessageId, Session, SessionId};
use futures::stream::{SplitSink, SplitStream};
use futures::{future, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::{self, Mutex, RwLock};
use tokio_tungstenite::tungstenite::Message as TkMessage;
use tokio_tungstenite::WebSocketStream;
#[derive(Debug)]
struct Client {
session: Session,
packets: UnboundedSender<Packet>,
}
#[derive(Debug)]
struct Room {
clients: HashMap<SessionId, Client>,
last_message: MessageId,
last_timestamp: u128,
}
#[derive(Debug, Clone)]
struct Server {
rooms: Arc<RwLock<HashMap<String, Arc<Mutex<Room>>>>>,
}
impl Server {
fn new() -> Self {
Self {
rooms: Arc::new(RwLock::new(HashMap::new())),
}
}
async fn recv(rx: &mut SplitStream<WebSocketStream<TcpStream>>) -> anyhow::Result<Packet> {
loop {
let msg = rx.next().await.ok_or(anyhow!("connection closed"))??;
let str = match msg {
TkMessage::Text(str) => str,
TkMessage::Ping(_) | TkMessage::Pong(_) => continue,
TkMessage::Binary(_) => return Err(anyhow!("invalid binary packet")),
TkMessage::Close(_) => return Err(anyhow!("connection closed")),
};
break Ok(serde_json::from_str(&str)?);
}
}
async fn send(
tx: &mut SplitSink<WebSocketStream<TcpStream>, TkMessage>,
packet: &Packet,
) -> anyhow::Result<()> {
let str = serde_json::to_string(packet).expect("serialisable packet");
let msg = TkMessage::Text(str);
tx.feed(msg).await?;
tx.flush().await?;
Ok(())
}
fn check_room(room: &str) -> Option<String> {
if !room.is_empty() {
return Some("is empty".to_string());
}
if !room.is_ascii() {
return Some("contains non-ascii characters".to_string());
}
if room.len() > 1024 {
return Some("contains more than 1024 characters".to_string());
}
if !room
.chars()
.all(|c| c == '-' || c == '.' || ('a'..='z').contains(&c))
{
return Some("must only contain a-z, '-' and '_'".to_string());
}
None
}
fn check_nick(nick: &str) -> Option<String> {
if !nick.is_empty() {
return Some("is empty".to_string());
}
if !nick.trim().is_empty() {
return Some("contains only whitespace".to_string());
}
let nick = nick.trim();
if nick.chars().count() > 1024 {
return Some("contains more than 1024 characters".to_string());
}
None
}
fn check_identity(identity: &str) -> Option<String> {
if identity.chars().count() > 32768 {
return Some("contains more than 32768 characters".to_string());
}
None
}
async fn greet(
&self,
tx: &mut SplitSink<WebSocketStream<TcpStream>, TkMessage>,
rx: &mut SplitStream<WebSocketStream<TcpStream>>,
) -> anyhow::Result<(String, String, Identity, u64)> {
let packet = Self::recv(rx).await?;
let (id, cmd) = match packet {
Packet::Cmd {
id,
cmd: Cmd::Hello(cmd),
} => (id, cmd),
_ => return Err(anyhow!("not a hello command")),
};
if let Some(reason) = Self::check_room(&cmd.room) {
Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidRoom { reason })).await?;
return Err(anyhow!("invalid room"));
}
if let Some(reason) = Self::check_nick(&cmd.nick) {
Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidNick { reason })).await?;
return Err(anyhow!("invalid nick"));
}
if let Some(reason) = Self::check_identity(&cmd.identity) {
Self::send(tx, &Packet::rpl(id, HelloRpl::InvalidNick { reason })).await?;
return Err(anyhow!("invalid identity"));
}
let identity = Identity::of(&cmd.identity);
Ok((cmd.room, cmd.nick, identity, id))
}
async fn on_conn(self, stream: TcpStream) {
println!("Connection from {}", stream.peer_addr().unwrap());
let stream = tokio_tungstenite::accept_async(stream).await.unwrap();
let (mut tx, mut rx) = stream.split();
let (room, nick, identity, id) = match self.greet(&mut tx, &mut rx).await {
Ok(info) => info,
Err(_) => return,
};
todo!()
}
}
#[tokio::main]
async fn main() {
let session = Session {
id: SessionId::of("12345"),
nick: "Garmy".to_string(),
identity: Identity::of("random garbage"),
};
let message = Message {
pred: MessageId::of("pred"),
parent: None,
identity: Identity::of("asd"),
nick: "Foo".to_string(),
content: "Bar".to_string(),
};
println!(
"{}",
serde_json::to_string_pretty(&Packet::Cmd {
id: 12345,
cmd: Cmd::Hello(HelloCmd {
room: "welcome".to_string(),
nick: "Garmy".to_string(),
identity: "random garbage".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Hello(HelloRpl::Success {
you: session.clone(),
others: vec![],
last_message: MessageId::of("Blarg")
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Hello(HelloRpl::InvalidNick {
reason: "foo".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Cmd {
id: 12345,
cmd: Cmd::Nick(NickCmd {
nick: "Garmelon".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Nick(NickRpl::Success)
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Nick(NickRpl::InvalidNick {
reason: "foo".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Cmd {
id: 12345,
cmd: Cmd::Send(SendCmd {
parent: None,
// parent: Some(MessageId::of("Booh!")),
content: "Hello world!".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Send(SendRpl::Success {
message: message.clone()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Send(SendRpl::InvalidContent {
reason: "foo".to_string()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Cmd {
id: 12345,
cmd: Cmd::Who(WhoCmd {})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Rpl {
id: 67890,
rpl: Rpl::Who(WhoRpl {
you: session.clone(),
others: vec![]
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Ntf {
ntf: Ntf::Join(JoinNtf {
who: session.clone()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Ntf {
ntf: Ntf::Nick(NickNtf {
who: session.clone()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Ntf {
ntf: Ntf::Part(PartNtf {
who: session.clone()
})
})
.unwrap()
);
println!(
"{}",
serde_json::to_string_pretty(&Packet::Ntf {
ntf: Ntf::Send(SendNtf {
message: message.clone()
})
})
.unwrap()
);
// let listener = TcpListener::bind(("::0", 40080)).await.unwrap();
// while let Ok((stream, _)) = listener.accept().await {
// tokio::spawn(conn(stream));
// }
}
async fn conn(stream: TcpStream) {
println!("Connection from {}", stream.peer_addr().unwrap());
let stream = tokio_tungstenite::accept_async(stream).await.unwrap();
let (write, read) = stream.split();
read.try_filter(|msg| future::ready(msg.is_text() || msg.is_binary()))
.forward(write)
.await
.unwrap();
let server = Server::new();
let listener = TcpListener::bind(("::0", 40080)).await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(server.clone().on_conn(stream));
}
}