diff --git a/pyircbot/irccore.py b/pyircbot/irccore.py index 2f4eb04..c366af5 100644 --- a/pyircbot/irccore.py +++ b/pyircbot/irccore.py @@ -61,7 +61,8 @@ class IRCCore(object): # Set up hooks for modules self.initHooks() - self.outputq = asyncio.Queue() + self.outseq = 5 + self.outputq = asyncio.PriorityQueue() self._loop.call_soon_threadsafe(asyncio.ensure_future, self.outputqueue()) async def loop(self, loop): @@ -105,19 +106,20 @@ class IRCCore(object): await asyncio.sleep(self.reconnect_delay) async def outputqueue(self): - bucket = burstbucket(self.rate_max, self.rate_int) + self.bucket = burstbucket(self.rate_max, self.rate_int) while True: prio, line = await self.outputq.get() # sleep until the bucket allows us to send if self.rate_limit: while True: - s = bucket.get() + s = self.bucket.get() if s == 0: break else: await asyncio.sleep(s, loop=self._loop) self.fire_hook('_SEND', args=None, prefix=None, trailing=None) self.log.debug(">>> {}".format(repr(line))) + self.outputq.task_done() try: self.writer.write((line + "\r\n").encode("UTF-8")) except Exception as e: # Probably fine if we drop messages while offline @@ -137,8 +139,18 @@ class IRCCore(object): self.writer.close() self.log.info("Kill complete") - def sendRaw(self, data): - asyncio.run_coroutine_threadsafe(self.outputq.put((5, data, )), self._loop) + def sendRaw(self, data, priority=None): + """ + Send data on the wire. Lower priorities are sent first. + :param data: unicode data to send. will be converted to utf-8 + :param priority: numerical priority value. If not None, the message will likely be sent first. Otherwise, an + ever-increasing sequence number is used to maintain order. For a minimum priority message, + use a priority value of sys.maxsize. + """ + if priority is None: + self.outseq += 1 + priority = self.outseq + asyncio.run_coroutine_threadsafe(self.outputq.put((priority, data, )), self._loop) " Module related code " def initHooks(self): @@ -306,7 +318,7 @@ class IRCCore(object): :type data: str""" self.sendRaw("PONG :%s" % data) - def act_USER(self, username, hostname, realname): + def act_USER(self, username, hostname, realname, priority=2): """Use the USER protocol command. Used during connection :param username: the bot's username @@ -315,22 +327,22 @@ class IRCCore(object): :type hostname: str :param realname: the bot's realname :type realname: str""" - self.sendRaw("USER %s %s %s :%s" % (username, hostname, self.servers[self.server], realname)) + self.sendRaw("USER %s %s %s :%s" % (username, hostname, self.servers[self.server], realname), priority) - def act_NICK(self, newNick): + def act_NICK(self, newNick, priority=2): """Use the `/nick` command :param newNick: new nick for the bot :type newNick: str""" self.nick = newNick - self.sendRaw("NICK %s" % newNick) + self.sendRaw("NICK %s" % newNick, priority) - def act_JOIN(self, channel): + def act_JOIN(self, channel, priority=3): """Use the `/join` command :param channel: the channel to attempt to join :type channel: str""" - self.sendRaw("JOIN %s" % channel) + self.sendRaw("JOIN %s" % channel, priority=3) def act_PRIVMSG(self, towho, message): """Use the `/msg` command @@ -375,12 +387,12 @@ class IRCCore(object): :type comment: str""" self.sendRaw("KICK %s %s :%s" % (channel, who, comment)) - def act_QUIT(self, message): + def act_QUIT(self, message, priority=2): """Use the `/quit` command :param message: quit message :type message: str""" - self.sendRaw("QUIT :%s" % message) + self.sendRaw("QUIT :%s" % message, priority) def act_PASS(self, password): """