Remove other packages from workspace

This commit is contained in:
Joscha 2022-06-23 12:19:28 +02:00
parent 00b1f91f71
commit 1cc7dd8920
14 changed files with 5 additions and 1255 deletions

197
Cargo.lock generated
View file

@ -39,17 +39,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -68,15 +57,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.2" version = "0.10.2"
@ -146,40 +126,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cove-core"
version = "0.1.0"
dependencies = [
"futures",
"hex",
"log",
"rand",
"serde",
"serde_json",
"sha2",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.16.1",
]
[[package]]
name = "cove-server"
version = "0.1.0"
dependencies = [
"anyhow",
"cove-core",
"env_logger",
"futures",
"log",
"rand",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.16.1",
]
[[package]] [[package]]
name = "cove-tui" name = "cove-tui"
version = "0.1.0" version = "0.1.0"
@ -198,7 +144,7 @@ dependencies = [
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-tungstenite 0.17.1", "tokio-tungstenite",
"toss", "toss",
] ]
@ -246,22 +192,13 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.3" version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [ dependencies = [
"block-buffer 0.10.2", "block-buffer",
"crypto-common", "crypto-common",
] ]
@ -301,19 +238,6 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "env_logger"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.2.0" version = "0.2.0"
@ -488,15 +412,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.8" version = "0.2.8"
@ -514,12 +429,6 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.2.3" version = "0.2.3"
@ -655,12 +564,6 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"
version = "0.1.5" version = "0.1.5"
@ -964,19 +867,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "sha-1"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]] [[package]]
name = "sha-1" name = "sha-1"
version = "0.10.0" version = "0.10.0"
@ -985,18 +875,7 @@ checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"cpufeatures", "cpufeatures",
"digest 0.10.3", "digest",
]
[[package]]
name = "sha2"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.3",
] ]
[[package]] [[package]]
@ -1082,15 +961,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.31" version = "1.0.31"
@ -1179,33 +1049,6 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72"
dependencies = [
"futures-util",
"log",
"rustls",
"rustls-native-certs",
"tokio",
"tokio-rustls",
"tungstenite 0.16.0",
"webpki",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.17.1" version = "0.17.1"
@ -1218,7 +1061,7 @@ dependencies = [
"rustls-native-certs", "rustls-native-certs",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tungstenite 0.17.2", "tungstenite",
"webpki", "webpki",
] ]
@ -1233,27 +1076,6 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "tungstenite"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"rustls",
"sha-1 0.9.8",
"thiserror",
"url",
"utf-8",
"webpki",
]
[[package]] [[package]]
name = "tungstenite" name = "tungstenite"
version = "0.17.2" version = "0.17.2"
@ -1268,7 +1090,7 @@ dependencies = [
"log", "log",
"rand", "rand",
"rustls", "rustls",
"sha-1 0.10.0", "sha-1",
"thiserror", "thiserror",
"url", "url",
"utf-8", "utf-8",
@ -1472,15 +1294,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View file

@ -1,7 +1,5 @@
[workspace] [workspace]
members = [ members = [
"cove-core",
"cove-server",
"cove-tui", "cove-tui",
] ]

View file

@ -1,19 +0,0 @@
[package]
name = "cove-core"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.21"
hex = { version = "0.4.3", features = ["serde"] }
log = "0.4.14"
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
sha2 = "0.10.1"
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-tungstenite = { version = "0.16.1", features = [
"rustls-tls-native-roots",
] }

View file

@ -1,182 +0,0 @@
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io, result};
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use log::debug;
use rand::Rng;
use tokio::net::TcpStream;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::{self, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::packets::Packet;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("WS error: {0}")]
Ws(#[from] tungstenite::Error),
#[error("MPSC error: {0}")]
Mpsc(#[from] mpsc::error::SendError<Message>),
#[error("Serde error: {0}")]
Serde(#[from] serde_json::Error),
#[error("client did not pong")]
NoPong,
#[error("illegal binary packet")]
IllegalBinaryPacket,
}
pub type Result<T> = result::Result<T, Error>;
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
#[derive(Clone)]
pub struct ConnTx {
tx: UnboundedSender<Message>,
}
impl fmt::Debug for ConnTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnTx").finish_non_exhaustive()
}
}
impl ConnTx {
pub fn send(&self, packet: &Packet) -> Result<()> {
let str = serde_json::to_string(packet).expect("unserializable packet");
debug!("↑ {}", str.trim()); // TODO Format somewhat nicer?
self.tx.send(Message::Text(str))?;
Ok(())
}
}
pub struct ConnRx {
ws_rx: SplitStream<WsStream>,
last_ping_payload: Arc<Mutex<Vec<u8>>>,
}
impl fmt::Debug for ConnRx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnRx").finish_non_exhaustive()
}
}
impl ConnRx {
pub async fn recv(&mut self) -> Result<Option<Packet>> {
loop {
let msg = match self.ws_rx.next().await {
None => return Ok(None),
Some(msg) => msg?,
};
let str = match msg {
Message::Text(str) => str,
Message::Pong(payload) => {
*self.last_ping_payload.lock().await = payload;
continue;
}
Message::Ping(_) => {
// Tungstenite automatically replies to pings
continue;
}
Message::Binary(_) => return Err(Error::IllegalBinaryPacket),
Message::Close(_) => return Ok(None),
};
let packet = serde_json::from_str(&str)?;
debug!("↓ {}", str.trim()); // TODO Format somewhat nicer?
return Ok(Some(packet));
}
}
}
pub struct ConnMaintenance {
// Shoveling packets into the WS connection
rx: UnboundedReceiver<Message>,
ws_tx: SplitSink<WsStream, Message>,
// Pinging and ponging
tx: UnboundedSender<Message>,
ping_delay: Duration,
last_ping_payload: Arc<Mutex<Vec<u8>>>,
}
impl fmt::Debug for ConnMaintenance {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnMaintenance").finish_non_exhaustive()
}
}
impl ConnMaintenance {
pub async fn perform(self) -> Result<()> {
let result = tokio::try_join!(
Self::shovel(self.rx, self.ws_tx),
Self::ping_pong(self.tx, self.ping_delay, self.last_ping_payload)
);
result.map(|_| ())
}
async fn shovel(
rx: UnboundedReceiver<Message>,
ws_tx: SplitSink<WsStream, Message>,
) -> Result<()> {
UnboundedReceiverStream::new(rx)
.map(Ok)
.forward(ws_tx)
.await?;
Ok(())
}
async fn ping_pong(
tx: UnboundedSender<Message>,
ping_delay: Duration,
last_ping_payload: Arc<Mutex<Vec<u8>>>,
) -> Result<()> {
let mut payload = [0u8; 8];
rand::thread_rng().fill(&mut payload);
tx.send(Message::Ping(payload.to_vec()))?;
tokio::time::sleep(ping_delay).await;
loop {
{
let last_payload = last_ping_payload.lock().await;
if (&payload as &[u8]) != (&last_payload as &[u8]) {
return Err(Error::NoPong);
}
};
rand::thread_rng().fill(&mut payload);
tx.send(Message::Ping(payload.to_vec()))?;
tokio::time::sleep(ping_delay).await;
}
}
}
pub fn new(stream: WsStream, ping_delay: Duration) -> (ConnTx, ConnRx, ConnMaintenance) {
let (ws_tx, ws_rx) = stream.split();
let (tx, rx) = mpsc::unbounded_channel();
let last_ping_payload = Arc::new(Mutex::new(vec![]));
let conn_tx = ConnTx { tx: tx.clone() };
let conn_rx = ConnRx {
ws_rx,
last_ping_payload: last_ping_payload.clone(),
};
let conn_maintenance = ConnMaintenance {
ws_tx,
rx,
tx,
ping_delay,
last_ping_payload,
};
(conn_tx, conn_rx, conn_maintenance)
}

View file

@ -1,33 +0,0 @@
use std::fmt;
use hex::ToHex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::macros::id_alias;
// TODO Use base64 representation instead
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
struct Id(#[serde(with = "hex")] [u8; 32]);
impl Id {
fn of(str: &str) -> Self {
let mut hasher = Sha256::new();
hasher.update(str);
Self(hasher.finalize().into())
}
}
// TODO Impl better fmt::Debug for Id and aliases
impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0.encode_hex::<String>())
}
}
// Prevent misuse of one id as another by only making the aliases public.
id_alias!(MessageId);
id_alias!(SessionId);
id_alias!(Identity);

View file

@ -1,13 +0,0 @@
#![warn(clippy::use_self)]
pub mod conn;
mod id;
mod macros;
mod message;
pub mod packets;
pub mod replies;
mod session;
pub use self::id::*;
pub use self::message::*;
pub use self::session::*;

View file

@ -1,106 +0,0 @@
// Use `pub(crate) use <macro_name>` to make a macro importable from elsewhere.
// See https://stackoverflow.com/a/31749071
macro_rules! id_alias {
($name:ident) => {
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize,
)]
pub struct $name(Id);
impl $name {
pub fn of(str: &str) -> Self {
Self(Id::of(str))
}
}
impl fmt::Display for $name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
};
}
pub(crate) use id_alias;
macro_rules! packets {
(
$( cmd $cmdName:ident($cmd:ident, $rpl:ident), )* // Commands with reply
$( ntf $ntfName:ident($ntf:ident), )* // Notifications
) => {
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "name", content = "data")]
pub enum Cmd {
$( $cmdName($cmd), )*
}
$(
impl std::convert::TryFrom<Cmd> for $cmd {
type Error = ();
fn try_from(cmd: Cmd) -> Result<Self, Self::Error> {
match cmd {
Cmd::$cmdName(val) => Ok(val),
_ => Err(()),
}
}
}
impl From<$cmd> for Cmd {
fn from(cmd: $cmd) -> Self {
Self::$cmdName(cmd)
}
}
)*
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "name", content = "data")]
pub enum Rpl {
$( $cmdName($rpl), )*
}
$(
impl std::convert::TryFrom<Rpl> for $rpl {
type Error = ();
fn try_from(rpl: Rpl) -> Result<Self, Self::Error> {
match rpl {
Rpl::$cmdName(val) => Ok(val),
_ => Err(()),
}
}
}
impl From<$rpl> for Rpl {
fn from(rpl: $rpl) -> Self {
Self::$cmdName(rpl)
}
}
)*
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "name", content = "data")]
pub enum Ntf {
$( $ntfName($ntf), )*
}
$(
impl std::convert::TryFrom<Ntf> for $ntf {
type Error = ();
fn try_from(ntf: Ntf) -> Result<Self, Self::Error> {
match ntf {
Ntf::$ntfName(val) => Ok(val),
_ => Err(()),
}
}
}
impl From<$ntf> for Ntf {
fn from(ntf: $ntf) -> Self {
Self::$ntfName(ntf)
}
}
)*
};
}
pub(crate) use packets;

View file

@ -1,29 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::{Identity, MessageId};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Message {
pub time: u128,
pub pred: MessageId,
pub parent: Option<MessageId>,
pub identity: Identity,
pub nick: String,
pub content: String,
}
impl Message {
pub fn id(&self) -> MessageId {
let time = self.time;
let pred = self.pred;
let parent = match self.parent {
Some(id) => format!("{id}"),
None => "none".to_string(),
};
let identity = self.identity;
let nick = MessageId::of(&self.nick);
let content = MessageId::of(&self.content);
let str = format!("message {time} {pred} {parent} {identity} {nick} {content}");
MessageId::of(&str)
}
}

View file

@ -1,144 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::macros::packets;
use crate::{Message, MessageId, Session};
#[derive(Debug, Deserialize, Serialize)]
pub struct RoomCmd {
pub name: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum RoomRpl {
Success,
InvalidRoom { reason: String },
}
#[derive(Debug, Deserialize, Serialize)]
pub struct IdentifyCmd {
pub nick: String,
pub identity: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum IdentifyRpl {
Success {
you: Session,
others: Vec<Session>,
last_message: MessageId,
},
InvalidNick {
reason: String,
},
InvalidIdentity {
reason: String,
},
}
#[derive(Debug, Deserialize, Serialize)]
pub struct NickCmd {
pub nick: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum NickRpl {
Success { you: Session },
InvalidNick { reason: String },
}
#[derive(Debug, Deserialize, Serialize)]
pub struct SendCmd {
pub parent: Option<MessageId>,
pub content: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum SendRpl {
Success { message: Message },
InvalidContent { reason: String },
}
#[derive(Debug, Deserialize, Serialize)]
pub struct WhoCmd {}
#[derive(Debug, Deserialize, Serialize)]
pub struct WhoRpl {
pub you: Session,
pub others: Vec<Session>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct JoinNtf {
pub who: Session,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct NickNtf {
pub who: Session,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct PartNtf {
pub who: Session,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct SendNtf {
pub message: Message,
}
// Create a Cmd enum for all commands, a Rpl enum for all replies and a Ntf enum
// for all notifications, as well as TryFrom impls for the individual structs.
packets! {
cmd Room(RoomCmd, RoomRpl),
cmd Identify(IdentifyCmd, IdentifyRpl),
cmd Nick(NickCmd, NickRpl),
cmd Send(SendCmd, SendRpl),
cmd Who(WhoCmd, WhoRpl),
ntf Join(JoinNtf),
ntf Nick(NickNtf),
ntf Part(PartNtf),
ntf Send(SendNtf),
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type")]
pub enum Packet {
Cmd {
id: u64,
#[serde(flatten)]
cmd: Cmd,
},
Rpl {
id: u64,
#[serde(flatten)]
rpl: Rpl,
},
Ntf {
#[serde(flatten)]
ntf: Ntf,
},
}
impl Packet {
pub fn cmd<C: Into<Cmd>>(id: u64, cmd: C) -> Self {
Self::Cmd {
id,
cmd: cmd.into(),
}
}
pub fn rpl<R: Into<Rpl>>(id: u64, rpl: R) -> Self {
Self::Rpl {
id,
rpl: rpl.into(),
}
}
pub fn ntf<N: Into<Ntf>>(ntf: N) -> Self {
Self::Ntf { ntf: ntf.into() }
}
}

View file

@ -1,70 +0,0 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::result;
use std::time::Duration;
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::time;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("timed out")]
TimedOut,
#[error("canceled")]
Canceled,
}
pub type Result<T> = result::Result<T, Error>;
pub struct PendingReply<R> {
timeout: Duration,
result: Receiver<R>,
}
impl<R> PendingReply<R> {
pub async fn get(self) -> Result<R> {
let result = time::timeout(self.timeout, self.result).await;
match result {
Err(_) => Err(Error::TimedOut),
Ok(Err(_)) => Err(Error::Canceled),
Ok(Ok(value)) => Ok(value),
}
}
}
pub struct Replies<I, R> {
timeout: Duration,
pending: HashMap<I, Sender<R>>,
}
impl<I: Eq + Hash, R> Replies<I, R> {
pub fn new(timeout: Duration) -> Self {
Self {
timeout,
pending: HashMap::new(),
}
}
pub fn wait_for(&mut self, id: I) -> PendingReply<R> {
let (tx, rx) = oneshot::channel();
self.pending.insert(id, tx);
PendingReply {
timeout: self.timeout,
result: rx,
}
}
pub fn complete(&mut self, id: &I, result: R) {
if let Some(tx) = self.pending.remove(id) {
let _ = tx.send(result);
}
}
pub fn cancel(&mut self, id: &I) {
self.pending.remove(id);
}
pub fn purge(&mut self) {
self.pending.retain(|_, tx| !tx.is_closed());
}
}

View file

@ -1,10 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::{Identity, SessionId};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Session {
pub id: SessionId,
pub nick: String,
pub identity: Identity,
}

View file

@ -1,17 +0,0 @@
[package]
name = "cove-server"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.53"
cove-core = { path = "../cove-core" }
env_logger = "0.9.0"
futures = "0.3.21"
log = "0.4.14"
rand = "0.8.4"
serde_json = "1.0.78"
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-tungstenite = "0.16.1"

View file

@ -1,378 +0,0 @@
// TODO Logging
#![warn(clippy::use_self)]
mod util;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail};
use cove_core::conn::{self, ConnMaintenance, ConnRx, ConnTx};
use cove_core::packets::{
Cmd, IdentifyCmd, IdentifyRpl, JoinNtf, NickCmd, NickNtf, NickRpl, Packet, PartNtf, RoomCmd,
RoomRpl, SendCmd, SendNtf, SendRpl, WhoCmd, WhoRpl,
};
use cove_core::{Identity, Message, MessageId, Session, SessionId};
use log::{info, warn};
use rand::Rng;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tokio_tungstenite::MaybeTlsStream;
#[derive(Debug, Clone)]
struct Client {
session: Session,
send: ConnTx,
}
#[derive(Debug)]
struct Room {
name: String,
clients: HashMap<SessionId, Client>,
last_message: MessageId,
last_timestamp: u128,
}
impl Room {
fn new(name: String) -> Self {
Self {
name,
clients: HashMap::new(),
last_message: MessageId::of(&format!("{}", rand::thread_rng().gen::<u64>())),
last_timestamp: util::timestamp(),
}
}
fn client(&self, id: SessionId) -> &Client {
self.clients.get(&id).expect("invalid session id")
}
fn client_mut(&mut self, id: SessionId) -> &mut Client {
self.clients.get_mut(&id).expect("invalid session id")
}
fn notify_all(&self, packet: &Packet) {
for client in self.clients.values() {
let _ = client.send.send(packet);
}
}
fn notify_except(&self, id: SessionId, packet: &Packet) {
for client in self.clients.values() {
if client.session.id != id {
let _ = client.send.send(packet);
}
}
}
fn join(&mut self, client: Client) {
if self.clients.contains_key(&client.session.id) {
// Session ids are generated randomly and a collision should be very
// unlikely.
panic!("duplicated session id");
}
self.notify_all(&Packet::ntf(JoinNtf {
who: client.session.clone(),
}));
self.clients.insert(client.session.id, client);
}
fn part(&mut self, id: SessionId) {
let client = self.clients.remove(&id).expect("invalid session id");
self.notify_all(&Packet::ntf(PartNtf {
who: client.session,
}));
}
fn nick(&mut self, id: SessionId, nick: String) {
let who = {
let client = self.client_mut(id);
client.session.nick = nick;
client.session.clone()
};
self.notify_except(id, &Packet::ntf(NickNtf { who }))
}
fn send(&mut self, id: SessionId, parent: Option<MessageId>, content: String) -> Message {
let client = &self.clients[&id];
self.last_timestamp = util::timestamp_after(self.last_timestamp);
let message = Message {
time: self.last_timestamp,
pred: self.last_message,
parent,
identity: client.session.identity,
nick: client.session.nick.clone(),
content,
};
self.last_message = message.id();
info!(
"&{} now at {} ({})",
self.name, self.last_message, self.last_timestamp
);
self.notify_except(
id,
&Packet::ntf(SendNtf {
message: message.clone(),
}),
);
message
}
fn who(&self, id: SessionId) -> (Session, Vec<Session>) {
let session = self.client(id).session.clone();
let others = self
.clients
.values()
.filter(|client| client.session.id != id)
.map(|client| client.session.clone())
.collect();
(session, others)
}
}
#[derive(Debug)]
struct ServerSession {
tx: ConnTx,
rx: ConnRx,
room: Arc<Mutex<Room>>,
session: Session,
}
impl ServerSession {
async fn handle_nick(&mut self, id: u64, cmd: NickCmd) -> anyhow::Result<()> {
if let Some(reason) = util::check_nick(&cmd.nick) {
self.tx
.send(&Packet::rpl(id, NickRpl::InvalidNick { reason }))?;
return Ok(());
}
self.session.nick = cmd.nick.clone();
self.tx.send(&Packet::rpl(
id,
NickRpl::Success {
you: self.session.clone(),
},
))?;
self.room.lock().await.nick(self.session.id, cmd.nick);
Ok(())
}
async fn handle_send(&mut self, id: u64, cmd: SendCmd) -> anyhow::Result<()> {
if let Some(reason) = util::check_content(&cmd.content) {
self.tx
.send(&Packet::rpl(id, SendRpl::InvalidContent { reason }))?;
return Ok(());
}
let message = self
.room
.lock()
.await
.send(self.session.id, cmd.parent, cmd.content);
self.tx
.send(&Packet::rpl(id, SendRpl::Success { message }))?;
Ok(())
}
async fn handle_who(&mut self, id: u64, _cmd: WhoCmd) -> anyhow::Result<()> {
let (you, others) = self.room.lock().await.who(self.session.id);
self.tx.send(&Packet::rpl(id, WhoRpl { you, others }))?;
Ok(())
}
async fn handle_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
match packet {
Packet::Cmd { id, cmd } => match cmd {
Cmd::Room(_) => Err(anyhow!("unexpected Room cmd")),
Cmd::Identify(_) => Err(anyhow!("unexpected Identify cmd")),
Cmd::Nick(cmd) => self.handle_nick(id, cmd).await,
Cmd::Send(cmd) => self.handle_send(id, cmd).await,
Cmd::Who(cmd) => self.handle_who(id, cmd).await,
},
Packet::Rpl { .. } => Err(anyhow!("unexpected rpl")),
Packet::Ntf { .. } => Err(anyhow!("unexpected ntf")),
}
}
async fn run(&mut self) -> anyhow::Result<()> {
while let Some(packet) = self.rx.recv().await? {
self.handle_packet(packet).await?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
struct Server {
rooms: Arc<Mutex<HashMap<String, Arc<Mutex<Room>>>>>,
}
impl Server {
fn new() -> Self {
Self {
rooms: Arc::new(Mutex::new(HashMap::new())),
}
}
async fn room(&self, name: String) -> Arc<Mutex<Room>> {
self.rooms
.lock()
.await
.entry(name.clone())
.or_insert_with(|| Arc::new(Mutex::new(Room::new(name))))
.clone()
}
async fn negotiate_room(tx: &ConnTx, rx: &mut ConnRx) -> anyhow::Result<String> {
loop {
match rx.recv().await? {
Some(Packet::Cmd {
id,
cmd: Cmd::Room(RoomCmd { name }),
}) => {
if let Some(reason) = util::check_room(&name) {
tx.send(&Packet::rpl(id, RoomRpl::InvalidRoom { reason }))?;
continue;
}
tx.send(&Packet::rpl(id, RoomRpl::Success))?;
return Ok(name);
}
Some(_) => bail!("invalid packet during room negotiation"),
None => bail!("connection closed during room negotiation"),
}
}
}
async fn negotiate_identity(tx: &ConnTx, rx: &mut ConnRx) -> anyhow::Result<(u64, Session)> {
loop {
match rx.recv().await? {
Some(Packet::Cmd {
id,
cmd: Cmd::Identify(IdentifyCmd { nick, identity }),
}) => {
if let Some(reason) = util::check_identity(&identity) {
tx.send(&Packet::rpl(id, IdentifyRpl::InvalidNick { reason }))?;
continue;
}
if let Some(reason) = util::check_nick(&nick) {
tx.send(&Packet::rpl(id, IdentifyRpl::InvalidNick { reason }))?;
continue;
}
let session = Session {
id: SessionId::of(&format!("{}", rand::thread_rng().gen::<u64>())),
nick,
identity: Identity::of(&identity),
};
return Ok((id, session));
}
Some(_) => bail!("invalid packet during room negotiation"),
None => bail!("connection closed during room negotiation"),
}
}
}
fn welcome(id: u64, you: Session, room: &Room, tx: &ConnTx) -> anyhow::Result<()> {
let others = room
.clients
.values()
.map(|client| client.session.clone())
.collect::<Vec<_>>();
let last_message = room.last_message;
tx.send(&Packet::rpl(
id,
IdentifyRpl::Success {
you,
others,
last_message,
},
))?;
Ok(())
}
async fn greet(&self, tx: ConnTx, mut rx: ConnRx) -> anyhow::Result<ServerSession> {
let room = Self::negotiate_room(&tx, &mut rx).await?;
let (id, session) = Self::negotiate_identity(&tx, &mut rx).await?;
let room = self.room(room).await;
{
let mut room = room.lock().await;
// Reply to successful identify command in the same lock as joining
// the room so the client doesn' miss any messages.
Self::welcome(id, session.clone(), &*room, &tx)?;
// Join room only after welcome so current session is not yet
// present in room during welcome.
room.join(Client {
session: session.clone(),
send: tx.clone(),
});
}
Ok(ServerSession {
tx,
rx,
room,
session,
})
}
async fn greet_and_run(&self, tx: ConnTx, rx: ConnRx) -> anyhow::Result<()> {
let mut session = self.greet(tx, rx).await?;
let result = session.run().await;
session.room.lock().await.part(session.session.id);
result
}
/// Wrapper for [`ConnMaintenance::perform`] so it returns an
/// [`anyhow::Result`].
async fn maintain(maintenance: ConnMaintenance) -> anyhow::Result<()> {
maintenance.perform().await?;
Ok(())
}
async fn handle_conn(&self, stream: TcpStream) -> anyhow::Result<()> {
let stream = MaybeTlsStream::Plain(stream);
let stream = tokio_tungstenite::accept_async(stream).await?;
let (tx, rx, maintenance) = conn::new(stream, Duration::from_secs(10));
tokio::try_join!(self.greet_and_run(tx, rx), Self::maintain(maintenance))?;
Ok(())
}
async fn on_conn(self, stream: TcpStream) -> anyhow::Result<()> {
let peer_addr = stream.peer_addr()?;
info!("<{peer_addr}> Connected");
if let Err(e) = self.handle_conn(stream).await {
warn!("<{peer_addr}> Err: {e}");
}
info!("<{peer_addr}> Disconnected");
Ok(())
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let server = Server::new();
let listener = TcpListener::bind(("::0", 40080)).await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(server.clone().on_conn(stream));
}
}

View file

@ -1,60 +0,0 @@
use std::cmp;
use std::time::{SystemTime, UNIX_EPOCH};
pub fn timestamp() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("executed after 1970")
.as_millis()
}
pub fn timestamp_after(previous: u128) -> u128 {
cmp::max(timestamp(), previous + 1)
}
pub fn check_room(room: &str) -> Option<String> {
if room.is_empty() {
return Some("is empty".to_string());
}
if !room.is_ascii() {
return Some("contains non-ascii characters".to_string());
}
if room.len() > 1024 {
return Some("contains more than 1024 characters".to_string());
}
if !room
.chars()
.all(|c| c == '-' || c == '.' || ('a'..='z').contains(&c))
{
return Some("must only contain a-z, '-' and '_'".to_string());
}
None
}
pub fn check_nick(nick: &str) -> Option<String> {
if nick.is_empty() {
return Some("is empty".to_string());
}
if nick.trim().is_empty() {
return Some("contains only whitespace".to_string());
}
let nick = nick.trim();
if nick.chars().count() > 1024 {
return Some("contains more than 1024 characters".to_string());
}
None
}
pub fn check_identity(identity: &str) -> Option<String> {
if identity.chars().count() > 32 * 1024 {
return Some("contains more than 32768 characters".to_string());
}
None
}
pub fn check_content(content: &str) -> Option<String> {
if content.chars().count() > 128 * 1024 {
return Some("contains more than 131072 characters".to_string());
}
None
}