Change euph packet representation

This commit is contained in:
Joscha 2022-06-22 20:32:55 +02:00
parent 03c1fe7f34
commit a4a8174ea3
8 changed files with 506 additions and 415 deletions

View file

@ -1,13 +1,13 @@
//! Models the euphoria API at <http://api.euphoria.io/>.
mod events;
mod packet;
pub mod packet;
mod room_cmds;
mod session_cmds;
mod types;
pub use events::*;
pub use packet::*;
pub use packet::Data;
pub use room_cmds::*;
pub use session_cmds::*;
pub use types::*;

View file

@ -2,10 +2,7 @@
use serde::{Deserialize, Serialize};
use super::{
has_packet_type, AuthOption, HasPacketType, Message, PacketType, PersonalAccountView,
SessionView, Snowflake, Time, UserId,
};
use super::{AuthOption, Message, PersonalAccountView, SessionView, Snowflake, Time, UserId};
/// Indicates that access to a room is denied.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -20,8 +17,6 @@ pub struct BounceEvent {
pub ip: Option<String>,
}
has_packet_type!(BounceEvent);
/// Indicates that the session is being closed. The client will subsequently be
/// disconnected.
///
@ -33,8 +28,6 @@ pub struct DisconnectEvent {
pub reason: String,
}
has_packet_type!(DisconnectEvent);
/// Sent by the server to the client when a session is started.
///
/// It includes information about the client's authentication and associated
@ -58,14 +51,10 @@ pub struct HelloEvent {
pub version: String,
}
has_packet_type!(HelloEvent);
/// Indicates a session just joined the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinEvent(pub SessionView);
has_packet_type!(JoinEvent);
/// Sent to all sessions of an agent when that agent is logged in (except for
/// the session that issued the login command).
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -73,15 +62,11 @@ pub struct LoginEvent {
pub account_id: Snowflake,
}
has_packet_type!(LoginEvent);
/// Sent to all sessions of an agent when that agent is logged out (except for
/// the session that issued the logout command).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogoutEvent;
has_packet_type!(LogoutEvent);
/// Indicates some server-side event that impacts the presence of sessions in a
/// room.
///
@ -97,8 +82,6 @@ pub struct NetworkEvent {
pub server_era: String,
}
has_packet_type!(NetworkEvent);
/// Announces a nick change by another session in the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NickEvent {
@ -112,8 +95,6 @@ pub struct NickEvent {
pub to: String,
}
has_packet_type!(NickEvent);
/// Indicates that a message in the room has been modified or deleted.
///
/// If the client offers a user interface and the indicated message is currently
@ -129,14 +110,10 @@ pub struct EditMessageEvent {
pub message: Message,
}
has_packet_type!(EditMessageEvent);
/// Indicates a session just disconnected from the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartEvent(pub SessionView);
has_packet_type!(PartEvent);
/// Represents a server-to-client ping.
///
/// The client should send back a ping-reply with the same value for the time
@ -150,8 +127,6 @@ pub struct PingEvent {
pub next: Time,
}
has_packet_type!(PingEvent);
/// Informs the client that another user wants to chat with them privately.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PmInitiateEvent {
@ -165,14 +140,10 @@ pub struct PmInitiateEvent {
pub pm_id: Snowflake,
}
has_packet_type!(PmInitiateEvent);
/// Indicates a message received by the room from another session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendEvent(pub Message);
has_packet_type!(SendEvent);
/// Indicates that a session has successfully joined a room.
///
/// It also offers a snapshot of the rooms state and recent history.
@ -197,5 +168,3 @@ pub struct SnapshotEvent {
/// If given, this room is for private chat with the given user.
pub pm_with_user_id: Option<String>,
}
has_packet_type!(SnapshotEvent);

View file

@ -1,196 +1,7 @@
use std::fmt;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// The type of a packet.
///
/// Not all of these types have their corresponding data modeled as a struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PacketType {
// Asynchronous events
/// See [`BounceEvent`](super::BounceEvent).
BounceEvent,
/// See [`DisconnectEvent`](super::DisconnectEvent).
DisconnectEvent,
/// See [`HelloEvent`](super::HelloEvent).
HelloEvent,
/// See [`JoinEvent`](super::JoinEvent).
JoinEvent,
/// See [`LoginEvent`](super::LoginEvent).
LoginEvent,
/// See [`LogoutEvent`](super::LogoutEvent).
LogoutEvent,
/// See [`NetworkEvent`](super::NetworkEvent).
NetworkEvent,
/// See [`NickEvent`](super::NickEvent).
NickEvent,
/// See [`EditMessageEvent`](super::EditMessageEvent).
EditMessageEvent,
/// See [`PartEvent`](super::PartEvent).
PartEvent,
/// See [`PingEvent`](super::PingEvent).
PingEvent,
/// See [`PmInitiateEvent`](super::PmInitiateEvent).
PmInitiateEvent,
/// See [`SendEvent`](super::SendEvent).
SendEvent,
/// See [`SnapshotEvent`](super::SnapshotEvent).
SnapshotEvent,
// Session commands
/// See [`Auth`](super::Auth).
Auth,
/// See [`AuthReply`](super::AuthReply).
AuthReply,
/// See [`Ping`](super::Ping).
Ping,
/// See [`PingReply`](super::PingReply).
PingReply,
// Chat room commands
/// See [`GetMessage`](super::GetMessage).
GetMessage,
/// See [`GetMessageReply`](super::GetMessageReply).
GetMessageReply,
/// See [`Log`](super::Log).
Log,
/// See [`LogReply`](super::LogReply).
LogReply,
/// See [`Nick`](super::Nick).
Nick,
/// See [`NickReply`](super::NickReply).
NickReply,
/// See [`PmInitiate`](super::PmInitiate).
PmInitiate,
/// See [`PmInitiateReply`](super::PmInitiateReply).
PmInitiateReply,
/// See [`Send`](super::Send).
Send,
/// See [`SendReply`](super::SendReply).
SendReply,
/// See [`Who`](super::Who).
Who,
/// See [`WhoReply`](super::WhoReply).
WhoReply,
// Account commands
/// Not implemented.
ChangeEmail,
/// Not implemented.
ChangeEmailReply,
/// Not implemented.
ChangeName,
/// Not implemented.
ChangeNameReply,
/// Not implemented.
ChangePassword,
/// Not implemented.
ChangePasswordReply,
/// Not implemented.
Login,
/// Not implemented.
LoginReply,
/// Not implemented.
Logout,
/// Not implemented.
LogoutReply,
/// Not implemented.
RegisterAccount,
/// Not implemented.
RegisterAccountReply,
/// Not implemented.
ResendVerificationEmail,
/// Not implemented.
ResendVerificationEmailReply,
/// Not implemented.
ResetPassword,
/// Not implemented.
ResetPasswordReply,
// Room host commands
/// Not implemented.
Ban,
/// Not implemented.
BanReply,
/// Not implemented.
EditMessage,
/// Not implemented.
EditMessageReply,
/// Not implemented.
GrantAccess,
/// Not implemented.
GrantAccessReply,
/// Not implemented.
GrantManager,
/// Not implemented.
GrantManagerReply,
/// Not implemented.
RevokeAccess,
/// Not implemented.
RevokeAccessReply,
/// Not implemented.
RevokeManager,
/// Not implemented.
RevokeManagerReply,
/// Not implemented.
Unban,
/// Not implemented.
UnbanReply,
// Staff commands
/// Not implemented.
StaffCreateRoom,
/// Not implemented.
StaffCreateRoomReply,
/// Not implemented.
StaffEnrollOtp,
/// Not implemented.
StaffEnrollOtpReply,
/// Not implemented.
StaffGrantManager,
/// Not implemented.
StaffGrantManagerReply,
/// Not implemented.
StaffInvade,
/// Not implemented.
StaffInvadeReply,
/// Not implemented.
StaffLockRoom,
/// Not implemented.
StaffLockRoomReply,
/// Not implemented.
StaffRevokeAccess,
/// Not implemented.
StaffRevokeAccessReply,
/// Not implemented.
StaffValidateOtp,
/// Not implemented.
StaffValidateOtpReply,
/// Not implemented.
UnlockStaffCapability,
/// Not implemented.
UnlockStaffCapabilityReply,
}
impl fmt::Display for PacketType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_value(self) {
Ok(Value::String(s)) => write!(f, "{}", s),
_ => Err(fmt::Error),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum PacketError {
#[error("throttled: {0}")]
Throttled(String),
#[error("error: {0}")]
Error(String),
}
use super::PacketType;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Packet {
@ -203,80 +14,206 @@ pub struct Packet {
pub throttled_reason: Option<String>,
}
impl Packet {
pub fn data(self) -> Result<Value, PacketError> {
if self.throttled {
let reason = self
.throttled_reason
.unwrap_or_else(|| "no reason given".to_string());
Err(PacketError::Throttled(reason))
} else if let Some(error) = self.error {
Err(PacketError::Error(error))
} else {
Ok(self.data.unwrap_or_default())
pub trait Command {
type Reply;
}
macro_rules! packets {
( $( $name:ident, )*) => {
#[derive(Debug, Clone)]
pub enum Data {
$( $name(super::$name), )*
Unimplemented,
}
}
impl Data {
pub fn from_value(ptype: PacketType, value: Value) -> serde_json::Result<Self> {
Ok(match ptype {
$( PacketType::$name => Self::$name(serde_json::from_value(value)?), )*
_ => Self::Unimplemented,
})
}
pub fn to_value(self) -> serde_json::Result<Value> {
Ok(match self{
$( Self::$name(p) => serde_json::to_value(p)?, )*
Self::Unimplemented => panic!("using unimplemented data"),
})
}
pub fn packet_type(&self) -> PacketType {
match self {
$( Self::$name(p) => PacketType::$name, )*
Self::Unimplemented => panic!("using unimplemented data"),
}
}
}
$(
impl From<super::$name> for Data {
fn from(p: super::$name) -> Self {
Self::$name(p)
}
}
impl TryFrom<Data> for super::$name{
type Error = ();
fn try_from(value: Data) -> Result<Self, Self::Error> {
match value {
Data::$name(p) => Ok(p),
_ => Err(())
}
}
}
)*
};
}
pub trait HasPacketType {
fn packet_type() -> PacketType;
}
macro_rules! has_packet_type {
($name:ident) => {
impl HasPacketType for $name {
fn packet_type() -> PacketType {
PacketType::$name
macro_rules! events {
( $( $name:ident, )* ) => {
impl Data {
pub fn is_event(&self) -> bool {
match self {
$( Self::$name(_) => true, )*
_ => false,
}
}
}
};
}
pub(crate) use has_packet_type;
pub trait ToPacket {
fn to_packet(self, id: Option<String>) -> Packet;
macro_rules! commands {
( $( $cmd:ident => $rpl:ident, )* ) => {
$(
impl Command for super::$cmd {
type Reply = super::$rpl;
}
)*
};
}
impl<T: HasPacketType + Serialize> ToPacket for T {
fn to_packet(self, id: Option<String>) -> Packet {
Packet {
id,
r#type: Self::packet_type(),
data: Some(serde_json::to_value(self).expect("malformed packet")),
error: None,
throttled: false,
throttled_reason: None,
}
}
packets! {
BounceEvent,
DisconnectEvent,
HelloEvent,
JoinEvent,
LoginEvent,
LogoutEvent,
NetworkEvent,
NickEvent,
EditMessageEvent,
PartEvent,
PingEvent,
PmInitiateEvent,
SendEvent,
SnapshotEvent,
Auth,
AuthReply,
Ping,
PingReply,
GetMessage,
GetMessageReply,
Log,
LogReply,
Nick,
NickReply,
PmInitiate,
PmInitiateReply,
Send,
SendReply,
Who,
WhoReply,
}
#[derive(Debug, thiserror::Error)]
pub enum DecodeError {
#[error("incorrect packet type: expected {expected}, got {actual}")]
IncorrectType {
expected: PacketType,
actual: PacketType,
},
#[error("{0}")]
SerdeJson(#[from] serde_json::Error),
#[error("{0}")]
Packet(#[from] PacketError),
events! {
BounceEvent,
DisconnectEvent,
HelloEvent,
JoinEvent,
LoginEvent,
LogoutEvent,
NetworkEvent,
NickEvent,
EditMessageEvent,
PartEvent,
PingEvent,
PmInitiateEvent,
SendEvent,
SnapshotEvent,
}
pub trait FromPacket: Sized {
fn from_packet(packet: Packet) -> Result<Self, DecodeError>;
commands! {
Auth => AuthReply,
Ping => PingReply,
GetMessage => GetMessageReply,
Log => LogReply,
Nick => NickReply,
PmInitiate => PmInitiateReply,
Send => SendReply,
Who => WhoReply,
}
impl<T: HasPacketType + DeserializeOwned> FromPacket for T {
fn from_packet(packet: Packet) -> Result<Self, DecodeError> {
if packet.r#type != Self::packet_type() {
Err(DecodeError::IncorrectType {
expected: Self::packet_type(),
actual: packet.r#type,
})
#[derive(Debug, Clone)]
pub struct ParsedPacket {
pub id: Option<String>,
pub r#type: PacketType,
pub content: Result<Data, String>,
pub throttled: Option<String>,
}
impl ParsedPacket {
pub fn from_packet(packet: Packet) -> serde_json::Result<Self> {
let id = packet.id;
let r#type = packet.r#type;
let content = if let Some(error) = packet.error {
Err(error)
} else {
let data = packet.data()?;
Ok(serde_json::from_value(data)?)
}
let data = packet.data.unwrap_or_default();
Ok(Data::from_value(r#type, data)?)
};
let throttled = if packet.throttled {
let reason = packet
.throttled_reason
.unwrap_or_else(|| "no reason given".to_string());
Some(reason)
} else {
None
};
Ok(Self {
id,
r#type,
content,
throttled,
})
}
pub fn to_packet(self) -> serde_json::Result<Packet> {
let id = self.id;
let r#type = self.r#type;
let throttled = self.throttled.is_some();
let throttled_reason = self.throttled;
Ok(match self.content {
Ok(data) => Packet {
id,
r#type,
data: Some(data.to_value()?),
error: None,
throttled,
throttled_reason,
},
Err(error) => Packet {
id,
r#type,
data: None,
error: Some(error),
throttled,
throttled_reason,
},
})
}
}

View file

@ -2,7 +2,7 @@
use serde::{Deserialize, Serialize};
use super::{has_packet_type, HasPacketType, Message, PacketType, SessionView, Snowflake, UserId};
use super::{Message, SessionView, Snowflake, UserId};
/// Retrieve the full content of a single message in the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -11,14 +11,10 @@ pub struct GetMessage {
pub id: Snowflake,
}
has_packet_type!(GetMessage);
/// The message retrieved by [`GetMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMessageReply(pub Message);
has_packet_type!(GetMessageReply);
/// Request messages from the room's message log.
///
/// This can be used to supplement the log provided by snapshot-event (for
@ -31,8 +27,6 @@ pub struct Log {
pub before: Option<Snowflake>,
}
has_packet_type!(Log);
/// List of messages from the room's message log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogReply {
@ -42,8 +36,6 @@ pub struct LogReply {
pub before: Option<Snowflake>,
}
has_packet_type!(LogReply);
/// Set the name you present to the room.
///
/// This name applies to all messages sent during this session, until the nick
@ -54,8 +46,6 @@ pub struct Nick {
pub name: String,
}
has_packet_type!(Nick);
/// Confirms the [`Nick`] command.
///
/// Returns the session's former and new names (the server may modify the
@ -72,8 +62,6 @@ pub struct NickReply {
pub to: String,
}
has_packet_type!(NickReply);
/// Constructs a virtual room for private messaging between the client and the
/// given [`UserId`].
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -82,8 +70,6 @@ pub struct PmInitiate {
pub user_id: UserId,
}
has_packet_type!(PmInitiate);
/// Provides the PMID for the requested private messaging room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PmInitiateReply {
@ -93,8 +79,6 @@ pub struct PmInitiateReply {
pub to_nick: String,
}
has_packet_type!(PmInitiateReply);
/// Send a message to a room.
///
/// The session must be successfully joined with the room. This message will be
@ -114,27 +98,19 @@ pub struct Send {
pub parent: Option<Snowflake>,
}
has_packet_type!(Send);
/// The message that was sent.
///
/// this includes the message id, which was populated by the server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendReply(pub Message);
has_packet_type!(SendReply);
/// Request a list of sessions currently joined in the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Who;
has_packet_type!(Who);
/// Lists the sessions currently joined in the room.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhoReply {
/// A list of session views.
listing: Vec<SessionView>,
}
has_packet_type!(WhoReply);

View file

@ -2,7 +2,7 @@
use serde::{Deserialize, Serialize};
use super::{has_packet_type, AuthOption, HasPacketType, PacketType, Time};
use super::{AuthOption, Time};
/// Attempt to join a private room.
///
@ -16,8 +16,6 @@ pub struct Auth {
pub passcode: Option<String>,
}
has_packet_type!(Auth);
/// Reports whether the [`Auth`] command succeeded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthReply {
@ -27,8 +25,6 @@ pub struct AuthReply {
pub reason: Option<String>,
}
has_packet_type!(AuthReply);
/// Initiate a client-to-server ping.
///
/// The server will send back a [`PingReply`] with the same timestamp as soon as
@ -39,13 +35,9 @@ pub struct Ping {
pub time: Time,
}
has_packet_type!(Ping);
/// Response to a [`Ping`] command or [`PingEvent`](super::PingEvent).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingReply {
/// The timestamp of the ping being replied to.
pub time: Option<Time>,
}
has_packet_type!(PingReply);

View file

@ -10,6 +10,7 @@ use std::fmt;
use chrono::{DateTime, Utc};
use serde::{de, ser, Deserialize, Serialize};
use serde_json::Value;
/// Describes an account and its preferred name.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -61,6 +62,186 @@ pub struct Message {
pub truncated: bool,
}
/// The type of a packet.
///
/// Not all of these types have their corresponding data modeled as a struct.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PacketType {
// Asynchronous events
/// See [`BounceEvent`](super::BounceEvent).
BounceEvent,
/// See [`DisconnectEvent`](super::DisconnectEvent).
DisconnectEvent,
/// See [`HelloEvent`](super::HelloEvent).
HelloEvent,
/// See [`JoinEvent`](super::JoinEvent).
JoinEvent,
/// See [`LoginEvent`](super::LoginEvent).
LoginEvent,
/// See [`LogoutEvent`](super::LogoutEvent).
LogoutEvent,
/// See [`NetworkEvent`](super::NetworkEvent).
NetworkEvent,
/// See [`NickEvent`](super::NickEvent).
NickEvent,
/// See [`EditMessageEvent`](super::EditMessageEvent).
EditMessageEvent,
/// See [`PartEvent`](super::PartEvent).
PartEvent,
/// See [`PingEvent`](super::PingEvent).
PingEvent,
/// See [`PmInitiateEvent`](super::PmInitiateEvent).
PmInitiateEvent,
/// See [`SendEvent`](super::SendEvent).
SendEvent,
/// See [`SnapshotEvent`](super::SnapshotEvent).
SnapshotEvent,
// Session commands
/// See [`Auth`](super::Auth).
Auth,
/// See [`AuthReply`](super::AuthReply).
AuthReply,
/// See [`Ping`](super::Ping).
Ping,
/// See [`PingReply`](super::PingReply).
PingReply,
// Chat room commands
/// See [`GetMessage`](super::GetMessage).
GetMessage,
/// See [`GetMessageReply`](super::GetMessageReply).
GetMessageReply,
/// See [`Log`](super::Log).
Log,
/// See [`LogReply`](super::LogReply).
LogReply,
/// See [`Nick`](super::Nick).
Nick,
/// See [`NickReply`](super::NickReply).
NickReply,
/// See [`PmInitiate`](super::PmInitiate).
PmInitiate,
/// See [`PmInitiateReply`](super::PmInitiateReply).
PmInitiateReply,
/// See [`Send`](super::Send).
Send,
/// See [`SendReply`](super::SendReply).
SendReply,
/// See [`Who`](super::Who).
Who,
/// See [`WhoReply`](super::WhoReply).
WhoReply,
// Account commands
/// Not implemented.
ChangeEmail,
/// Not implemented.
ChangeEmailReply,
/// Not implemented.
ChangeName,
/// Not implemented.
ChangeNameReply,
/// Not implemented.
ChangePassword,
/// Not implemented.
ChangePasswordReply,
/// Not implemented.
Login,
/// Not implemented.
LoginReply,
/// Not implemented.
Logout,
/// Not implemented.
LogoutReply,
/// Not implemented.
RegisterAccount,
/// Not implemented.
RegisterAccountReply,
/// Not implemented.
ResendVerificationEmail,
/// Not implemented.
ResendVerificationEmailReply,
/// Not implemented.
ResetPassword,
/// Not implemented.
ResetPasswordReply,
// Room host commands
/// Not implemented.
Ban,
/// Not implemented.
BanReply,
/// Not implemented.
EditMessage,
/// Not implemented.
EditMessageReply,
/// Not implemented.
GrantAccess,
/// Not implemented.
GrantAccessReply,
/// Not implemented.
GrantManager,
/// Not implemented.
GrantManagerReply,
/// Not implemented.
RevokeAccess,
/// Not implemented.
RevokeAccessReply,
/// Not implemented.
RevokeManager,
/// Not implemented.
RevokeManagerReply,
/// Not implemented.
Unban,
/// Not implemented.
UnbanReply,
// Staff commands
/// Not implemented.
StaffCreateRoom,
/// Not implemented.
StaffCreateRoomReply,
/// Not implemented.
StaffEnrollOtp,
/// Not implemented.
StaffEnrollOtpReply,
/// Not implemented.
StaffGrantManager,
/// Not implemented.
StaffGrantManagerReply,
/// Not implemented.
StaffInvade,
/// Not implemented.
StaffInvadeReply,
/// Not implemented.
StaffLockRoom,
/// Not implemented.
StaffLockRoomReply,
/// Not implemented.
StaffRevokeAccess,
/// Not implemented.
StaffRevokeAccessReply,
/// Not implemented.
StaffValidateOtp,
/// Not implemented.
StaffValidateOtpReply,
/// Not implemented.
UnlockStaffCapability,
/// Not implemented.
UnlockStaffCapabilityReply,
}
impl fmt::Display for PacketType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match serde_json::to_value(self) {
Ok(Value::String(s)) => write!(f, "{}", s),
_ => Err(fmt::Error),
}
}
}
/// Describes an account to its owner.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonalAccountView {

View file

@ -15,12 +15,11 @@ use tokio::sync::mpsc;
use tokio::{select, task, time};
use tokio_tungstenite::{tungstenite, MaybeTlsStream, WebSocketStream};
use crate::replies::{self, Replies};
use crate::replies::{self, PendingReply, Replies};
use super::api::packet::{Command, Packet, ParsedPacket};
use super::api::{
BounceEvent, FromPacket, HelloEvent, JoinEvent, NetworkEvent, NickEvent, NickReply, Packet,
PacketType, PartEvent, PersonalAccountView, Ping, PingEvent, PingReply, SendEvent,
SnapshotEvent, ToPacket,
BounceEvent, Data, HelloEvent, PersonalAccountView, Ping, PingReply, SnapshotEvent,
};
use super::{SessionView, Time, UserId};
@ -32,16 +31,34 @@ pub enum Error {
ConnectionClosed,
#[error("packet timed out")]
TimedOut,
#[error("incorrect reply type")]
IncorrectReplyType,
#[error("{0}")]
Euph(String),
}
#[derive(Debug)]
enum Event {
Message(tungstenite::Message),
Send(Packet, oneshot::Sender<Result<Packet, Error>>),
SendCmd(Data, oneshot::Sender<PendingReply<Result<Data, String>>>),
SendRpl(Option<String>, Data),
Status(oneshot::Sender<Status>),
DoPings,
}
impl Event {
fn send_cmd<C: Into<Data>>(
cmd: C,
rpl: oneshot::Sender<PendingReply<Result<Data, String>>>,
) -> Self {
Self::SendCmd(cmd.into(), rpl)
}
fn send_rpl<C: Into<Data>>(id: Option<String>, rpl: C) -> Self {
Self::SendRpl(id, rpl.into())
}
}
#[derive(Debug, Clone, Default)]
pub struct Joining {
hello: Option<HelloEvent>,
@ -50,11 +67,11 @@ pub struct Joining {
}
impl Joining {
fn on_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
match packet.r#type {
PacketType::BounceEvent => self.bounce = Some(BounceEvent::from_packet(packet)?),
PacketType::HelloEvent => self.hello = Some(HelloEvent::from_packet(packet)?),
PacketType::SnapshotEvent => self.snapshot = Some(SnapshotEvent::from_packet(packet)?),
fn on_data(&mut self, data: Data) -> anyhow::Result<()> {
match data {
Data::BounceEvent(p) => self.bounce = Some(p),
Data::HelloEvent(p) => self.hello = Some(p),
Data::SnapshotEvent(p) => self.snapshot = Some(p),
_ => {}
}
Ok(())
@ -87,43 +104,32 @@ pub struct Joined {
}
impl Joined {
fn on_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
match packet.r#type {
PacketType::JoinEvent => {
let packet = JoinEvent::from_packet(packet)?;
self.listing.insert(packet.0.id.clone(), packet.0);
fn on_data(&mut self, data: Data) -> anyhow::Result<()> {
match data {
Data::JoinEvent(p) => {
self.listing.insert(p.0.id.clone(), p.0);
}
PacketType::SendEvent => {
let packet = SendEvent::from_packet(packet)?;
self.listing
.insert(packet.0.sender.id.clone(), packet.0.sender);
Data::SendEvent(p) => {
self.listing.insert(p.0.sender.id.clone(), p.0.sender);
}
PacketType::PartEvent => {
let packet = PartEvent::from_packet(packet)?;
self.listing.remove(&packet.0.id);
Data::PartEvent(p) => {
self.listing.remove(&p.0.id);
}
PacketType::NetworkEvent => {
let p = NetworkEvent::from_packet(packet)?;
Data::NetworkEvent(p) => {
if p.r#type == "partition" {
self.listing.retain(|_, s| {
!(s.server_id == p.server_id && s.server_era == p.server_era)
});
}
}
PacketType::NickEvent => {
let packet = NickEvent::from_packet(packet)?;
if let Some(session) = self.listing.get_mut(&packet.id) {
session.name = packet.to;
Data::NickEvent(p) => {
if let Some(session) = self.listing.get_mut(&p.id) {
session.name = p.to;
}
}
PacketType::NickReply => {
// Since this is a reply, it may contain errors, for example if
// the user specified an invalid nick. We can't just die if that
// happens, so we ignore the error case.
if let Ok(packet) = NickReply::from_packet(packet) {
assert_eq!(self.session.id, packet.id);
self.session.name = packet.to;
}
Data::NickReply(p) => {
assert_eq!(self.session.id, p.id);
self.session.name = p.to;
}
// The who reply is broken and can't be trusted right now, so we'll
// not even look at it.
@ -142,9 +148,9 @@ pub enum Status {
struct State {
ws_tx: SplitSink<WsStream, tungstenite::Message>,
last_id: usize,
replies: Replies<String, Packet>,
replies: Replies<String, Result<Data, String>>,
packet_tx: mpsc::UnboundedSender<Packet>,
packet_tx: mpsc::UnboundedSender<Data>,
last_ws_ping: Option<Vec<u8>>,
last_ws_pong: Option<Vec<u8>>,
@ -161,7 +167,7 @@ impl State {
rx_canary: oneshot::Receiver<Infallible>,
event_tx: mpsc::UnboundedSender<Event>,
mut event_rx: mpsc::UnboundedReceiver<Event>,
packet_tx: mpsc::UnboundedSender<Packet>,
packet_tx: mpsc::UnboundedSender<Data>,
) {
let (ws_tx, mut ws_rx) = ws.split();
let state = Self {
@ -208,7 +214,8 @@ impl State {
while let Some(ev) = event_rx.recv().await {
match ev {
Event::Message(msg) => self.on_msg(msg, event_tx)?,
Event::Send(packet, reply_tx) => self.on_send(packet, reply_tx).await?,
Event::SendCmd(data, reply_tx) => self.on_send_cmd(data, reply_tx).await?,
Event::SendRpl(id, data) => self.on_send_rpl(id, data).await?,
Event::Status(reply_tx) => self.on_status(reply_tx),
Event::DoPings => self.do_pings(event_tx).await?,
}
@ -237,62 +244,84 @@ impl State {
packet: Packet,
event_tx: &mpsc::UnboundedSender<Event>,
) -> anyhow::Result<()> {
if packet.r#type == PacketType::PingReply {
let packet = PingReply::from_packet(packet.clone())?;
self.last_euph_pong = packet.time;
} else if packet.r#type == PacketType::PingEvent {
let time = Some(PingEvent::from_packet(packet.clone())?.time);
Self::send_unconditionally(event_tx, PingReply { time }, packet.id.clone())?;
}
let packet = ParsedPacket::from_packet(packet)?;
// Complete pending replies if the packet has an id
if let Some(id) = &packet.id {
self.replies.complete(id, packet.clone());
self.replies.complete(id, packet.content.clone());
}
self.packet_tx.send(packet.clone())?;
// TODO Handle disconnect event?
match &mut self.status {
Status::Joining(joining) => {
joining.on_packet(packet)?;
if let Some(joined) = joining.joined() {
self.status = Status::Joined(joined);
}
// Shovel events into self.packet_tx, assuming that no event ever
// errors. Events with errors are simply ignored.
if let Ok(data) = &packet.content {
if data.is_event() {
self.packet_tx.send(data.clone())?;
}
}
// Play a game of table tennis
match &packet.content {
Ok(Data::PingReply(p)) => self.last_euph_pong = p.time,
Ok(Data::PingEvent(p)) => {
let reply = PingReply { time: Some(p.time) };
event_tx.send(Event::send_rpl(packet.id.clone(), reply))?;
}
// TODO Handle disconnect event?
_ => {}
}
// Update internal state
if let Ok(data) = packet.content {
match &mut self.status {
Status::Joining(joining) => {
joining.on_data(data)?;
if let Some(joined) = joining.joined() {
self.status = Status::Joined(joined);
}
}
Status::Joined(joined) => joined.on_data(data)?,
}
Status::Joined(joined) => joined.on_packet(packet)?,
}
Ok(())
}
async fn on_send(
async fn on_send_cmd(
&mut self,
mut packet: Packet,
reply_tx: oneshot::Sender<Result<Packet, Error>>,
data: Data,
reply_tx: oneshot::Sender<PendingReply<Result<Data, String>>>,
) -> anyhow::Result<()> {
let id = if let Some(id) = packet.id.clone() {
id
} else {
// Overkill of universe-heat-death-like proportions
self.last_id = self.last_id.wrapping_add(1);
format!("{}", self.last_id)
};
packet.id = Some(id.clone());
// Overkill of universe-heat-death-like proportions
self.last_id = self.last_id.wrapping_add(1);
let id = format!("{}", self.last_id);
let pending_reply = self.replies.wait_for(id);
let packet = ParsedPacket {
id: Some(id.clone()),
r#type: data.packet_type(),
content: Ok(data),
throttled: None,
}
.to_packet()?;
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
self.ws_tx.send(msg).await?;
let reply = match pending_reply.get().await {
Ok(reply) => Ok(reply),
Err(replies::Error::TimedOut) => Err(Error::TimedOut),
// We could also send an Error::ConnectionClosed here, but that
// happens automatically in the send function once we drop reply_tx.
Err(replies::Error::Canceled) => return Ok(()),
};
let _ = reply_tx.send(reply);
let _ = reply_tx.send(self.replies.wait_for(id));
Ok(())
}
async fn on_send_rpl(&mut self, id: Option<String>, data: Data) -> anyhow::Result<()> {
let packet = ParsedPacket {
id,
r#type: data.packet_type(),
content: Ok(data),
throttled: None,
}
.to_packet()?;
let msg = tungstenite::Message::Text(serde_json::to_string(&packet)?);
self.ws_tx.send(msg).await?;
Ok(())
}
@ -321,18 +350,9 @@ impl State {
// Send new euph ping
let euph_payload = Time(Utc::now());
Self::send_unconditionally(event_tx, Ping { time: euph_payload }, None)?;
Ok(())
}
fn send_unconditionally<T: ToPacket>(
event_tx: &mpsc::UnboundedSender<Event>,
packet: T,
id: Option<String>,
) -> anyhow::Result<()> {
let (tx, _) = oneshot::channel();
event_tx.send(Event::Send(packet.to_packet(id), tx))?;
event_tx.send(Event::send_cmd(Ping { time: euph_payload }, tx))?;
Ok(())
}
}
@ -343,16 +363,30 @@ pub struct ConnTx {
}
impl ConnTx {
pub async fn send<T: ToPacket>(&self, packet: T) -> Result<Packet, Error> {
pub async fn send<C>(&self, cmd: C) -> Result<C::Reply, Error>
where
C: Command + Into<Data>,
C::Reply: TryFrom<Data, Error = ()>,
{
let (tx, rx) = oneshot::channel();
let event = Event::Send(packet.to_packet(None), tx);
self.event_tx
.send(event)
.send(Event::SendCmd(cmd.into(), tx))
.map_err(|_| Error::ConnectionClosed)?;
match rx.await {
Ok(result) => result,
Err(_) => Err(Error::ConnectionClosed),
}
let pending_reply = rx
.await
// This should only happen if something goes wrong during encoding
// of the packet or while sending it through the websocket. Assuming
// the first doesn't happen, the connection is probably closed.
.map_err(|_| Error::ConnectionClosed)?;
let data = pending_reply
.get()
.await
.map_err(|e| match e {
replies::Error::TimedOut => Error::TimedOut,
replies::Error::Canceled => Error::ConnectionClosed,
})?
.map_err(Error::Euph)?;
data.try_into().map_err(|_| Error::IncorrectReplyType)
}
pub async fn status(&self) -> Result<Status, Error> {
@ -366,11 +400,11 @@ impl ConnTx {
pub struct ConnRx {
canary: oneshot::Sender<Infallible>,
packet_rx: mpsc::UnboundedReceiver<Packet>,
packet_rx: mpsc::UnboundedReceiver<Data>,
}
impl ConnRx {
pub async fn recv(&mut self) -> Result<Packet, Error> {
pub async fn recv(&mut self) -> Result<Data, Error> {
self.packet_rx.recv().await.ok_or(Error::ConnectionClosed)
}
}

View file

@ -16,6 +16,7 @@ pub enum Error {
pub type Result<T> = result::Result<T, Error>;
#[derive(Debug)]
pub struct PendingReply<R> {
timeout: Duration,
result: Receiver<R>,
@ -32,6 +33,7 @@ impl<R> PendingReply<R> {
}
}
#[derive(Debug)]
pub struct Replies<I, R> {
timeout: Duration,
pending: HashMap<I, Sender<R>>,