From 3d047bb0cce07add003f2196d19f4f4cf80c1729 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 24 Nov 2017 16:06:55 -0800 Subject: [PATCH] Add output rate limiting --- docs/setup/initial_config.rst | 11 +++++++++- examples/config.json | 6 ++++- pyircbot/common.py | 41 +++++++++++++++++++++++++++++++++++ pyircbot/irccore.py | 37 ++++++++++++++++++++++++++----- pyircbot/pyircbot.py | 12 +++++++--- 5 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 pyircbot/common.py diff --git a/docs/setup/initial_config.rst b/docs/setup/initial_config.rst index b0261ad..1205c7c 100644 --- a/docs/setup/initial_config.rst +++ b/docs/setup/initial_config.rst @@ -40,7 +40,11 @@ Instance Configuration ], "force_ipv6": false, "force_ipv4": false, - "bind": ["1.2.3.4", 5678] + "bind": ["1.2.3.4", 5678], + "rate_limit": { + "rate_max": 5.0, + "rate_int":1.1 + } }, "modules":[ "PingResponder", @@ -98,6 +102,11 @@ options: To bind to an address but no specific port, set the second tuple entry to `null`. +.. cmdoption:: connection.rate_limit + + Set to false to disable rate limiting. Otherwise, a dict containing two floats keyed: `rate_max`: how many messages + may be bursted at once, and `rate_int`: after bursting, how many seconds between messages. + .. cmdoption:: modules A list of modules to load. Modules are loaded in the order they are listed diff --git a/examples/config.json b/examples/config.json index 0c629b5..ea29378 100644 --- a/examples/config.json +++ b/examples/config.json @@ -13,7 +13,11 @@ ["dickson.freenode.net", 6667], ["morgan.freenode.net", 6667] ], - "force_ipv6": false + "force_ipv6": false, + "rate_limit": { + "rate_max": 5.0, + "rate_int":1.1 + } }, "modules":[ "PingResponder", diff --git a/pyircbot/common.py b/pyircbot/common.py new file mode 100644 index 0000000..ca03fd2 --- /dev/null +++ b/pyircbot/common.py @@ -0,0 +1,41 @@ +from time import time +from math import floor + + +class burstbucket(object): + def __init__(self, maximum, interval): + """ + Burst bucket class for rate limiting + :param maximum: maximum value in the bucket + :param interval: how often a whole item is added to the bucket + """ + # How many messages can be bursted + self.bucket_max = maximum + # how often the bucket has 1 item added + self.bucket_period = interval + # last time the burst bucket was filled + self.bucket_lastfill = time() + + self.bucket = self.bucket_max + + def get(self): + """ + Return 0 if no sleeping is necessary to rate limit. Otherwise, return the number of seconds to sleep. This + method should be called again by the user after sleeping + """ + # First, update the bucket + # Check if $period time has passed since the bucket was filled + since_fill = time() - self.bucket_lastfill + if since_fill > self.bucket_period: + # How many complete points are credited + fills = floor(since_fill / self.bucket_period) + self.bucket += fills + if self.bucket > self.bucket_max: + self.bucket = self.bucket_max + # Advance the lastfill time appropriately + self.bucket_lastfill += self.bucket_period * fills + + if self.bucket >= 1: + self.bucket -= 1 + return 0 + return self.bucket_period - since_fill diff --git a/pyircbot/irccore.py b/pyircbot/irccore.py index 24b6690..30662fe 100644 --- a/pyircbot/irccore.py +++ b/pyircbot/irccore.py @@ -12,7 +12,7 @@ import logging import traceback import sys from inspect import getargspec -from time import sleep +from pyircbot.common import burstbucket from collections import namedtuple from io import StringIO @@ -23,7 +23,13 @@ ServerPrefix = namedtuple("ServerPrefix", "hostname") class IRCCore(object): - def __init__(self, servers): + def __init__(self, servers, loop, rate_limit=True, rate_max=5.0, rate_int=1.1): + self._loop = loop + + # rate limiting options + self.rate_limit = rate_limit + self.rate_max = float(rate_max) + self.rate_int = float(rate_int) self.connected = False """If we're connected or not""" @@ -53,6 +59,9 @@ class IRCCore(object): # Set up hooks for modules self.initHooks() + self.outputq = asyncio.Queue() + self._loop.call_soon(asyncio.ensure_future, self.outputqueue()) + async def loop(self, loop): while self.alive: try: @@ -88,6 +97,26 @@ class IRCCore(object): logging.info("Reconnecting in 3s...") await asyncio.sleep(3) + async def outputqueue(self): + bucket = burstbucket(self.rate_max, self.rate_int) + while True: + prio, line = await self.outputq.get() + # sleep until the bucket allows us to send + if self.rate_limit: + while True: + s = bucket.get() + if s == 0: + break + else: + await asyncio.sleep(s, loop=self._loop) + self.fire_hook('_SEND', args=None, prefix=None, trailing=None) + self.log.debug(">>> {}".format(repr(line))) + try: + self.writer.write((line + "\r\n").encode("UTF-8")) + except Exception as e: # Probably fine if we drop messages while offline + print(e) + print(self.trace()) + async def kill(self, message="Help! Another thread is killing me :(", forever=True): """Send quit message, flush queue, and close the socket @@ -137,9 +166,7 @@ class IRCCore(object): self.fire_hook(command, args=args, prefix=prefix, trailing=trailing) def sendRaw(self, data): - self.log.debug(">>> {}".format(repr(data))) - self.fire_hook('_SEND', args=None, prefix=None, trailing=None) - self.writer.write((data + "\r\n").encode("UTF-8")) + asyncio.run_coroutine_threadsafe(self.outputq.put((5, data, )), self._loop) " Module related code " def initHooks(self): diff --git a/pyircbot/pyircbot.py b/pyircbot/pyircbot.py index 82ab7a7..53eede7 100644 --- a/pyircbot/pyircbot.py +++ b/pyircbot/pyircbot.py @@ -31,6 +31,8 @@ class PyIRCBot(object): self.log = logging.getLogger('PyIRCBot') """Reference to logger object""" + self.loop = asyncio.get_event_loop() + """saved copy of the instance config""" self.botconfig = botconfig @@ -43,8 +45,14 @@ class PyIRCBot(object): """Reference to BotRPC thread""" self.rpc = BotRPC(self) + ratelimit = self.botconfig["connection"].get("rate_limit", None) or dict(rate_max=5.0, rate_int=1.1) + """IRC protocol handler""" - self.irc = IRCCore(servers=self.botconfig["connection"]["servers"]) + self.irc = IRCCore(servers=self.botconfig["connection"]["servers"], + loop=self.loop, + rate_limit=True if ratelimit else False, + rate_max=ratelimit["rate_max"], + rate_int=ratelimit["rate_int"]) if self.botconfig.get("connection").get("force_ipv6", False): self.irc.connection_family = AF_INET6 elif self.botconfig.get("connection").get("force_ipv4", False): @@ -72,8 +80,6 @@ class PyIRCBot(object): self.irc.addHook("PRIVMSG", self._irchook_internal) def run(self): - self.loop = asyncio.get_event_loop() - self.client = asyncio.ensure_future(self.irc.loop(self.loop), loop=self.loop) try: self.loop.set_debug(True)