Compare commits

...
Sign in to create a new pull request.

10 commits

Author SHA1 Message Date
Joscha
8f02d05b5a Add more functions 2016-09-21 18:52:16 +00:00
Joscha
65cad2cdf3 Refer to password as "password", not "pw" 2016-09-21 18:51:47 +00:00
Joscha
b4eacde6ca Import existing modules 2016-09-21 17:03:13 +00:00
Joscha
a4b9e016b0 Refer to client nick as "nick", not "name" 2016-09-21 17:00:04 +00:00
Joscha
ef2d9eba50 Add callbacks 2016-09-21 16:35:37 +00:00
Joscha
9082d0a404 Add mention 2016-09-20 20:49:33 +00:00
Joscha
19de0177d5 Add exceptions 2016-09-20 20:49:20 +00:00
Joscha
cc5a342f91 Add documentation 2016-09-20 20:47:40 +00:00
Joscha
8b1a10c396 Use exceptions and mentions 2016-09-20 20:46:41 +00:00
Joscha
fdbe866a70 Rewrite BotManager and add module-wide logging 2016-09-18 17:53:56 +00:00
12 changed files with 286 additions and 2017 deletions

View file

@ -1,10 +1,14 @@
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s| %(message)s')
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)
from .bot import Bot
from .botmanager import BotManager
from .callbacks import Callbacks
from .connection import Connection
from .exceptions import *
from .session import Session
from .message import Message
from .sessions import Sessions
from .messages import Messages
from .room import Room
from .exceptions import YaboliException, CreateBotException
from .mention import Mention

View file

@ -1,600 +1,145 @@
import time
from . import callbacks
from . import exceptions
from . import room
class Bot():
class Bot:
def __init__(self, nick, roomname, password=None, creator=None, create_room=None,
create_time=None, manager=None):
"""
Empty bot class that can be built upon.
Takes care of extended botrulez.
"""
def __init__(self, roomname, nick="yaboli", password=None, manager=None,
created_in=None, created_by=None):
"""
roomname - name of the room to connect to
nick - nick to assume, None -> no nick
roomname - name of the room to connect to
password - room password (in case the room is private)
created_in - room the bot was created in
created_by - nick of the person the bot was created by
creator - nick of the person the bot was created by
create_room - room the bot was created in
create_time - time/date the bot was created at (used when listing bots)
"""
self.start_time = time.time()
self.created_by = created_by
self.created_in = created_in
self.manager = manager
self.start_time = time.time()
# modify/customize this in your __init__() function (or anywhere else you want, for that matter)
self.bot_description = ("This bot complies with the botrulez™ (https://github.com/jedevc/botrulez),\n"
self.creator = creator
self.create_room = create_room
self.create_time = create_time
self.room = room.Room(nick, roomname, password=password)
#self.room.add_callback("message", self.on_message)
# description used on general "!help", and in the !help text. (None - no reaction)
self.short_description = None
self.description = ("This bot complies with the botrulez™",
"(https://github.com/jedevc/botrulez),\n"
"plus a few extra commands.")
self.helptexts = {}
self.detailed_helptexts = {}
self.room = room.Room(roomname, nick=nick, password=password)
self.room.add_callback("message", self.on_message)
self.commands = callbacks.Callbacks()
self.bot_specific_commands = []
self.add_command("clone", self.clone_command, "Clone this bot to another room.", # possibly add option to set nick?
("!clone @bot [ <room> [ --pw=<password> ] ]\n"
"<room> : the name of the room to clone the bot to\n"
"--pw : the room's password\n\n"
"Clone this bot to the room specified.\n"
"If the target room is passworded, you can use the --pw option to set\n"
"a password for the bot to use.\n"
"If no room is specified, this will use the current room and password."),
bot_specific=False)
self.add_command("help", self.help_command, "Show help information about the bot.",
("!help @bot [ -s | -c | <command> ]\n"
"-s : general syntax help\n"
"-c : only list the commands\n"
"<command> : any command from !help @bot -c\n\n"
"Shows detailed help for a command if you specify a command name.\n"
"Shows a list of commands and short description if no arguments are given."))
self.add_command("kill", self.kill_command, "Kill (stop) the bot.",
("!kill @bot [ -r ]\n"
"-r : restart the bot (will change the id)\n\n"
"The bot disconnects from the room and stops."))
self.add_command("ping", self.ping_command, "Replies 'Pong!'.",
("!ping @bot\n\n"
"This command was originally used to help distinguish bots from\n"
"people. Since the Great UI Change, this is no longer necessary as\n"
"bots and people are displayed separately."))
self.add_command("restart", self.restart_command, "Restart the bot (shorthand for !kill @bot -r).",
("!restart @bot\n\n"
"Restart the bot.\n"
"Short for !kill @bot -r"))
self.add_command("send", self.send_command, "Send the bot to another room.",
("!send @bot <room> [ --pw=<password> ]\n"
"--pw : the room's password\n\n"
"Sends this bot to the room specified. If the target room is passworded,\n"
"you can use the --pw option to set a password for the bot to use."))
self.add_command("uptime", self.uptime_command, "Show bot uptime since last (re-)start.",
("!uptime @bot [ -i s]\n"
"-i : show more detailed information\n\n"
"Shows the bot's uptime since the last start or restart.\n"
"Shows additional information (i.e. id) if the -i flag is set."))
self.add_command("show", self.show_command, detailed_helptext="You've found a hidden command! :)")
self._commands = callbacks.Callbacks()
self._general_commands = [] # without @mention after the !nick
self._specific_commands = [] # need to choose certain bot if multiple are present
self._helptexts = {}
self._detailed_helptexts = {}
def launch(self):
self.room.launch()
def stop(self):
"""
stop() -> None
Kill this bot.
"""
self.room.stop()
def add_command(self, command, function, helptext=None, detailed_helptext=None,
bot_specific=True):
"""
add_command(command, function, helptext, detailed_helptext, bot_specific) -> None
Subscribe a function to a command and add a help text.
If no help text is provided, the command won't be displayed by the !help command.
This overwrites any previously added command.
You can "hide" commands by specifying only the detailed helptext,
or no helptext at all.
If the command is not bot specific, no id has to be specified if there are multiple bots
with the same nick in a room.
"""
command = command.lower()
self.commands.remove(command)
self.commands.add(command, function)
if helptext and not command in self.helptexts:
self.helptexts[command] = helptext
elif not helptext and command in self.helptexts:
self.helptexts.pop(command)
if detailed_helptext and not command in self.detailed_helptexts:
self.detailed_helptexts[command] = detailed_helptext
elif not detailed_helptext and command in self.detailed_helptexts:
self.detailed_helptexts.pop(command)
if bot_specific and not command in self.bot_specific_commands:
self.bot_specific_commands.append(command)
elif not bot_specific and command in self.bot_specific_commands:
self.bot_specific_commands.remove(command)
def call_command(self, message):
"""
call_command(message) -> None
Calls all functions subscribed to the command with the arguments supplied in the message.
Deals with the situation that multiple bots of the same type and nick are in the same room.
"""
try:
command, bot_id, nick, arguments, flags, options = self.parse(message.content)
except exceptions.ParseMessageException:
return
else:
command = command.lower()
nick = self.room.mentionable(nick).lower()
if not self.commands.exists(command):
return
if not nick == self.mentionable().lower():
return
if bot_id is not None: # id specified
if self.manager.get(bot_id) == self:
self.commands.call(command, message, arguments, flags, options)
else:
return
else: # no id specified
bots = self.manager.get_similar(self.roomname(), nick)
if self.manager.get_id(self) == min(bots): # only one bot should act
# either the bot is unique or the command is not bot-specific
if not command in self.bot_specific_commands or len(bots) == 1:
self.commands.call(command, message, arguments, flags, options)
else: # user has to select a bot
msg = ("There are multiple bots with that nick in this room. To select one,\n"
"please specify its id (from the list below) as follows:\n"
"!{} <id> @{} [your arguments...]\n").format(command, nick)
for bot_id in sorted(bots):
bot = bots[bot_id]
msg += "\n{} - @{} ({})".format(bot_id, bot.mentionable(), bot.creation_info())
self.room.send_message(msg, parent=message.id)
def roomname(self):
"""
roomname() -> roomname
The room the bot is connected to.
"""
return self.room.room
def password(self):
"""
password() -> password
The current room's password.
"""
return self.room.password
def nick(self):
"""
nick() -> nick
The bot's full nick.
"""
def get_nick(self):
return self.room.nick
def mentionable(self):
"""
mentionable() -> nick
def get_roomname(self):
return self.room.roomname
The bot's nick in a mentionable format.
def get_roompassword(self):
return self.room.password
def get_creator(self):
return self.creator
def get_create_room(self):
return self.create_room
def get_create_time(self):
return self.create_time
@staticmethod
def format_date(seconds, omit_date=False, omit_time=False):
"""
format_date(seconds) -> string
Convert a date (Unix/POSIX/Epoch time) into a YYYY-MM-DD hh:mm:ss format.
"""
return self.room.mentionable()
f = ""
if not omit_date:
f += "%Y-%m-%d"
def creation_info(self):
if not omit_time:
if not omit_date: f += " "
f += "%H:%M:%S"
return time.strftime(f, time.gmtime(seconds))
@staticmethod
def format_delta(seconds):
"""
creation_info() -> str
format_delta(seconds) -> string
Formatted info about the bot's creation
Convert a time difference into the following format (where x is an integer):
[-] [[[xd ]xh ]xm ]xs
"""
info = "created {}".format(self.format_date())
seconds = int(seconds)
delta = ""
if self.created_by:
info += " by @{}".format(self.room.mentionable(self.created_by))
if seconds < 0:
delta += "- "
seconds = -seconds
if self.created_in:
info += " in &{}".format(self.created_in)
if seconds >= 24*60*60:
delta +="{}d ".format(seconds//(24*60*60))
seconds %= 24*60*60
return info
if seconds >= 60*60:
delta += "{}h ".format(seconds//(60*60))
seconds %= 60*60
def format_date(self, seconds=None):
if seconds >= 60:
delta += "{}m ".format(seconds//60)
seconds %= 60
delta += "{}s".format(seconds)
return delta
def uptime(self):
"""
format_date(seconds) -> str
uptime() -> string
Format a time in epoch format to the format specified in self.date_format.
Defaults to self.start_time.
The bot's uptime since it was last started, in the following format:
date time (delta)
"""
if seconds is None:
seconds = self.start_time
date = self.format_date(self.start_time)
delta = self.format_delta(time.time() - self.start_time)
return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(seconds))
return "{} ({})".format(date, delta)
def format_delta(self, delta=None):
def save(self):
"""
format_delta(delta) -> str
Overwrite if your bot should save when BotManager is shut down.
Make sure to also overwrite load().
Format a difference in seconds to the following format:
[- ][<days>d ][<hours>h ][<minutes>m ]<seconds>s
Defaults to the current uptime if no delta is specified.
The data returned will be converted to json and back for using the json module, so
make sure your data can handle that (i.e. don't use numbers as dict keys etc.)
"""
if not delta:
delta = time.time() - self.start_time
pass
delta = int(delta)
uptime = ""
if delta < 0:
uptime += "- "
delta = -delta
if delta >= 24*60*60:
uptime +="{}d ".format(delta//(24*60*60))
delta %= 24*60*60
if delta >= 60*60:
uptime += "{}h ".format(delta//(60*60))
delta %= 60*60
if delta >= 60:
uptime += "{}m ".format(delta//60)
delta %= 60
uptime += "{}s".format(delta)
return uptime
def parse_command(self, message):
def load(self, data):
"""
parse_command(message_content) -> command, bot_id, nick, argpart
Parse the "!command[ bot_id] @botname[ argpart]" part of a command.
Overwrite to load data from save().
See save() for more details.
"""
# command name (!command)
split = message.split(maxsplit=1)
if len(split) < 2:
raise exceptions.ParseMessageException("Not enough arguments")
elif split[0][:1] != "!":
raise exceptions.ParseMessageException("Not a command")
command = split[0][1:]
message = split[1]
split = message.split(maxsplit=1)
# bot id
try:
bot_id = int(split[0])
except ValueError:
bot_id = None
else:
if len(split) <= 1:
raise exceptions.ParseMessageException("No bot nick")
message = split[1]
split = message.split(maxsplit=1)
# bot nick (@mention)
if split[0][:1] != "@":
raise exceptions.ParseMessageException("No bot nick")
nick = split[0][1:]
# arguments to the command
if len(split) > 1:
argpart = split[1]
else:
argpart = None
return command, bot_id, nick, argpart
def parse_arguments(self, argstr):
"""
parse_arguments(argstr) -> arguments, flags, options
Parse the argument part of a command.
"""
argstr += " " # so the last argument will also be captured
escaping = False
quot_marks = None
type_signs = 0
option = None
word = ""
arguments = []
flags = ""
options = {}
for char in argstr:
# backslash-escaping
if escaping:
word += char
escaping = False
elif char == "\\":
escaping = True
# quotation mark escaped strings
elif quot_marks:
if char == quot_marks:
quot_marks = None
else:
word += char
elif char in ['"', "'"]:
quot_marks = char
# type signs
elif char == "-":
if type_signs < 2 and not word:
type_signs += 1
else:
word += char
# "=" in options
elif char == "=" and type_signs == 2 and word and not option:
option = word
word = ""
# space - evaluate information collected so far
elif char == " ":
if word:
if type_signs == 0: # argument
arguments.append(word)
elif type_signs == 1: # flag(s)
for flag in word:
if not flag in flags:
flags += flag
elif type_signs == 2: # option
if option:
options[option] = word
else:
options[word] = True
# reset all state variables
escaping = False
quot_marks = None
type_signs = 0
option = None
word = ""
# all other chars and situations
else:
word += char
return arguments, flags, options
def parse(self, message):
"""
parse(message_content) -> bool
Parse a message.
"""
command, bot_id, nick, argpart = self.parse_command(message)
if argpart:
arguments, flags, options = self.parse_arguments(argpart)
else:
arguments = []
flags = ""
options = {}
return command, bot_id, nick, arguments, flags, options
# ----- HANDLING OF EVENTS -----
def on_message(self, message):
"""
on_message(message) -> None
Gets called when a message is received (see __init__).
If you want to add a command to your bot, consider using add_command instead of overwriting
this function.
"""
self.call_command(message)
# ----- COMMANDS -----
def clone_command(self, message, arguments, flags, options):
"""
clone_command(message, *arguments, flags, options) -> None
Create a new bot.
"""
if not arguments:
room = self.roomname()
password = self.room.password
else:
room = arguments[0]
if room[:1] == "&":
room = room[1:]
if "pw" in options and options["pw"] is not True:
password = options["pw"]
else:
password = None
try:
bot = self.manager.create(room, password=password, created_in=self.roomname(),
created_by=message.sender.name)
except exceptions.CreateBotException:
self.room.send_message("Bot could not be cloned.", parent=message.id)
else:
self.room.send_message("Cloned @{} to &{}.".format(bot.mentionable(), room),
parent=message.id)
def help_command(self, message, arguments, flags, options):
"""
help_command(message, *arguments, flags, options) -> None
Show help about the bot.
"""
if arguments: # detailed help for one command
command = arguments[0]
if command[:1] == "!":
command = command[1:]
if command in self.detailed_helptexts:
msg = "Detailed help for !{}:\n".format(command)
msg += self.detailed_helptexts[command]
else:
msg = "No detailed help text found for !{}.".format(command)
if command in self.helptexts:
msg += "\n\n" + self.helptexts[command]
elif "s" in flags: # detailed syntax help
msg = ("SYNTAX HELP PLACEHOLDER")
else: # just list all commands
msg = ""
if not "c" in flags:
msg += self.bot_description + "\n\n"
msg += "This bot supports the following commands:"
for command in sorted(self.helptexts):
helptext = self.helptexts[command]
msg += "\n!{} - {}".format(command, helptext)
if not "c" in flags:
msg += ("\n\nFor help on the command syntax, try: !help @{0} -s\n"
"For detailed help on a command, try: !help @{0} <command>\n"
"(Hint: Most commands have extra functionality, which is listed in their detailed help.)")
msg = msg.format(self.mentionable())
self.room.send_message(msg, parent=message.id)
def kill_command(self, message, arguments, flags, options):
"""
kill_command(message, *arguments, flags, options) -> None
stop the bot.
"""
if "r" in flags:
bot = self.manager.create(self.roomname())
bot.created_by = self.created_by
bot.created_in = self.created_in
self.room.send_message("/me exits.", message.id)
self.manager.remove(self.manager.get_id(self))
def ping_command(self, message, arguments, flags, options):
"""
ping_command(message, *arguments, flags, options) -> None
Send a "Pong!" reply on a !ping command.
"""
self.room.send_message("Pong!", parent=message.id)
def restart_command(self, message, arguments, flags, options):
"""
restart_command(message, *arguments, flags, options) -> None
Restart the bot (shorthand for !kill @bot -r).
"""
self.commands.call("kill", message, [], "r", {})
def send_command(self, message, arguments, flags, options):
"""
_command(message, *arguments, flags, options) -> None
Send this bot to another room.
"""
if not arguments:
return
else:
room = arguments[0]
if room[:1] == "&":
room = room[1:]
if "pw" in options and options["pw"] is not True:
password = options["pw"]
else:
password = None
self.room.send_message("/me moves to &{}.".format(room), parent=message.id)
self.room.change(room, password=password)
self.room.launch()
def show_command(self, message, arguments, flags, options):
"""
show_command(message, arguments, flags, options) -> None
Show arguments, flags and options.
"""
msg = "arguments: {}\nflags: {}\noptions: {}".format(arguments, repr(flags), options)
self.room.send_message(msg, parent=message.id)
def uptime_command(self, message, arguments, flags, options):
"""
uptime_command(message, arguments, flags, options) -> None
Show uptime and other info.
"""
stime = self.format_date()
utime = self.format_delta()
if "i" in flags:
msg = "uptime: {} ({})".format(stime, utime)
msg += "\nid: {}".format(self.manager.get_id(self))
msg += "\n{}".format(self.creation_info())
else:
msg = "/me is up since {} ({}).".format(stime, utime)
self.room.send_message(msg, message.id)
pass

View file

@ -1,149 +1,184 @@
import json
import time
import logging
logger = logging.getLogger(__name__)
from . import bot
from . import exceptions
from .exceptions import CreateBotException
from .mention import Mention
class BotManager():
class BotManager:
"""
Keep track of multiple bots in different rooms.
Save and load bots from a file.
If you've created a bot from the Bot class, you can easily host it by adding:
if __name__ == "__main__":
manager = BotManager(YourBotClass, bot_limit=10)
manager.interactive()
to your file and then executing it.
"""
def __init__(self, bot_class, default_nick="yaboli", max_bots=100,
bots_file="bots.json", data_file="data.json"):
def __init__(self, bot_class, bot_limit=None):
"""
bot_class - class to create instances of
default_nick - default nick for all bots to assume when no nick is specified
max_bots - maximum number of bots allowed to exist simultaneously
None or 0 - no limit
bots_file - file the bot backups are saved to
None - no bot backups
data_file - file the bot data is saved to
- None - bot data isn't saved
bot_class - bot class you want to run
bot_limit - maximum amount of bots to exist simultaneously
"""
self.bot_class = bot_class
self.max_bots = max_bots
self.default_nick = default_nick
self.bot_limit = bot_limit
self.bot_id_counter = 0 # no two bots can have the same id
self.bots = {} # each bot has an unique id
self.bots_file = bots_file
self.data_file = data_file
self._bots = {}
self._bot_id = 0
self._bot_data = {}
self._load_bots()
def create(self, room, password=None, nick=None, created_in=None, created_by=None):
def create(self, nick, roomname, password=None, creator=None, create_room=None, create_time=None):
"""
create(room, password, nick) -> bot
create(nick, roomname, password, creator, create_room, create_time) -> bot
Create a new bot in room.
Create a bot of type self.bot_class.
Starts the bot and returns it.
"""
if nick is None:
nick = self.default_nick
if self.bot_limit and len(self.bots) >= self.bot_limit:
raise CreateBotException("Bot limit hit ({} bots)".format(self.bot_limit))
if self.max_bots and len(self._bots) >= self.max_bots:
raise exceptions.CreateBotException("max_bots limit hit")
else:
bot = self.bot_class(room, nick=nick, password=password, manager=self,
created_in=created_in, created_by=created_by)
self._bots[self._bot_id] = bot
self._bot_id += 1
bot_id = self.bot_id_counter
self.bot_id_counter += 1
self._save_bots()
if create_time is None:
create_time = time.time()
bot = self.bot_class(nick, roomname, password=password, creator=creator, create_room=create_room,
create_time=create_time, manager=self)
self.bots[bot_id] = bot
bot.launch()
logger.info("Created {} - {} in room {}".format(bot_id, nick, roomname))
return bot
def remove(self, bot_id):
"""
remove(bot_id) -> None
Kill a bot and remove it from the list of bots.
Remove a bot from the manager and stop it.
"""
if bot_id in self._bots:
self._bots[bot_id].stop()
self._bots.pop(bot_id)
bot = self.get(bot_id)
if not bot: return
self._save_bots()
# for logging purposes
nick = bot.get_nick()
roomname = bot.get_roomname()
bot.stop()
del self.bots[bot_id]
logger.info("Removed {} - {} in room {}".format(bot_id, nick, roomname))
def get(self, bot_id):
"""
get(self, bot_id) -> bot
get(bot_id) -> bot
Return bot with that id, if found.
Get a bot by its id.
Returns None if no bot was found.
"""
if bot_id in self._bots:
return self._bots[bot_id]
return self.bots.get(bot_id)
def get_id(self, bot):
"""
get_id(bot) -> bot_id
Return the bot id, if the bot is known.
Get a bot's id.
Returns None if id not found.
"""
for bot_id, own_bot in self._bots.items():
if bot == own_bot:
for bot_id, lbot in self.bots.items():
if lbot == bot:
return bot_id
def get_similar(self, room, nick):
def similar(self, roomname, mention):
"""
get_by_room(room, nick) -> dict
in_room(roomname, mention) -> [bot_id]
Collect all bots that are connected to the room and have that nick.
Get all bots that are connected to a room and match the mention.
The bot ids are sorted from small to big.
"""
return {bot_id: bot for bot_id, bot in self._bots.items()
if bot.roomname() == room and bot.mentionable().lower() == nick.lower()}
l = []
def _load_bots(self):
for bot_id, bot in sorted(self.bots.items()):
if bot.get_roomname() == roomname and mention == Mention(bot.get_nick()):
l.append(bot_id)
return l
def save(self, filename):
"""
_load_bots() -> None
save(filename) -> None
Load and create bots from self.bots_file.
Save all current bots to a file.
"""
if not self.bots_file:
return
try:
with open(self.bots_file) as f:
bots = json.load(f)
except FileNotFoundError:
pass
else:
for bot_info in bots:
bot = self.create(bot_info["room"], password=bot_info["password"],
nick=bot_info["nick"])
bot.created_in = bot_info["created_in"]
bot.created_by = bot_info["created_by"]
def _save_bots(self):
"""
_save_bots() -> None
Save all current bots to self.bots_file.
"""
if not self.bots_file:
return
logger.info("Saving bots to {}".format(filename))
bots = []
for bot in self.bots.values():
bots.append({
"nick": bot.get_nick(),
"room": bot.get_roomname(),
"password": bot.get_roompassword(),
"creator": bot.get_creator(),
"create_room": bot.get_create_room(),
"create_time": bot.get_create_time(),
"data": bot.save()
})
logger.debug("Bot info: {}".format(bots))
for bot_id, bot in self._bots.items():
bot_info = {}
logger.debug("Writing to file")
with open(filename, "w") as f:
json.dump(bots, f, sort_keys=True, indent=4)
bot_info["room"] = bot.roomname()
bot_info["password"] = bot.password()
bot_info["nick"] = bot.nick()
bot_info["created_in"] = bot.created_in
bot_info["created_by"] = bot.created_by
logger.info("Saved bots.")
bots.append(bot_info)
def load(self, filename):
"""
load(filename) -> None
with open(self.bots_file, "w") as f:
json.dump(bots, f)
Load bots from a file.
Creates the bots and starts them.
"""
logger.info("Loading bots from {}".format(filename))
try:
logger.debug("Reading file")
with open(filename) as f:
bots = json.load(f)
except FileNotFoundError:
logger.warning("File {} not found.".format(filename))
else:
logger.debug("Bot info: {}".format(bots))
for bot_info in bots:
try:
self.create(bot_info["nick"], bot_info["room"], bot_info["password"],
bot_info["creator"], bot_info["create_room"],
bot_info["create_time"]).load(bot_info["data"])
except CreateBotException as err:
logger.warning("Creating bot failed: {}.".format(err))
logger.info("Loaded bots.")
def interactive(self):
"""
interactive() -> None
Start interactive mode that allows you to manage bots using commands.
Command list:
[NYI]
"""
pass

View file

@ -6,50 +6,38 @@ class Callbacks():
def __init__(self):
self._callbacks = {}
def add(self, event, callback, *args, **kwargs):
def add(self, event, callback):
"""
add(event, callback, *args, **kwargs) -> None
add(event, callback) -> None
Add a function to be called on event.
The function will be called with *args and **kwargs.
Certain arguments might be added, depending on the event.
Certain arguments might be added on call, depending on the event.
"""
if not event in self._callbacks:
self._callbacks[event] = []
callback_info = {
"callback": callback,
"args": args,
"kwargs": kwargs
}
self._callbacks[event].append(callback_info)
self._callbacks[event].append(callback)
def remove(self, event):
"""
remove(event) -> None
Remove all callbacks attached to that event.
Remove all callbacks added to that event.
"""
if event in self._callbacks:
del self._callbacks[event]
def call(self, event, *args):
def call(self, event, *args, **kwargs):
"""
call(event) -> None
Call all callbacks subscribed to the event with *args and the arguments specified when the
callback was added.
Call all callbacks subscribed to the event with the arguments passed to this function.
"""
if event in self._callbacks:
for c_info in self._callbacks[event]:
c = c_info["callback"]
args = c_info["args"] + args
kwargs = c_info["kwargs"]
for c in self._callbacks:
c(*args, **kwargs)
def exists(self, event):

View file

@ -1,229 +0,0 @@
import json
import time
import threading
import websocket
from websocket import WebSocketException as WSException
from . import callbacks
class Connection():
"""
Stays connected to a room in its own thread.
Callback functions are called when a packet is received.
Callbacks:
- all the message types from api.euphoria.io
These pass the packet data as argument to the called functions.
The other callbacks don't pass any special arguments.
- "connect"
- "disconnect"
- "stop"
"""
ROOM_FORMAT = "wss://euphoria.io/room/{}/ws"
def __init__(self, room, url_format=None):
"""
room - name of the room to connect to
"""
self.room = room
if not url_format:
url_format = self.ROOM_FORMAT
self._url = url_format.format(self.room)
self._stopping = False
self._ws = None
self._thread = None
self._send_id = 0
self._callbacks = callbacks.Callbacks()
self._id_callbacks = callbacks.Callbacks()
def _connect(self, tries=-1, delay=10):
"""
_connect(tries, delay) -> bool
tries - maximum number of retries
-1 -> retry indefinitely
Returns True on success, False on failure.
Connect to the room.
"""
while tries != 0:
try:
self._ws = websocket.create_connection(
self._url,
enable_multithread=True
)
self._callbacks.call("connect")
return True
except WSException:
if tries > 0:
tries -= 1
if tries != 0:
time.sleep(delay)
return False
def disconnect(self):
"""
disconnect() -> None
Reconnect to the room.
WARNING: To completely disconnect, use stop().
"""
if self._ws:
self._ws.close()
self._ws = None
self._callbacks.call("disconnect")
def launch(self):
"""
launch() -> Thread
Connect to the room and spawn a new thread running run.
"""
if self._connect(tries=1):
self._thread = threading.Thread(target=self._run,
name="{}-{}".format(self.room, int(time.time())))
self._thread.start()
return self._thread
else:
self.stop()
def _run(self):
"""
_run() -> None
Receive messages.
"""
while not self._stopping:
try:
self._handle_json(self._ws.recv())
except (WSException, ConnectionResetError):
if not self._stopping:
self.disconnect()
self._connect()
def stop(self):
"""
stop() -> None
Close the connection to the room.
Joins the thread launched by self.launch().
"""
self._stopping = True
self.disconnect()
self._callbacks.call("stop")
if self._thread and self._thread != threading.current_thread():
self._thread.join()
def next_id(self):
"""
next_id() -> id
Returns the id that will be used for the next package.
"""
return str(self._send_id)
def add_callback(self, ptype, callback, *args, **kwargs):
"""
add_callback(ptype, callback, *args, **kwargs) -> None
Add a function to be called when a packet of type ptype is received.
"""
self._callbacks.add(ptype, callback, *args, **kwargs)
def add_id_callback(self, pid, callback, *args, **kwargs):
"""
add_id_callback(pid, callback, *args, **kwargs) -> None
Add a function to be called when a packet with id pid is received.
"""
self._id_callbacks.add(pid, callback, *args, **kwargs)
def add_next_callback(self, callback, *args, **kwargs):
"""
add_next_callback(callback, *args, **kwargs) -> None
Add a function to be called when the answer to the next message sent is received.
"""
self._id_callbacks.add(self.next_id(), callback, *args, **kwargs)
def _handle_json(self, data):
"""
handle_json(data) -> None
Handle incoming 'raw' data.
"""
packet = json.loads(data)
self._handle_packet(packet)
def _handle_packet(self, packet):
"""
_handle_packet(ptype, data) -> None
Handle incoming packets
"""
if "data" in packet:
data = packet["data"]
else:
data = None
if "error" in packet:
error = packet["error"]
else:
error = None
self._callbacks.call(packet["type"], data, error)
if "id" in packet:
self._id_callbacks.call(packet["id"], data, error)
self._id_callbacks.remove(packet["id"])
def _send_json(self, data):
"""
_send_json(data) -> None
Send 'raw' json.
"""
if self._ws:
try:
self._ws.send(json.dumps(data))
except WSException:
self.disconnect()
def send_packet(self, ptype, **kwargs):
"""
send_packet(ptype, **kwargs) -> None
Send a formatted packet.
"""
packet = {
"type": ptype,
"data": kwargs or None,
"id": str(self._send_id)
}
self._send_id += 1
self._send_json(packet)

View file

@ -2,40 +2,10 @@ class YaboliException(Exception):
"""
Generic yaboli exception class.
"""
pass
class BotManagerException(YaboliException):
class CreateBotException(YaboliException):
"""
Generic BotManager exception class.
Raised when a bot could not be created.
"""
pass
class CreateBotException(BotManagerException):
"""
This exception will be raised when BotManager could not create a bot.
"""
pass
class BotNotFoundException(BotManagerException):
"""
This exception will be raised when BotManager could not find a bot.
"""
pass
class BotException(YaboliException):
"""
Generic Bot exception class.
"""
pass
class ParseMessageException(BotException):
"""
This exception will be raised when a failure parsing a message occurs.
"""
pass

43
yaboli/mention.py Normal file
View file

@ -0,0 +1,43 @@
class Mention(str):
"""
A class to compare @mentions to nicks and other @mentions
"""
def mentionable(nick):
"""
A mentionable version of the nick.
Add an "@" in front to mention someone on euphoria.
"""
# return "".join(c for c in nick if not c in ".!?;&<'\"" and not c.isspace())
return "".join(filter(lambda c: c not in ".!?;&<'\"" and not c.isspace(), nick))
def __new__(cls, nick):
return str.__new__(cls, Mention.mentionable(nick))
def __add__(self, other):
return Mention(str(self) + other)
def __mod__(self, other):
return Mention(str(self) % other)
def __mul__(self, other):
return Mention(str(self)*other)
def __repr__(self):
return "@" + super().__repr__()
def __radd__(self, other):
return Mention(other + str(self))
def __rmul__(self, other):
return Mention(other*str(self))
def format(self, *args, **kwargs):
return Mention(str(self).format(*args, **kwargs))
def format_map(self, *args, **kwargs):
return Mention(str(self).format_map(*args, **kwargs))
def replace(self, *args, **kwargs):
return Mention(str(self).replace(*args, **kwargs))

View file

@ -1,99 +0,0 @@
import time
from . import session
class Message():
"""
This class keeps track of message details.
"""
def __init__(self, id, time, sender, content, parent=None, edited=None, deleted=None,
truncated=None):
"""
id - message id
time - time the message was sent (epoch)
sender - session of the sender
content - content of the message
parent - id of the parent message, or None
edited - time of last edit (epoch)
deleted - time of deletion (epoch)
truncated - message was truncated
"""
self.id = id
self.time = time
self.sender = sender
self.content = content
self.parent = parent
self.edited = edited
self.deleted = deleted
self.truncated = truncated
@classmethod
def from_data(self, data):
"""
Creates and returns a message created from the data.
NOTE: This also creates a session object using the data in "sender".
data - a euphoria message: http://api.euphoria.io/#message
"""
sender = session.Session.from_data(data["sender"])
parent = data["parent"] if "parent" in data else None
edited = data["edited"] if "edited" in data else None
deleted = data["deleted"] if "deleted" in data else None
truncated = data["truncated"] if "truncated" in data else None
return self(
data["id"],
data["time"],
sender,
data["content"],
parent=parent,
edited=edited,
deleted=deleted,
truncated=truncated
)
def time_formatted(self, date=False):
"""
time_formatted(date=False) -> str
date - include date in format
Time in a readable format:
With date: YYYY-MM-DD HH:MM:SS
Without date: HH:MM:SS
"""
if date:
return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self.time))
else:
return time.strftime("%H:%M:%S", time.gmtime(self.time))
def is_edited(self):
"""
is_edited() -> bool
Has this message been edited?
"""
return True if self.edited is not None else False
def is_deleted(self):
"""
is_deleted() -> bool
Has this message been deleted?
"""
return True if self.deleted is not None else False
def is_truncated(self):
"""
is_truncated() -> bool
Has this message been truncated?
"""
return True if self.truncated is not None else False

View file

@ -1,154 +0,0 @@
import operator
from . import message
class Messages():
"""
Message storage class which preserves thread hierarchy.
"""
def __init__(self, message_limit=500):
"""
message_limit - maximum amount of messages that will be stored at a time
None - no limit
"""
self.message_limit = message_limit
self._by_id = {}
self._by_parent = {}
def _sort(self, msgs):
"""
_sort(messages) -> None
Sorts a list of messages by their id, in place.
"""
msgs.sort(key=operator.attrgetter("id"))
def add_from_data(self, data):
"""
add_from_data(data) -> None
Create a message from "raw" data and add it.
"""
mes = message.Message.from_data(data)
self.add(mes)
def add(self, mes):
"""
add(message) -> None
Add a message to the structure.
"""
self.remove(mes.id)
self._by_id[mes.id] = mes
if mes.parent:
if not mes.parent in self._by_parent:
self._by_parent[mes.parent] = []
self._by_parent[mes.parent].append(mes)
if self.message_limit and len(self._by_id) > self.message_limit:
self.remove(self.get_oldest().id)
def remove(self, mid):
"""
remove(message_id) -> None
Remove a message from the structure.
"""
mes = self.get(mid)
if mes:
if mes.id in self._by_id:
self._by_id.pop(mes.id)
parent = self.get_parent(mes.id)
if parent and mes in self.get_children(parent.id):
self._by_parent[mes.parent].remove(mes)
def remove_all(self):
"""
remove_all() -> None
Removes all messages.
"""
self._by_id = {}
self._by_parent = {}
def get(self, mid):
"""
get(message_id) -> Message
Returns the message with the given id, if found.
"""
if mid in self._by_id:
return self._by_id[mid]
def get_oldest(self):
"""
get_oldest() -> Message
Returns the oldest message, if found.
"""
oldest = None
for mid in self._by_id:
if oldest is None or mid < oldest:
oldest = mid
return self.get(oldest)
def get_youngest(self):
"""
get_youngest() -> Message
Returns the youngest message, if found.
"""
youngest = None
for mid in self._by_id:
if youngest is None or mid > youngest:
youngest = mid
return self.get(youngest)
def get_parent(self, mid):
"""
get_parent(message_id) -> str
Returns the message's parent, if found.
"""
mes = self.get(mid)
if mes:
return self.get(mes.parent)
def get_children(self, mid):
"""
get_children(message_id) -> list
Returns a sorted list of children of the given message, if found.
"""
if mid in self._by_parent:
children = self._by_parent[mid][:]
self._sort(children)
return children
def get_top_level(self):
"""
get_top_level() -> list
Returns a sorted list of top-level messages.
"""
msgs = [self.get(mid) for mid in self._by_id if not self.get(mid).parent]
self._sort(msgs)
return msgs

View file

@ -1,617 +0,0 @@
import time
from . import connection
from . import message
from . import messages
from . import session
from . import sessions
from . import callbacks
class Room():
"""
Connects to and provides more abstract access to a room on euphoria.
callback (values passed) - description
----------------------------------------------------------------------------------
delete (message) - message has been deleted
edit (message) - message has been edited
identity - own session or nick has changed
join (session) - user has joined the room
message (message) - message has been sent
messages - message data has changed
nick (session, old, new) - user has changed their nick
part (session) - user has left the room
ping - ping event has happened
room - room info has changed
sessions - session data has changed
change - room has been changed
"""
def __init__(self, room=None, nick=None, password=None, message_limit=500):
"""
room - name of the room to connect to
nick - nick to assume, None -> no nick
password - room password (in case the room is private)
message_limit - maximum amount of messages that will be stored at a time
None - no limit
"""
self.room = room
self.password = password
self.room_is_private = None
self.pm_with_nick = None
self.pm_with_user = None
self.nick = nick
self.session = None
self.message_limit = message_limit
self.ping_last = 0
self.ping_next = 0
self.ping_offset = 0 # difference between server and local time
self._messages = None
self._sessions = None
self._callbacks = callbacks.Callbacks()
self._con = None
if self.room:
self.change(self.room, password=self.password)
def launch(self):
"""
launch() -> Thread
Open connection in a new thread (see connection.Connection.launch).
"""
return self._con.launch()
def stop(self):
"""
stop() -> None
Close connection to room.
"""
self._con.stop()
def change(self, room, password=None):
"""
change(room) -> None
Leave current room (if already connected) and join new room.
Clears all messages and sessions.
A call to launch() is necessary to start a new thread again.
"""
if self._con:
self._con.stop()
self.room = room
self.password = password
self.room_is_private = None
self.pm_with_nick = None
self.pm_with_user = None
self.session = None
self.ping_last = 0
self.ping_next = 0
self.ping_offset = 0 # difference between server and local time
self._messages = messages.Messages(message_limit=self.message_limit)
self._sessions = sessions.Sessions()
self._con = connection.Connection(self.room)
self._con.add_callback("bounce-event", self._handle_bounce_event)
self._con.add_callback("disconnect-event", self._handle_disconnect_event)
self._con.add_callback("hello-event", self._handle_hello_event)
self._con.add_callback("join-event", self._handle_join_event)
self._con.add_callback("network-event", self._handle_network_event)
self._con.add_callback("nick-event", self._handle_nick_event)
self._con.add_callback("edit-message-event", self._handle_edit_message_event)
self._con.add_callback("part-event", self._handle_part_event)
self._con.add_callback("ping-event", self._handle_ping_event)
self._con.add_callback("send-event", self._handle_send_event)
self._con.add_callback("snapshot-event", self._handle_snapshot_event)
self._callbacks.call("change")
def add_callback(self, event, callback, *args, **kwargs):
"""
add_callback(ptype, callback, *args, **kwargs) -> None
Add a function to be called when a certain event happens.
"""
self._callbacks.add(event, callback, *args, **kwargs)
def get_msg(self, mid):
"""
get_msg(message_id) -> Message
Returns the message with the given id, if found.
"""
return self._messages.get(mid)
def get_msg_parent(self, mid):
"""
get_msg_parent(message_id) -> Message
Returns the message's parent, if found.
"""
return self._messages.get_parent(mid)
def get_msg_children(self, mid):
"""
get_msg_children(message_id) -> list
Returns a sorted list of children of the given message, if found.
"""
return self._messages.get_children(mid)
def get_msg_top_level(self):
"""
get_msg_top_level() -> list
Returns a sorted list of top-level messages.
"""
return self._messages.get_top_level()
def get_msg_oldest(self):
"""
get_msg_oldest() -> Message
Returns the oldest message, if found.
"""
return self._messages.get_oldest()
def get_msg_youngest(self):
"""
get_msg_youngest() -> Message
Returns the youngest message, if found.
"""
return self._messages.get_youngest()
def get_session(self, sid):
"""
get_session(session_id) -> Session
Returns the session with that id.
"""
return self._sessions.get(sid)
def get_sessions(self):
"""
get_sessions() -> list
Returns the full list of sessions.
"""
return self._sessions.get_all()
def get_people(self):
"""
get_people() -> list
Returns a list of all non-bot and non-lurker sessions.
"""
return self._sessions.get_people()
def get_accounts(self):
"""
get_accounts() -> list
Returns a list of all logged-in sessions.
"""
return self._sessions.get_accounts()
def get_agents(self):
"""
get_agents() -> list
Returns a list of all sessions who are not signed into an account and not bots or lurkers.
"""
return self._sessions.get_agents()
def get_bots(self):
"""
get_bots() -> list
Returns a list of all bot sessions.
"""
return self._sessions.get_bots()
def get_lurkers(self):
"""
get_lurkers() -> list
Returns a list of all lurker sessions.
"""
return self._sessions.get_lurkers()
def set_nick(self, nick):
"""
set_nick(nick) -> None
Change your nick.
"""
self.nick = nick
if not self.session or self.session.name != self.nick:
self._con.add_next_callback(self._handle_nick_reply)
self._con.send_packet("nick", name=nick)
def mentionable(self, nick=None):
"""
mentionable()
A mentionable version of the nick.
The nick defaults to the bot's nick.
"""
if nick is None:
nick = self.nick
return "".join(c for c in nick if not c in ".!?;&<'\"" and not c.isspace())
def send_message(self, content, parent=None):
"""
send_message(content, parent) -> None
Send a message.
"""
self._con.add_next_callback(self._handle_send_reply)
self._con.send_packet("send", content=content, parent=parent)
def authenticate(self, password=None):
"""
authenticate(passsword) -> None
Try to authenticate so you can enter the room.
"""
self.password = password
self._con.add_next_callback(self._handle_auth_reply)
self._con.send_packet("auth", type="passcode", passcode=self.password)
def update_sessions(self):
"""
update_sessions() -> None
Resets and then updates the list of sessions.
"""
self._con.add_next_callback(self._handle_who_reply)
self._con.send_packet("who")
def load_msgs(self, number=50):
"""
load_msgs(number) -> None
Request a certain number of older messages from the server.
"""
self._con.add_next_callback(self._handle_log_reply)
self._con.send_packet("log", n=number, before=self.get_msg_oldest().id)
def load_msg(self, mid):
"""
load_msg(message_id) -> None
Request an untruncated version of the message with that id.
"""
self._con.add_next_callback(self._handle_get_message_reply)
self._con.send_packet("get-message", id=mid)
# ----- HANDLING OF EVENTS -----
def _handle_connect(self):
"""
TODO
"""
self._callbacks.call("connect")
def _handle_disconnect(self):
"""
TODO
"""
self._callbacks.call("disconnect")
def _handle_stop(self):
"""
TODO
"""
self._callbacks.call("stop")
def _handle_bounce_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "bounce-event", error)
self.stop()
return
if self.password is not None:
self.authenticate(self.password)
else:
self.stop()
def _handle_disconnect_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "disconnect-event", error)
self.stop()
return
self._con.disconnect()
def _handle_hello_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "hello-event", error)
self.stop()
return
self.session = session.Session.from_data(data["session"])
self._sessions.add(self.session)
self._callbacks.call("identity")
self._callbacks.call("sessions")
self.room_is_private = data["room_is_private"]
self._callbacks.call("room")
def _handle_join_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "join-event", error)
self.update_sessions()
return
ses = session.Session.from_data(data)
self._sessions.add(ses)
self._callbacks.call("join", ses)
self._callbacks.call("sessions")
def _handle_network_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "network-event", error)
return
if data["type"] == "partition":
self._sessions.remove_on_network_partition(data["server_id"], data["server_era"])
self._callbacks.call("sessions")
def _handle_nick_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "nick-event", error)
self.update_sessions()
return
ses = self.get_session(data["session_id"])
if ses:
ses.name = data["to"]
self._callbacks.call("nick", ses, data["from"], data["to"])
self._callbacks.call("sessions")
def _handle_edit_message_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "edit-message-event", error)
return
msg = message.Message.from_data(data)
if msg:
self._messages.add(msg)
if msg.deleted:
self._callbacks.call("delete", msg)
elif msg.edited:
self._callbacks.call("edit", msg)
self._callbacks.call("messages")
def _handle_part_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "part-event", error)
self.update_sessions()
return
ses = session.Session.from_data(data)
if ses:
self._sessions.remove(ses.session_id)
self._callbacks.call("part", ses)
self._callbacks.call("sessions")
def _handle_ping_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "ping-event", error)
return
self.ping_last = data["time"]
self.ping_next = data["next"]
self.ping_offset = self.ping_last - time.time()
self._con.send_packet("ping-reply", time=self.ping_last)
self._callbacks.call("ping")
def _handle_send_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "send-event", error)
return
msg = message.Message.from_data(data)
self._callbacks.call("message", msg)
self._messages.add(msg)
self._callbacks.call("messages")
def _handle_snapshot_event(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "snapshot-event", error)
self.stop()
return
self.set_nick(self.nick)
if "pm_with_nick" in data or "pm_with_user_id" in data:
if "pm_with_nick" in data:
self.pm_with_nick = data["pm_with_nick"]
if "pm_with_user_id" in data:
self.pm_with_user_id = data["pm_with_user_id"]
self._callbacks.call("room")
self._sessions.remove_all()
for sesdata in data["listing"]:
self._sessions.add_from_data(sesdata)
self._callbacks.call("sessions")
self._messages.remove_all()
for msgdata in data["log"]:
self._messages.add_from_data(msgdata)
self._callbacks.call("messages")
# ----- HANDLING OF REPLIES -----
def _handle_auth_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "auth-reply", error)
self.stop()
return
if not data["success"]:
self._con.stop()
def _handle_get_message_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "get-message-reply", error)
return
self._messages.add_from_data(data)
self._callbacks.call("messages")
def _handle_log_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "log-reply", error)
return
for msgdata in data["log"]:
self._messages.add_from_data(msgdata)
self._callbacks.call("messages")
def _handle_nick_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "nick-reply", error)
return
if "to" in data:
self.session.name = self.nick
self._callbacks.call("identity")
if data["to"] != self.nick:
self.set_nick(self.nick)
def _handle_send_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "send-reply", error)
return
self._messages.add_from_data(data)
self._callbacks.call("messages")
def _handle_who_reply(self, data, error):
"""
TODO
"""
if error:
self._callbacks.call("error", "who-reply", error)
return
self._sessions.remove_all()
for sesdata in data["listing"]:
self._sessions.add_from_data(sesdata)
self._callbacks.call("sessions")

View file

@ -1,71 +0,0 @@
class Session():
"""
This class keeps track of session details.
"""
def __init__(self, id, name, server_id, server_era, session_id, is_staff=None, is_manager=None):
"""
id - agent/account id
name - name of the client when the SessionView was captured
server_id - id of the server
server_era - era of the server
session_id - session id (unique across euphoria)
is_staff - client is staff
is_manager - client is manager
"""
self.id = id
self.name = name
self.server_id = server_id
self.server_era = server_era
self.session_id = session_id
self.staff = is_staff
self.manager = is_manager
@classmethod
def from_data(self, data):
"""
Creates and returns a session created from the data.
data - a euphoria SessionView: http://api.euphoria.io/#sessionview
"""
is_staff = data["is_staff"] if "is_staff" in data else None
is_manager = data["is_manager"] if "is_manager" in data else None
return self(
data["id"],
data["name"],
data["server_id"],
data["server_era"],
data["session_id"],
is_staff,
is_manager
)
def session_type(self):
"""
session_type() -> str
The session's type (bot, account, agent).
"""
return self.id.split(":")[0]
def is_staff(self):
"""
is_staff() -> bool
Is a user staff?
"""
return self.staff and True or False
def is_manager(self):
"""
is_manager() -> bool
Is a user manager?
"""
return self.staff and True or False

View file

@ -1,146 +0,0 @@
from . import session
class Sessions():
"""
Keeps track of sessions.
"""
def __init__(self):
"""
TODO
"""
self._sessions = {}
def add_from_data(self, data):
"""
add_raw(data) -> None
Create a session from "raw" data and add it.
"""
ses = session.Session.from_data(data)
self._sessions[ses.session_id] = ses
def add(self, ses):
"""
add(session) -> None
Add a session.
"""
self._sessions[ses.session_id] = ses
def get(self, sid):
"""
get(session_id) -> Session
Returns the session with that id.
"""
if sid in self._sessions:
return self._sessions[sid]
def remove(self, sid):
"""
remove(session) -> None
Remove a session.
"""
if sid in self._sessions:
self._sessions.pop(sid)
def remove_on_network_partition(self, server_id, server_era):
"""
remove_on_network_partition(server_id, server_era) -> None
Removes all sessions matching the server_id/server_era combo.
http://api.euphoria.io/#network-event
"""
sesnew = {}
for sid in self._sessions:
ses = self.get(sid)
if not (ses.server_id == server_id and ses.server_era == server_era):
sesnew[sid] = ses
self._sessions = sesnew
def remove_all(self):
"""
remove_all() -> None
Removes all sessions.
"""
self._sessions = {}
def get_all(self):
"""
get_all() -> list
Returns the full list of sessions.
"""
return [ses for sid, ses in self._sessions.items()]
def get_people(self):
"""
get_people() -> list
Returns a list of all non-bot and non-lurker sessions.
"""
# not a list comprehension because that would span several lines too
people = []
for sid in self._sessions:
ses = self.get(sid)
if ses.session_type() in ["agent", "account"] and ses.name:
people.append(ses)
return people
def _get_by_type(self, tp):
"""
_get_by_type(session_type) -> list
Returns a list of all non-lurker sessions with that type.
"""
return [ses for sid, ses in self._sessions.items()
if ses.session_type() == tp and ses.name]
def get_accounts(self):
"""
get_accounts() -> list
Returns a list of all logged-in sessions.
"""
return self._get_by_type("account")
def get_agents(self):
"""
get_agents() -> list
Returns a list of all sessions who are not signed into an account and not bots or lurkers.
"""
return self._get_by_type("agent")
def get_bots(self):
"""
get_bots() -> list
Returns a list of all bot sessions.
"""
return self._get_by_type("bot")
def get_lurkers(self):
"""
get_lurkers() -> list
Returns a list of all lurker sessions.
"""
return [ses for sid, ses in self._sessions.items() if not ses.name]