You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

393 lines
14 KiB

"""
.. module:: PyIRCBot
:synopsis: Main IRC bot class
.. moduleauthor:: Dave Pedu <dave@davepedu.com>
"""
import logging
import time
import sys
import traceback
from pyircbot.rpc import BotRPC
from pyircbot.irccore import IRCCore
import os.path
class PyIRCBot:
""":param botconfig: The configuration of this instance of the bot. Passed by main.py.
:type botconfig: dict
"""
version = "4.0.0-r03"
def __init__(self, botconfig):
self.log = logging.getLogger('PyIRCBot')
"""Reference to logger object"""
self.botconfig = botconfig
"""saved copy of the instance config"""
"""storage of imported modules"""
self.modules = {}
"""instances of modules"""
self.moduleInstances = {}
self.rpc = BotRPC(self)
"""Reference to BotRPC thread"""
self.irc = IRCCore()
"""IRC protocol class"""
self.irc.servers = self.botconfig["connection"]["servers"]
self.irc.port = self.botconfig["connection"]["port"]
self.irc.ipv6 = True if self.botconfig["connection"]["ipv6"]=="on" else False
self.irc.addHook("_DISCONNECT", self.connection_closed)
# legacy support
self.act_PONG = self.irc.act_PONG
self.act_USER = self.irc.act_USER
self.act_NICK = self.irc.act_NICK
self.act_JOIN = self.irc.act_JOIN
self.act_PRIVMSG = self.irc.act_PRIVMSG
self.act_MODE = self.irc.act_MODE
self.act_ACTION = self.irc.act_ACTION
self.act_KICK = self.irc.act_KICK
self.act_QUIT = self.irc.act_QUIT
self.get_nick = self.irc.get_nick
self.decodePrefix = IRCCore.decodePrefix
# Load modules
self.initModules()
# Connect to IRC
self.connect()
def connect(self):
try:
self.irc._connect()
except:
self.log.error("Pyircbot attempted to connect and failed!")
self.log.error(traceback.format_exc())
def loop(self):
self.irc.loop()
def disconnect(self, message, reconnect=True):
"""Send quit message and disconnect from IRC.
:param message: Quit message
:type message: str
:param reconnect: True causes a reconnection attempt to be made after the disconnect
:type reconnect: bool
"""
self.log.info("disconnect")
self.irc.kill(message=message, alive=reconnect)
def kill(self, sys_exit=True, message="Help! Another thread is killing me :("):
"""Shut down the bot violently
:param sys_exit: True causes sys.exit(0) to be called
:type sys_exit: bool
:param message: Quit message
:type message: str
"""
#Close all modules
self.closeAllModules()
self.irc.kill(message=message, alive=not sys_exit)
if sys_exit:
sys.exit(0)
def connection_closed(self, args, prefix, trailing):
"""Called when the socket is disconnected. We will want to reconnect. """
if self.irc.alive:
self.log.warning("Connection was lost. Reconnecting in 5 seconds.")
time.sleep(5)
self.connect()
def initModules(self):
"""load modules specified in instance config"""
" append module location to path "
sys.path.append(os.path.dirname(__file__)+"/modules/")
" append usermodule dir to beginning of path"
for path in self.botconfig["bot"]["usermodules"]:
sys.path.insert(0, path+"/")
for modulename in self.botconfig["modules"]:
self.loadmodule(modulename)
def importmodule(self, name):
"""Import a module
:param moduleName: Name of the module to import
:type moduleName: str"""
" check if already exists "
if not name in self.modules:
self.log.debug("Importing %s" % name)
" attempt to load "
try:
moduleref = __import__(name)
self.modules[name]=moduleref
return (True, None)
except Exception as e:
" on failure (usually syntax error in Module code) print an error "
self.log.error("Module %s failed to load: " % name)
self.log.error("Module load failure reason: " + str(e))
return (False, str(e))
else:
self.log.warning("Module %s already imported" % name)
return (False, "Module already imported")
def deportmodule(self, name):
"""Remove a module's code from memory. If the module is loaded it will be unloaded silently.
:param moduleName: Name of the module to import
:type moduleName: str"""
self.log.debug("Deporting %s" % name)
" unload if necessary "
if name in self.moduleInstances:
self.unloadmodule(name)
" delete all references to the module"
if name in self.modules:
item = self.modules[name]
del self.modules[name]
del item
" delete copy that python stores in sys.modules "
if name in sys.modules:
del sys.modules[name]
def loadmodule(self, name):
"""Activate a module.
:param moduleName: Name of the module to activate
:type moduleName: str"""
" check if already loaded "
if name in self.moduleInstances:
self.log.warning( "Module %s already loaded" % name )
return False
" check if needs to be imported, and verify it was "
if not name in self.modules:
importResult = self.importmodule(name)
if not importResult[0]:
return importResult
" init the module "
self.moduleInstances[name] = getattr(self.modules[name], name)(self, name)
" load hooks "
self.loadModuleHooks(self.moduleInstances[name])
def unloadmodule(self, name):
"""Deactivate a module.
:param moduleName: Name of the module to deactivate
:type moduleName: str"""
if name in self.moduleInstances:
" notify the module of disabling "
self.moduleInstances[name].ondisable()
" unload all hooks "
self.unloadModuleHooks(self.moduleInstances[name])
" remove the instance "
item = self.moduleInstances.pop(name)
" delete the instance"
del item
self.log.info( "Module %s unloaded" % name )
return (True, None)
else:
self.log.info("Module %s not loaded" % name)
return (False, "Module not loaded")
def reloadmodule(self, name):
"""Deactivate and activate a module.
:param moduleName: Name of the target module
:type moduleName: str"""
" make sure it's imporeted"
if name in self.modules:
" remember if it was loaded before"
loadedbefore = name in self.moduleInstances
self.log.info("Reloading %s" % self.modules[name])
" unload "
self.unloadmodule(name)
" load "
if loadedbefore:
self.loadmodule(name)
return (True, None)
return (False, "Module is not loaded")
def redomodule(self, name):
"""Reload a running module from disk
:param moduleName: Name of the target module
:type moduleName: str"""
" remember if it was loaded before "
loadedbefore = name in self.moduleInstances
" unload/deport "
self.deportmodule(name)
" import "
importResult = self.importmodule(name)
if not importResult[0]:
return importResult
" load "
if loadedbefore:
self.loadmodule(name)
return (True, None)
def loadModuleHooks(self, module):
"""**Internal.** Enable (connect) hooks of a module
:param module: module object to hook in
:type module: object"""
" activate a module's hooks "
for hook in module.hooks:
if type(hook.hook) == list:
for hookcmd in hook.hook:
self.irc.addHook(hookcmd, hook.method)
else:
self.irc.addHook(hook.hook, hook.method)
def unloadModuleHooks(self, module):
"""**Internal.** Disable (disconnect) hooks of a module
:param module: module object to unhook
:type module: object"""
" remove a modules hooks "
for hook in module.hooks:
if type(hook.hook) == list:
for hookcmd in hook.hook:
self.irc.removeHook(hookcmd, hook.method)
else:
self.irc.removeHook(hook.hook, hook.method)
def getmodulebyname(self, name):
"""Get a module object by name
:param name: name of the module to return
:type name: str
:returns: object -- the module object"""
if not name in self.moduleInstances:
return None
return self.moduleInstances[name]
def getmodulesbyservice(self, service):
"""Get a list of modules that provide the specified service
:param service: name of the service searched for
:type service: str
:returns: list -- a list of module objects"""
validModules = []
for module in self.moduleInstances:
if service in self.moduleInstances[module].services:
validModules.append(self.moduleInstances[module])
return validModules
def getBestModuleForService(self, service):
"""Get the first module that provides the specified service
:param service: name of the service searched for
:type service: str
:returns: object -- the module object, if found. None if not found."""
m = self.getmodulesbyservice(service)
if len(m)>0:
return m[0]
return None
def closeAllModules(self):
""" Deport all modules (for shutdown). Modules are unloaded in the opposite order listed in the config. """
loaded = list(self.moduleInstances.keys())
loadOrder = self.botconfig["modules"]
loadOrder.reverse()
for key in loadOrder:
if key in loaded:
loaded.remove(key)
self.deportmodule(key)
for key in loaded:
self.deportmodule(key)
" Filesystem Methods "
def getDataPath(self, moduleName):
"""Return the absolute path for a module's data dir
:param moduleName: the module who's data dir we want
:type moduleName: str"""
if not os.path.exists("%s/data/%s" % (self.botconfig["bot"]["datadir"], moduleName)):
os.mkdir("%s/data/%s/" % (self.botconfig["bot"]["datadir"], moduleName))
return "%s/data/%s/" % (self.botconfig["bot"]["datadir"], moduleName)
def getConfigPath(self, moduleName):
"""Return the absolute path for a module's config file
:param moduleName: the module who's config file we want
:type moduleName: str"""
basepath = "%s/config/%s" % (self.botconfig["bot"]["datadir"], moduleName)
if os.path.exists("%s.json"%basepath):
return "%s.json"%basepath
return None
" Utility methods "
@staticmethod
def messageHasCommand(command, message, requireArgs=False):
"""Check if a message has a command with or without args in it
:param command: the command string to look for, like !ban. If a list is passed, the first match is returned.
:type command: str or list
:param message: the message string to look in, like "!ban Lil_Mac"
:type message: str
:param requireArgs: if true, only validate if the command use has any amount of trailing text
:type requireArgs: bool"""
if not type(command)==list:
command = [command]
for item in command:
cmd = PyIRCBot.messageHasCommandSingle(item, message, requireArgs)
if cmd:
return cmd
return False
@staticmethod
def messageHasCommandSingle(command, message, requireArgs=False):
# Check if the message at least starts with the command
messageBeginning = message[0:len(command)]
if messageBeginning!=command:
return False
# Make sure it's not a subset of a longer command (ie .meme being set off by .memes)
subsetCheck = message[len(command):len(command)+1]
if subsetCheck!=" " and subsetCheck!="":
return False
# We've got the command! Do we need args?
argsStart = len(command)
args = ""
if argsStart > 0:
args = message[argsStart+1:]
if requireArgs and args.strip() == '':
return False
# Verified! Return the set.
ob = type('ParsedCommand', (object,), {})
ob.command = command
ob.args = [] if args=="" else args.split(" ")
ob.args_str = args
ob.message = message
return ob
# return (True, command, args, message)
@staticmethod
def load(filepath):
"""Return an object from the passed filepath
:param filepath: path to a json file. filename must end with .json
:type filepath: str
:Returns: | dict
"""
if filepath.endswith(".json"):
from json import load
return load(open(filepath, 'r'))
else:
raise Exception("Unknown config format")