Create separate wtf module/bot
This commit is contained in:
commit
79a5b2d087
3 changed files with 206 additions and 0 deletions
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
**/__pycache__
|
||||
yaboli
|
||||
websockets
|
||||
*.cookie
|
||||
*.conf
|
||||
*.db
|
||||
166
wtf.py
Normal file
166
wtf.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
import asyncio
|
||||
import configparser
|
||||
import logging
|
||||
import re
|
||||
|
||||
import yaboli
|
||||
from yaboli.utils import *
|
||||
|
||||
|
||||
logger = logging.getLogger("wtf")
|
||||
|
||||
class WtfDB(yaboli.Database):
|
||||
def initialize(self, db):
|
||||
with db:
|
||||
db.execute((
|
||||
"CREATE TABLE IF NOT EXISTS acronyms ("
|
||||
"acronym_id INTEGER PRIMARY KEY, "
|
||||
"acronym TEXT NOT NULL, "
|
||||
"explanation TEXT NOT NULL, "
|
||||
"author TEXT NOT NULL, "
|
||||
"deleted BOOLEAN NOT NULL DEFAULT 0"
|
||||
")"
|
||||
))
|
||||
db.create_function("p_lower", 1, str.lower)
|
||||
|
||||
@yaboli.operation
|
||||
def add(self, db, acronym, explanation, author):
|
||||
with db:
|
||||
db.execute((
|
||||
"INSERT INTO acronyms (acronym, explanation, author) "
|
||||
"VALUES (?,?,?)"
|
||||
), (acronym, explanation, author))
|
||||
|
||||
@yaboli.operation
|
||||
def find(self, db, acronym):
|
||||
c = db.execute((
|
||||
"SELECT acronym, explanation FROM acronyms "
|
||||
"WHERE NOT deleted AND p_lower(acronym) = ?"
|
||||
), (acronym.lower(),))
|
||||
return c.fetchall()
|
||||
|
||||
@yaboli.operation
|
||||
def find_full(self, db, acronym):
|
||||
c = db.execute((
|
||||
"SELECT acronym_id, acronym, explanation, author FROM acronyms "
|
||||
"WHERE NOT deleted AND p_lower(acronym) = ?"
|
||||
), (acronym.lower(),))
|
||||
return c.fetchall()
|
||||
|
||||
@yaboli.operation
|
||||
def delete(self, db, acronym_id):
|
||||
with db:
|
||||
db.execute("UPDATE acronyms SET deleted = 1 WHERE acronym_id = ?", (acronym_id,))
|
||||
|
||||
class Wtf:
|
||||
DESCRIPTION = (
|
||||
"'wtf' is a database of acronyms and initialisms."
|
||||
" It is inspired by the linux wtf program and uses its acronyms,"
|
||||
" in addition to user generated ones.\n"
|
||||
)
|
||||
COMMANDS = (
|
||||
"!wtf is <acronym> - look up an acronym\n"
|
||||
"!wtf add <acronym> <explanation> - add a new acronym\n"
|
||||
"!wtf detail <acronym> - shows more info about the acronym\n"
|
||||
"!wtf delete <id> - delete acronym with corresponding id"
|
||||
" (Look up the id using !wtf detail)\n"
|
||||
)
|
||||
CREDITS = "Created by @Garmy using github.com/Garmelon/yaboli\n"
|
||||
|
||||
RE_IS = r"\s*is\s+(\S+)\s*"
|
||||
RE_ADD = r"\s*add\s+(\S+)\s+(.*)"
|
||||
RE_DETAIL = r"\s*detail\s+(\S+)\s*"
|
||||
RE_DELETE = r"\s*delete\s+(\d+)\s*"
|
||||
|
||||
def __init__(self, dbfile):
|
||||
self.db = WtfDB(dbfile)
|
||||
|
||||
@yaboli.command("wtf")
|
||||
async def command_wtf(self, room, message, argstr):
|
||||
match_is = re.fullmatch(self.RE_IS, argstr)
|
||||
match_add = re.fullmatch(self.RE_ADD, argstr)
|
||||
match_detail = re.fullmatch(self.RE_DETAIL, argstr)
|
||||
match_delete = re.fullmatch(self.RE_DELETE, argstr)
|
||||
|
||||
if match_is:
|
||||
acronym = match_is.group(1)
|
||||
explanations = await self.db.find(acronym)
|
||||
if explanations:
|
||||
# Acronym, Explanation
|
||||
lines = [f"{a} - {e}" for a, e in explanations]
|
||||
text = "\n".join(lines)
|
||||
await room.send(text, message.mid)
|
||||
else:
|
||||
await room.send(f"{acronym!r} not found.", message.mid)
|
||||
|
||||
elif match_add:
|
||||
acronym = match_add.group(1)
|
||||
explanation = match_add.group(2).strip()
|
||||
await self.db.add(acronym, explanation, message.sender.nick)
|
||||
await room.send(f"Added acronym: {acronym} - {explanation}", message.mid)
|
||||
logger.INFO(f"{mention(message.sender.nick)} added acronym: {acronym} - {explanation}")
|
||||
|
||||
elif match_detail:
|
||||
acronym = match_detail.group(1)
|
||||
explanations = await self.db.find_full(acronym)
|
||||
if explanations:
|
||||
# Id, Acronym, Explanation, aUthor
|
||||
lines = [f"{i}: {a} - {e} (by {mention(u, ping=False)})" for i, a, e, u in explanations]
|
||||
text = "\n".join(lines)
|
||||
await room.send(text, message.mid)
|
||||
else:
|
||||
await room.send(f"{acronym!r} not found.", message.mid)
|
||||
|
||||
elif match_delete:
|
||||
aid = match_delete.group(1)
|
||||
await self.db.delete(aid)
|
||||
await room.send(f"Deleted.", message.mid)
|
||||
logger.INFO(f"{mention(message.sender.nick)} deleted acronym with id {aid}")
|
||||
|
||||
else:
|
||||
text = "Usage:\n" + self.COMMANDS
|
||||
await room.send(text, message.mid)
|
||||
|
||||
class WtfBot(yaboli.Bot):
|
||||
SHORT_HELP = "An acronym database"
|
||||
LONG_HELP = Wtf.DESCRIPTION + Wtf.COMMANDS + Wtf.CREDITS
|
||||
|
||||
def __init__(self, nick, wtfdbfile, cookiefile=None):
|
||||
super().__init__(nick, cookiefile=cookiefile)
|
||||
self.wtf = Wtf(wtfdbfile)
|
||||
|
||||
async def on_command_specific(self, room, message, command, nick, argstr):
|
||||
if similar(nick, room.session.nick) and not argstr:
|
||||
await self.botrulez_ping(room, message, command)
|
||||
await self.botrulez_help(room, message, command, text=self.LONG_HELP)
|
||||
await self.botrulez_uptime(room, message, command)
|
||||
await self.botrulez_kill(room, message, command)
|
||||
await self.botrulez_restart(room, message, command)
|
||||
|
||||
async def on_command_general(self, room, message, command, argstr):
|
||||
if not argstr:
|
||||
await self.botrulez_ping(room, message, command)
|
||||
await self.botrulez_help(room, message, command, text=self.SHORT_HELP)
|
||||
|
||||
await self.wtf.command_wtf(room, message, command, argstr)
|
||||
|
||||
def main(configfile):
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
config = configparser.ConfigParser(allow_no_value=True)
|
||||
config.read(configfile)
|
||||
|
||||
nick = config.get("general", "nick")
|
||||
cookiefile = config.get("general", "cookiefile", fallback=None)
|
||||
wtfdbfile = config.get("general", "wtfdbfile", fallback=None)
|
||||
bot = WtfBot(nick, wtfdbfile, cookiefile=cookiefile)
|
||||
|
||||
for room, password in config.items("rooms"):
|
||||
if not password:
|
||||
password = None
|
||||
bot.join_room(room, password=password)
|
||||
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main("wtf.conf")
|
||||
34
wtf_import.py
Executable file
34
wtf_import.py
Executable file
|
|
@ -0,0 +1,34 @@
|
|||
#!/bin/env python3
|
||||
|
||||
# A short script to import the wtf program's "databases" of acronyms.
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
import wtf
|
||||
|
||||
|
||||
async def import_file(db, acronymfile):
|
||||
with open(acronymfile) as f:
|
||||
for line in f:
|
||||
s = line.split("\t", 1)
|
||||
if len(s) == 2:
|
||||
acronym, explanation = s
|
||||
explanation = explanation.strip()
|
||||
print(f"{acronym} - {explanation}")
|
||||
await db.add(acronym, explanation, "importer")
|
||||
|
||||
def main(dbfile, acronymfiles):
|
||||
db = wtf.WtfDB(dbfile)
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
for acronymfile in acronymfiles:
|
||||
loop.run_until_complete(import_file(db, acronymfile))
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) >= 3:
|
||||
main(sys.argv[1], sys.argv[2:])
|
||||
else:
|
||||
print(" USAGE:")
|
||||
print(f"{sys.argv[0]} <dbfile> <acronymfile> [<acronymfile> ...]")
|
||||
exit(1)
|
||||
Loading…
Add table
Add a link
Reference in a new issue