diff --git a/pyircbot/irccore.py b/pyircbot/irccore.py index d71bcde..a76b9f1 100644 --- a/pyircbot/irccore.py +++ b/pyircbot/irccore.py @@ -6,6 +6,7 @@ """ +import queue import socket import asynchat import asyncore @@ -14,6 +15,8 @@ import traceback import sys from inspect import getargspec from socket import SHUT_RDWR +from threading import Thread +from time import sleep,time try: from cStringIO import StringIO @@ -44,6 +47,12 @@ class IRCCore(asynchat.async_chat): self.ipv6 = False """Use IPv6?""" + self.OUTPUT_BUFFER_SIZE = 1000 + self.SEND_WAIT = 0.800 + self.outputQueue = queue.PriorityQueue(self.OUTPUT_BUFFER_SIZE) + self.outputQueueRunner = OutputQueueRunner(self) + self.outputQueueRunner.start() + # IRC Messages are terminated with \r\n self.set_terminator(b"\r\n") @@ -57,8 +66,18 @@ class IRCCore(asynchat.async_chat): asyncore.loop(map=self.asynmap) def kill(self): - """TODO close the socket""" + """Send quit message and close the socket""" + # Pauses output queue + self.outputQueueRunner.paused = True + # Clear any pending messages + self.outputQueueRunner.clear() + # Send quit message and flush queue self.act_QUIT("Help! Another thread is killing me :(") + self.outputQueueRunner.flush() + # Signal disconnection + self.alive=False + # Close socket + self.close() " Net related code here on down " @@ -126,16 +145,13 @@ class IRCCore(asynchat.async_chat): self.fire_hook("_CONNECT") self.log.debug("handle_connect: complete") - def sendRaw(self, text): - """Send a raw string to the IRC server + def sendRaw(self, text, prio=2): + """Queue messages (raw string) to be sent to the IRC server :param text: the string to send :type text: str""" - if self.connected: - #self.log.debug(">> "+text) - self.send( (text+"\r\n").encode("UTF-8").decode().encode("UTF-8")) - else: - self.log.warning("Send attempted while disconnected. >> "+text) + text = (text+"\r\n").encode("UTF-8").decode().encode("UTF-8") + self.outputQueue.put((prio, text), block=False) def process_data(self, data): """Process one line of tet irc sent us @@ -413,5 +429,59 @@ class IRCCore(asynchat.async_chat): :param message: quit message :type message: str""" - self.sendRaw("QUIT :%s" % message) + self.sendRaw("QUIT :%s" % message, prio=0) +class OutputQueueRunner(Thread): + """Rate-limited output queue""" + def __init__(self, bot): + Thread.__init__(self, daemon=True) + self.bot = bot #reference to main bot thread + self.log = logging.getLogger('OutputQueueRunner') + self.paused = False + + def run(self): + """Constantly sends messages unless bot is disconnecting/ed""" + lastSend = time() + while True: + # Rate limit + sinceLast = time() - lastSend + if sinceLast < self.bot.SEND_WAIT: + toSleep = self.bot.SEND_WAIT - sinceLast + sleep(toSleep) + + # Pop item and execute + if self.bot.connected and not self.paused: + try: + self.process_queue_item() + lastSend = time() + except queue.Empty: + #self.log.debug("Queue is empty") + pass + sleep(0) # yield + + def process_queue_item(self): + """Remove 1 item from queue and process it""" + prio,text = self.bot.outputQueue.get(block=True, timeout=10) + #self.log.debug("%s>> %s" % (prio,text)) + self.bot.outputQueue.task_done() + self.bot.send(text) + + def clear(self): + """Discard all items from queue""" + length = self.bot.outputQueue.qsize() + try: + while True: + self.bot.outputQueue.get(block=False) + except queue.Empty: + pass + #self.log.debug("output queue cleared") + return length + + def flush(self): + """Process all items in queue""" + for i in range(0, self.bot.outputQueue.qsize()): + try: + self.process_queue_item() + except: + pass + #self.log.debug("output queue flushed")