Ditch asynchat for asyncio. Python 2 no longer supported.

This commit is contained in:
dave 2017-03-27 22:57:03 -07:00
parent 7e0a3a700e
commit f4fda5f60d
6 changed files with 138 additions and 309 deletions

View File

@ -1,9 +1,11 @@
#!/usr/bin/env python3
import sys
import logging
import signal
from argparse import ArgumentParser
from pyircbot import PyIRCBot
if __name__ == "__main__":
" logging level and facility "
logging.basicConfig(level=logging.INFO,
@ -30,7 +32,12 @@ if __name__ == "__main__":
log.debug(botconfig)
bot = PyIRCBot(botconfig)
try:
bot.loop()
except KeyboardInterrupt:
bot.kill(message="Ctrl-C pressed!")
def signal_handler(signum, stack):
print('Received:', signum)
bot.kill(message="received signal {}".format(signum))
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
bot.run()

Binary file not shown.

View File

@ -32,14 +32,13 @@ Instance Configuration
},
"connection":{
"servers": [
"weber.freenode.net",
"asimov.freenode.net",
"card.freenode.net",
"dickson.freenode.net",
"morgan.freenode.net"
["weber.freenode.net", 6667],
["asimov.freenode.net", 6667],
["card.freenode.net", 6667],
["dickson.freenode.net", 6667],
["morgan.freenode.net", 6667]
],
"ipv6":"off",
"port":6667
"ipv6":"off"
},
"modules":[
"PingResponder",
@ -74,18 +73,15 @@ options:
.. cmdoption:: connection.servers
List of hostnames or IP addresses of the IRC server to connection to. First
entry will be used for the initial connection on startup. If we the bot
must reconnect to the IRC server later, the next server will be used.
List of hostnames or IP addresses and ports of the IRC server to connection
to. First entry will be used for the initial connection on startup. If we
the bot must reconnect to the IRC server later, the next server will
be used.
.. cmdoption:: connection.ipv6
Enable or disable defaulting to IPv6 using the value "off" or "on"
.. cmdoption:: connection.port
Port to connect to on the IRC server
.. cmdoption:: modules
A list of modules to load. Modules are loaded in the order they are listed

View File

@ -7,14 +7,13 @@
},
"connection":{
"servers": [
"weber.freenode.net",
"asimov.freenode.net",
"card.freenode.net",
"dickson.freenode.net",
"morgan.freenode.net"
["weber.freenode.net", 6667],
["asimov.freenode.net", 6667],
["card.freenode.net", 6667],
["dickson.freenode.net", 6667],
["morgan.freenode.net", 6667]
],
"ipv6":"off",
"port":6667
},
"modules":[
"PingResponder",

View File

@ -6,34 +6,24 @@
"""
import queue
import socket
import asynchat
import asyncore
import asyncio
import logging
import traceback
import sys
from inspect import getargspec
from socket import SHUT_RDWR
from threading import Thread
from time import sleep, time
from time import sleep
from collections import namedtuple
from io import StringIO
try:
from cStringIO import StringIO
except:
from io import BytesIO as StringIO
IRCEvent = namedtuple("IRCEvent", "args prefix trailing")
IRCEvent = namedtuple("IRCEvent", "command args prefix trailing")
UserPrefix = namedtuple("UserPrefix", "nick username hostname")
ServerPrefix = namedtuple("ServerPrefix", "hostname")
class IRCCore(asynchat.async_chat):
class IRCCore(object):
def __init__(self):
asynchat.async_chat.__init__(self)
def __init__(self, servers):
self.connected = False
"""If we're connected or not"""
@ -49,155 +39,61 @@ class IRCCore(asynchat.async_chat):
self.server = 0
"""Current server index"""
self.servers = []
self.servers = servers
"""List of server address"""
self.port = 0
"""Server port"""
self.ipv6 = False
"""Use IPv6?"""
self.OUTPUT_BUFFER_SIZE = 1000
self.SEND_WAIT = 0.800
self.outputQueue = queue.Queue(self.OUTPUT_BUFFER_SIZE)
self.outputQueueRunner = OutputQueueRunner(self)
self.outputQueueRunner.start()
# IRC Messages are terminated with \r\n
self.set_terminator(b"\r\n")
# Set up hooks for modules
self.initHooks()
# Map for asynchat
self.asynmap = {}
def loop(self):
async def loop(self, loop):
while self.alive:
try:
asyncore.loop(map=self.asynmap, timeout=1)
except Exception:
# TODO support ipv6 again
self.reader, self.writer = await asyncio.open_connection(self.servers[self.server][0],
port=self.servers[self.server][1],
loop=loop,
ssl=None)
self.fire_hook("_CONNECT")
except (socket.gaierror, ConnectionRefusedError):
traceback.print_exc()
logging.warning("Non-fatal connect error, trying next server...")
self.server = (self.server + 1) % len(self.servers)
await asyncio.sleep(1, loop=loop)
continue
self.log.error("Loop error: ")
self.log.error(IRCCore.trace())
while self.alive:
try:
data = await self.reader.readuntil()
self.log.debug("<<< {}".format(repr(data)))
self.process_line(data.decode("UTF-8"))
except (ConnectionResetError, asyncio.streams.IncompleteReadError):
traceback.print_exc()
break
self.fire_hook("_DISCONNECT")
self.writer.close()
if self.alive:
# TODO ramp down reconnect attempts
logging.info("Reconnecting in 3s...")
sleep(3)
# Remove from asynmap
for key in list(self.asynmap.keys())[:]:
del self.asynmap[key]
if self.alive:
logging.info("Loop: reconnecting")
try:
self._connect()
except Exception:
self.log.error("Error reconnecting: ")
self.log.error(IRCCore.trace())
def kill(self, message="Help! Another thread is killing me :(", alive=False):
async def kill(self, message="Help! Another thread is killing me :("):
"""Send quit message, flush queue, and close the socket
:param message: Quit message
:param message: Quit message to send before disconnecting
:type message: str
:param alive: True causes a reconnect after disconnecting
:type alive: bool
"""
# Pauses output queue
self.outputQueueRunner.paused = not alive
# Clear any pending messages
self.outputQueueRunner.clear()
# Send quit message and flush queue
self.alive = False
self.act_QUIT(message) # TODO will this hang if the socket is having issues?
self.outputQueueRunner.flush()
# Signal disconnection
self.alive = alive
# Close socket
self.socket.shutdown(SHUT_RDWR)
self.close()
await self.writer.drain()
self.writer.close()
self.log.info("Kill complete")
" Net related code here on down "
def getBuf(self):
"""Return the network buffer and clear it"""
self.buffer.seek(0)
data = self.buffer.read()
self.buffer = StringIO()
return data
def collect_incoming_data(self, data):
"""Recieve data from the IRC server, append it to the buffer
:param data: the data that was recieved
:type data: str"""
self.buffer.write(data)
def found_terminator(self):
"""A complete command was pushed through, so clear the buffer and process it."""
line = None
buf = self.getBuf()
try:
line = buf.decode("UTF-8")
except UnicodeDecodeError as ude:
self.log.error("found_terminator(): could not decode input as UTF-8")
self.log.error("found_terminator(): data: %s" % line)
self.log.error("found_terminator(): repr(data): %s" % repr(line))
self.log.error("found_terminator(): error: %s" % str(ude))
return
self.log.debug("< {}".format(line))
self.process_data(line)
def handle_close(self):
"""Called when the socket is disconnected. Triggers the _DISCONNECT hook"""
self.log.info("handle_close")
self.connected = False
self.close()
self.fire_hook("_DISCONNECT")
def handle_error(self, *args, **kwargs):
"""Called on fatal network errors."""
self.log.error("Connection failed (handle_error)")
self.log.error(str(args))
self.log.error(str(kwargs))
self.log.error(IRCCore.trace())
def _connect(self):
"""Connect to IRC"""
self.server += 1
if self.server >= len(self.servers):
self.server = 0
serverHostname = self.servers[self.server]
self.log.info("Connecting to %(server)s:%(port)i", {"server": serverHostname, "port": self.port})
socket_type = socket.AF_INET
if self.ipv6:
self.log.info("IPv6 is enabled.")
socket_type = socket.AF_INET6
socketInfo = socket.getaddrinfo(serverHostname, self.port, socket_type)
self.create_socket(socket_type, socket.SOCK_STREAM)
self.log.info("Socket created: %s" % self.socket.fileno())
self.connect(socketInfo[0][4])
self.log.info("Connection established")
self._fileno = self.socket.fileno()
# See http://willpython.blogspot.com/2010/08/multiple-event-loops-with-asyncore-and.html
self.asynmap[self._fileno] = self
self.log.info("_connect: Socket map: %s" % str(self.asynmap))
def handle_connect(self):
"""When asynchat indicates our socket is connected, fire the _CONNECT hook"""
self.connected = True
self.log.info("handle_connect: connected")
self.fire_hook("_CONNECT")
self.log.info("handle_connect: complete")
def sendRaw(self, text, prio=2):
"""Queue messages (raw string) to be sent to the IRC server
:param text: the string to send
:type text: str"""
text = (text + "\r\n").encode("UTF-8").decode().encode("UTF-8")
self.outputQueue.put((prio, text), block=False)
def process_data(self, data):
"""Process one line of tet irc sent us
def process_line(self, data):
"""Process one line of text irc sent us
:param data: the data to process
:type data: str"""
@ -223,6 +119,7 @@ class IRCCore(asynchat.async_chat):
args = data.split(" ")
for index, arg in enumerate(args):
args[index] = arg.strip()
self.fire_hook("_RECV", args=args, prefix=prefix, trailing=trailing)
if command not in self.hookcalls:
self.log.warning("Unknown command: cmd='%s' prefix='%s' args='%s' trailing='%s'" % (command, prefix, args,
@ -230,44 +127,50 @@ class IRCCore(asynchat.async_chat):
else:
self.fire_hook(command, args=args, prefix=prefix, trailing=trailing)
def sendRaw(self, data):
self.log.warning(">>> {}".format(repr(data)))
self.writer.write((data + "\r\n").encode("UTF-8"))
" Module related code "
def initHooks(self):
"""Defines hooks that modules can listen for events of"""
self.hooks = [
'_CONNECT', # Called when the bot connects to IRC on the socket level
'_DISCONNECT', # Called when the irc socket is forcibly closed
'_RECV', # Called on network activity
'NOTICE', # :irc.129irc.com NOTICE AUTH :*** Looking up your hostname...
'MODE', # :CloneABCD MODE CloneABCD :+iwx
'PING', # PING :irc.129irc.com
'JOIN', # :CloneA!dave@hidden-B4F6B1AA.rit.edu JOIN :#clonea
'QUIT', # :HCSMPBot!~HCSMPBot@108.170.48.18 QUIT :Quit: Disconnecting!
'NICK', # :foxiAway!foxi@irc.hcsmp.com NICK :foxi
'PART', # :CloneA!dave@hidden-B4F6B1AA.rit.edu PART #clonea
'PRIVMSG', # :CloneA!dave@hidden-B4F6B1AA.rit.edu PRIVMSG #clonea :aaa
'KICK', # :xMopxShell!~rduser@host KICK #xMopx2 xBotxShellTest :xBotxShellTest
'INVITE', # :gmx!~gmxgeek@irc.hcsmp.com INVITE Tyrone :#hcsmp'
'001', # :irc.129irc.com 001 CloneABCD :Welcome to the 129irc IRC Network CloneABCD!CloneABCD@djptwc-laptop1.rit.edu
'002', # :irc.129irc.com 002 CloneABCD :Your host is irc.129irc.com, running version Unreal3.2.8.1
'003', # :irc.129irc.com 003 CloneABCD :This server was created Mon Jul 19 2010 at 03:12:01 EDT
'004', # :irc.129irc.com 004 CloneABCD irc.129irc.com Unreal3.2.8.1 iowghraAsORTVSxNCWqBzvdHtGp lvhopsmntikrRcaqOALQbSeIKVfMCuzNTGj
'005', # :irc.129irc.com 005 CloneABCD CMDS=KNOCK,MAP,DCCALLOW,USERIP UHNAMES NAMESX SAFELIST HCN MAXCHANNELS=10 CHANLIMIT=#:10 MAXLIST=b:60,e:60,I:60 NICKLEN=30 CHANNELLEN=32 TOPICLEN=307 KICKLEN=307 AWAYLEN=307 :are supported by this server
'250', # :chaos.esper.net 250 xBotxShellTest :Highest connection count: 1633 (1632 clients) (186588 connections received)
'251', # :irc.129irc.com 251 CloneABCD :There are 1 users and 48 invisible on 2 servers
'252', # :irc.129irc.com 252 CloneABCD 9 :operator(s) online
'254', # :irc.129irc.com 254 CloneABCD 6 :channels formed
'255', # :irc.129irc.com 255 CloneABCD :I have 42 clients and 1 servers
'265', # :irc.129irc.com 265 CloneABCD :Current Local Users: 42 Max: 47
'266', # :irc.129irc.com 266 CloneABCD :Current Global Users: 49 Max: 53
'332', # :chaos.esper.net 332 xBotxShellTest #xMopx2 :/ #XMOPX2 / https://code.google.com/p/pyircbot/ (Channel Topic)
'333', # :chaos.esper.net 333 xBotxShellTest #xMopx2 xMopxShell!~rduser@108.170.60.242 1344370109
'353', # :irc.129irc.com 353 CloneABCD = #clonea :CloneABCD CloneABC
'366', # :irc.129irc.com 366 CloneABCD #clonea :End of /NAMES list.
'372', # :chaos.esper.net 372 xBotxShell :motd text here
'375', # :chaos.esper.net 375 xBotxShellTest :- chaos.esper.net Message of the Day -
'376', # :chaos.esper.net 376 xBotxShell :End of /MOTD command.
'422', # :irc.129irc.com 422 CloneABCD :MOTD File is missing
'433', # :nova.esper.net 433 * pyircbot3 :Nickname is already in use.
'_CONNECT',
'_DISCONNECT',
'_RECV',
'NOTICE',
'MODE',
'PING',
'JOIN',
'QUIT',
'NICK',
'PART',
'PRIVMSG',
'KICK',
'INVITE',
'001',
'002',
'003',
'004',
'005',
'250',
'251',
'252',
'254',
'255',
'265',
'266',
'332',
'333',
'353',
'366',
'372',
'375',
'376',
'422',
'433',
]
" mapping of hooks to methods "
self.hookcalls = {command: [] for command in self.hooks}
@ -283,11 +186,10 @@ class IRCCore(asynchat.async_chat):
:type prefix: str
:param trailing: data payload of the command
:type trailing: str"""
for hook in self.hookcalls[command]:
try:
if len(getargspec(hook).args) == 2:
hook(IRCCore.packetAsObject(args, prefix, trailing))
hook(IRCCore.packetAsObject(command, args, prefix, trailing))
else:
hook(args, prefix, trailing)
@ -324,7 +226,7 @@ class IRCCore(asynchat.async_chat):
self.log.warning("Invalid hook - %s" % command)
return False
def packetAsObject(args, prefix, trailing):
def packetAsObject(command, args, prefix, trailing):
"""Given an irc message's args, prefix, and trailing data return an object with these properties
:param args: list of args from the IRC packet
@ -335,7 +237,7 @@ class IRCCore(asynchat.async_chat):
:type trailing: str
:returns: object -- a IRCEvent object with the ``args``, ``prefix``, ``trailing``"""
return IRCEvent(args,
return IRCEvent(command, args,
IRCCore.decodePrefix(prefix) if prefix else None,
trailing)
@ -467,67 +369,10 @@ class IRCCore(asynchat.async_chat):
:param message: quit message
:type message: str"""
self.sendRaw("QUIT :%s" % message, prio=0)
self.sendRaw("QUIT :%s" % message)
def act_PASS(self, password):
"""
Send server password, for use on connection
"""
self.sendRaw("PASS %s" % password, prio=0)
class OutputQueueRunner(Thread):
"""Rate-limited output queue"""
def __init__(self, bot):
Thread.__init__(self, daemon=True)
self.bot = bot # reference to main bot thread
self.log = logging.getLogger('OutputQueueRunner')
self.paused = False
def run(self):
"""Constantly sends messages unless bot is disconnecting/ed"""
lastSend = time()
while True:
# Rate limit
sinceLast = time() - lastSend
if sinceLast < self.bot.SEND_WAIT:
toSleep = self.bot.SEND_WAIT - sinceLast
sleep(toSleep)
# Pop item and execute
if self.bot.connected and not self.paused:
try:
self.process_queue_item()
lastSend = time()
except queue.Empty:
# self.log.debug("Queue is empty")
pass
sleep(0.01)
def process_queue_item(self):
"""Remove 1 item from queue and process it"""
prio, text = self.bot.outputQueue.get(block=True, timeout=10)
# self.log.debug("%s>> %s" % (prio,text))
self.bot.outputQueue.task_done()
self.log.debug("> {}".format(text.decode('UTF-8')))
self.bot.send(text)
def clear(self):
"""Discard all items from queue"""
length = self.bot.outputQueue.qsize()
try:
while True:
self.bot.outputQueue.get(block=False)
except queue.Empty:
pass
# self.log.debug("output queue cleared")
return length
def flush(self):
"""Process all items in queue"""
for i in range(0, self.bot.outputQueue.qsize()):
try:
self.process_queue_item()
except:
pass
# self.log.debug("output queue flushed")
self.sendRaw("PASS %s" % password)

View File

@ -7,28 +7,29 @@
"""
import logging
import time
import sys
import traceback
from pyircbot.rpc import BotRPC
from pyircbot.irccore import IRCCore
from collections import namedtuple
import os.path
import asyncio
ParsedCommand = namedtuple("ParsedCommand", "command args args_str message")
class PyIRCBot(object):
""":param botconfig: The configuration of this instance of the bot. Passed by main.py.
:type botconfig: dict
"""
version = "4.0.0-r03"
version = "4.1.0"
def __init__(self, botconfig):
self.log = logging.getLogger('PyIRCBot')
"""Reference to logger object"""
self.botconfig = botconfig
"""saved copy of the instance config"""
self.botconfig = botconfig
"""storage of imported modules"""
self.modules = {}
@ -36,16 +37,11 @@ class PyIRCBot(object):
"""instances of modules"""
self.moduleInstances = {}
self.rpc = BotRPC(self)
"""Reference to BotRPC thread"""
self.rpc = BotRPC(self)
self.irc = IRCCore()
"""IRC protocol class"""
self.irc.servers = self.botconfig["connection"]["servers"]
self.irc.port = self.botconfig["connection"]["port"]
self.irc.ipv6 = True if self.botconfig["connection"]["ipv6"] == "on" else False
self.irc.addHook("_DISCONNECT", self.connection_closed)
"""IRC protocol handler"""
self.irc = IRCCore(servers=self.botconfig["connection"]["servers"])
# legacy support
self.act_PONG = self.irc.act_PONG
@ -64,20 +60,17 @@ class PyIRCBot(object):
# Load modules
self.initModules()
# Connect to IRC
self.connect()
def run(self):
self.loop = asyncio.get_event_loop()
def connect(self):
self.client = asyncio.ensure_future(self.irc.loop(self.loop), loop=self.loop)
try:
self.irc._connect()
except:
self.log.error("Pyircbot attempted to connect and failed!")
self.log.error(traceback.format_exc())
self.loop.set_debug(True)
self.loop.run_until_complete(self.client)
finally:
logging.debug("Escaped main loop")
def loop(self):
self.irc.loop()
def disconnect(self, message, reconnect=True):
def disconnect(self, message):
"""Send quit message and disconnect from IRC.
:param message: Quit message
@ -86,9 +79,9 @@ class PyIRCBot(object):
:type reconnect: bool
"""
self.log.info("disconnect")
self.irc.kill(message=message, alive=reconnect)
self.kill(message=message)
def kill(self, sys_exit=True, message="Help! Another thread is killing me :("):
def kill(self, message="Help! Another thread is killing me :("):
"""Shut down the bot violently
:param sys_exit: True causes sys.exit(0) to be called
@ -97,18 +90,7 @@ class PyIRCBot(object):
:type message: str
"""
self.closeAllModules()
self.irc.kill(message=message, alive=not sys_exit)
if sys_exit:
sys.exit(0)
def connection_closed(self, args, prefix, trailing):
"""Called when the socket is disconnected. We will want to reconnect. """
if self.irc.alive:
self.log.warning("Connection was lost. Reconnecting in 5 seconds.")
time.sleep(5)
self.connect()
asyncio.run_coroutine_threadsafe(self.irc.kill(message=message), self.loop)
def initModules(self):
"""load modules specified in instance config"""