glados-matrix/plugins/plugin_manager.py
2025-04-23 08:10:40 +00:00

101 lines
4.0 KiB
Python

#!/usr/bin/env python3
import importlib
import inspect
from collections import OrderedDict
from plugins.plugin import Plugin
class PluginManager:
    """Load plugins, register their commands and dispatch ``!`` messages.

    Attributes:
        bot: Object passed in by the caller; stored, not used by this class.
        config: Configuration object. ``config.ADMINS`` is consulted by
            ``is_sender_admin`` and the object is assigned to every loaded
            plugin as ``plugin.config``.
        plugins: ``OrderedDict`` mapping plugin module name -> plugin instance.
        commands: ``OrderedDict`` mapping ``'!<command>'`` -> dict with the
            keys ``'function'``, ``'class'`` and ``'options'``.
        command_names: Registered command names, longest first, so that the
            most specific multi-word command is matched before its prefix.
    """

    def __init__(self, bot, config):
        """Store the collaborators and start with empty registries."""
        self.bot = bot
        self.config = config
        self.plugins = OrderedDict()
        self.commands = OrderedDict()
        self.command_names = []

    def load_plugin(self, name):
        """Import ``plugins.<name>`` and register the plugin(s) it defines.

        Every class in the module that is a strict subclass of ``Plugin`` is
        instantiated with this manager as its only argument. Each bound
        method carrying a truthy ``_plugin_command`` attribute is registered
        through ``add_command``. If a module exposes several ``Plugin``
        subclasses, all of their commands are registered but only the last
        instance is kept in ``self.plugins[name]``.

        Args:
            name: Module name inside the ``plugins`` package.
        """
        mod = importlib.import_module('plugins.{0}'.format(name))
        for _class_name, candidate in inspect.getmembers(mod, inspect.isclass):
            if candidate is Plugin or not issubclass(candidate, Plugin):
                continue
            plugin = candidate(self)
            plugin._module = mod  # remembered so reload_plugins can reload it
            self.plugins[name] = plugin
            for _method_name, method in inspect.getmembers(plugin, inspect.ismethod):
                metadata = getattr(method, '_plugin_command', False)
                if not metadata:
                    continue
                # Work on a copy: popping from the dict stored on the function
                # would strip the explicit command name for any later load of
                # the same class.
                options = dict(metadata['options'])
                command = options.pop('command', None)
                if command is None:
                    # Default name: method name with underscores as spaces.
                    command = method.__name__.replace('_', ' ')
                self.add_command(command, method, **options)
            plugin.config = self.config

    # Only for debug
    def reload_plugins(self):
        """Deactivate, re-import and re-activate every loaded plugin."""
        # Snapshot names and modules before the registries are reset below.
        plugin_names = list(self.plugins)
        modules = [plugin._module for plugin in self.plugins.values()]
        self.deactivate_plugins()
        importlib.invalidate_caches()
        # Force rebuilding modules
        for module in modules:
            importlib.reload(module)
        self.plugins = OrderedDict()
        self.commands = OrderedDict()
        self.command_names = []
        for plugin_name in plugin_names:
            self.load_plugin(plugin_name)
        self.activate_plugins()

    def activate_plugins(self):
        """Call ``activate()`` on every loaded plugin, in load order."""
        for plugin in self.plugins.values():
            plugin.activate()

    def deactivate_plugins(self):
        """Call ``deactivate()`` on every loaded plugin, in load order."""
        for plugin in self.plugins.values():
            plugin.deactivate()

    def _find_command(self, body):
        """Return the longest registered command that prefixes *body*, or None.

        Matching is done on space-separated words, so ``'!foo'`` does not
        match the body ``'!foobar'``.
        """
        body_chain = body.split(' ')
        for command in self.command_names:
            command_chain = command.split(' ')
            if body_chain[:len(command_chain)] == command_chain:
                return command
        return None

    def notify_message(self, room, event, private=False):
        """Dispatch a room message to the matching command, if any.

        Messages whose body does not start with ``!`` are ignored, as are
        events without a string ``event['content']['body']``. For a command
        message, the handler is called as ``f(room, event, args)`` and a
        non-empty string result is sent back with ``room.send_text``.

        Args:
            room: Object exposing ``send_text(str)``; also passed to handlers.
            event: Mapping read via ``event['content']['body']`` and
                ``event['sender']``.
            private: Accepted for interface compatibility; currently unused.
        """
        try:
            body = event['content']['body']
        except KeyError:
            # No body to inspect, so there is no command to run.
            return
        if not isinstance(body, str) or not body.startswith('!'):
            return

        command = self._find_command(body)
        if command is None:
            reply = 'Unknown command'
        elif self.is_admin_command(command) and not self.is_sender_admin(event):
            reply = "Unauthorized"
        else:
            # Everything after the command text, split on spaces, blanks dropped.
            args = [a for a in body[len(command):].strip().split(' ') if a]
            handler = self.commands[command]['function']
            reply = handler(room, event, args)

        if reply and isinstance(reply, str):
            room.send_text(reply)

    def is_sender_admin(self, event):
        """Return True if ``event['sender']`` is listed in ``config.ADMINS``."""
        return event['sender'] in self.config.ADMINS

    def is_admin_command(self, command):
        """Return True if *command* is registered with a truthy ``admin`` option."""
        return command in self.commands and self.commands[command]['options'].get('admin', False)

    def add_command(self, command, function, **options):
        """Register *function* under ``'!<command>'``.

        Registering an existing name replaces the previous handler; the name
        is kept only once in ``command_names``.

        Args:
            command: Command text without the leading ``!``.
            function: Bound method; its owner's class name is recorded.
            **options: Stored as-is; ``admin`` is the only key read here.
        """
        # TODO: check the command doesn't exist. If not, try another name
        command_name = '!{0}'.format(command)
        class_name = function.__self__.__class__.__name__
        if command_name not in self.commands:
            self.command_names.append(command_name)
        self.commands[command_name] = {'function': function, 'class': class_name, 'options': options}
        # Longest first so multi-word commands win over their shorter prefixes.
        self.command_names.sort(key=len, reverse=True)