Add output queue

This commit is contained in:
dave 2015-10-31 16:27:37 -07:00
parent 2643883a88
commit 640e3fd3a9
1 changed files with 79 additions and 9 deletions

View File

@ -6,6 +6,7 @@
"""
import queue
import socket
import asynchat
import asyncore
@ -14,6 +15,8 @@ import traceback
import sys
from inspect import getargspec
from socket import SHUT_RDWR
from threading import Thread
from time import sleep,time
try:
from cStringIO import StringIO
@ -44,6 +47,12 @@ class IRCCore(asynchat.async_chat):
self.ipv6 = False
"""Use IPv6?"""
self.OUTPUT_BUFFER_SIZE = 1000
self.SEND_WAIT = 0.800
self.outputQueue = queue.PriorityQueue(self.OUTPUT_BUFFER_SIZE)
self.outputQueueRunner = OutputQueueRunner(self)
self.outputQueueRunner.start()
# IRC Messages are terminated with \r\n
self.set_terminator(b"\r\n")
@ -57,8 +66,18 @@ class IRCCore(asynchat.async_chat):
asyncore.loop(map=self.asynmap)
def kill(self):
"""TODO close the socket"""
"""Send quit message and close the socket"""
# Pauses output queue
self.outputQueueRunner.paused = True
# Clear any pending messages
self.outputQueueRunner.clear()
# Send quit message and flush queue
self.act_QUIT("Help! Another thread is killing me :(")
self.outputQueueRunner.flush()
# Signal disconnection
self.alive=False
# Close socket
self.close()
" Net related code here on down "
@ -126,16 +145,13 @@ class IRCCore(asynchat.async_chat):
self.fire_hook("_CONNECT")
self.log.debug("handle_connect: complete")
def sendRaw(self, text):
"""Send a raw string to the IRC server
def sendRaw(self, text, prio=2):
"""Queue messages (raw string) to be sent to the IRC server
:param text: the string to send
:type text: str"""
if self.connected:
#self.log.debug(">> "+text)
self.send( (text+"\r\n").encode("UTF-8").decode().encode("UTF-8"))
else:
self.log.warning("Send attempted while disconnected. >> "+text)
text = (text+"\r\n").encode("UTF-8").decode().encode("UTF-8")
self.outputQueue.put((prio, text), block=False)
def process_data(self, data):
"""Process one line of tet irc sent us
@ -413,5 +429,59 @@ class IRCCore(asynchat.async_chat):
:param message: quit message
:type message: str"""
self.sendRaw("QUIT :%s" % message)
self.sendRaw("QUIT :%s" % message, prio=0)
class OutputQueueRunner(Thread):
"""Rate-limited output queue"""
def __init__(self, bot):
Thread.__init__(self, daemon=True)
self.bot = bot #reference to main bot thread
self.log = logging.getLogger('OutputQueueRunner')
self.paused = False
def run(self):
"""Constantly sends messages unless bot is disconnecting/ed"""
lastSend = time()
while True:
# Rate limit
sinceLast = time() - lastSend
if sinceLast < self.bot.SEND_WAIT:
toSleep = self.bot.SEND_WAIT - sinceLast
sleep(toSleep)
# Pop item and execute
if self.bot.connected and not self.paused:
try:
self.process_queue_item()
lastSend = time()
except queue.Empty:
#self.log.debug("Queue is empty")
pass
sleep(0) # yield
def process_queue_item(self):
"""Remove 1 item from queue and process it"""
prio,text = self.bot.outputQueue.get(block=True, timeout=10)
#self.log.debug("%s>> %s" % (prio,text))
self.bot.outputQueue.task_done()
self.bot.send(text)
def clear(self):
"""Discard all items from queue"""
length = self.bot.outputQueue.qsize()
try:
while True:
self.bot.outputQueue.get(block=False)
except queue.Empty:
pass
#self.log.debug("output queue cleared")
return length
def flush(self):
"""Process all items in queue"""
for i in range(0, self.bot.outputQueue.qsize()):
try:
self.process_queue_item()
except:
pass
#self.log.debug("output queue flushed")