Add output rate limiting
This commit is contained in:
parent
2f88dc28c6
commit
3d047bb0cc
|
@ -40,7 +40,11 @@ Instance Configuration
|
||||||
],
|
],
|
||||||
"force_ipv6": false,
|
"force_ipv6": false,
|
||||||
"force_ipv4": false,
|
"force_ipv4": false,
|
||||||
"bind": ["1.2.3.4", 5678]
|
"bind": ["1.2.3.4", 5678],
|
||||||
|
"rate_limit": {
|
||||||
|
"rate_max": 5.0,
|
||||||
|
"rate_int":1.1
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"modules":[
|
"modules":[
|
||||||
"PingResponder",
|
"PingResponder",
|
||||||
|
@ -98,6 +102,11 @@ options:
|
||||||
|
|
||||||
To bind to an address but no specific port, set the second tuple entry to `null`.
|
To bind to an address but no specific port, set the second tuple entry to `null`.
|
||||||
|
|
||||||
|
.. cmdoption:: connection.rate_limit
|
||||||
|
|
||||||
|
Set to false to disable rate limiting. Otherwise, a dict containing two floats keyed: `rate_max`: how many messages
|
||||||
|
may be bursted at once, and `rate_int`: after bursting, how many seconds between messages.
|
||||||
|
|
||||||
.. cmdoption:: modules
|
.. cmdoption:: modules
|
||||||
|
|
||||||
A list of modules to load. Modules are loaded in the order they are listed
|
A list of modules to load. Modules are loaded in the order they are listed
|
||||||
|
|
|
@ -13,7 +13,11 @@
|
||||||
["dickson.freenode.net", 6667],
|
["dickson.freenode.net", 6667],
|
||||||
["morgan.freenode.net", 6667]
|
["morgan.freenode.net", 6667]
|
||||||
],
|
],
|
||||||
"force_ipv6": false
|
"force_ipv6": false,
|
||||||
|
"rate_limit": {
|
||||||
|
"rate_max": 5.0,
|
||||||
|
"rate_int":1.1
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"modules":[
|
"modules":[
|
||||||
"PingResponder",
|
"PingResponder",
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
from time import time
|
||||||
|
from math import floor
|
||||||
|
|
||||||
|
|
||||||
|
class burstbucket(object):
|
||||||
|
def __init__(self, maximum, interval):
|
||||||
|
"""
|
||||||
|
Burst bucket class for rate limiting
|
||||||
|
:param maximum: maximum value in the bucket
|
||||||
|
:param interval: how often a whole item is added to the bucket
|
||||||
|
"""
|
||||||
|
# How many messages can be bursted
|
||||||
|
self.bucket_max = maximum
|
||||||
|
# how often the bucket has 1 item added
|
||||||
|
self.bucket_period = interval
|
||||||
|
# last time the burst bucket was filled
|
||||||
|
self.bucket_lastfill = time()
|
||||||
|
|
||||||
|
self.bucket = self.bucket_max
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
"""
|
||||||
|
Return 0 if no sleeping is necessary to rate limit. Otherwise, return the number of seconds to sleep. This
|
||||||
|
method should be called again by the user after sleeping
|
||||||
|
"""
|
||||||
|
# First, update the bucket
|
||||||
|
# Check if $period time has passed since the bucket was filled
|
||||||
|
since_fill = time() - self.bucket_lastfill
|
||||||
|
if since_fill > self.bucket_period:
|
||||||
|
# How many complete points are credited
|
||||||
|
fills = floor(since_fill / self.bucket_period)
|
||||||
|
self.bucket += fills
|
||||||
|
if self.bucket > self.bucket_max:
|
||||||
|
self.bucket = self.bucket_max
|
||||||
|
# Advance the lastfill time appropriately
|
||||||
|
self.bucket_lastfill += self.bucket_period * fills
|
||||||
|
|
||||||
|
if self.bucket >= 1:
|
||||||
|
self.bucket -= 1
|
||||||
|
return 0
|
||||||
|
return self.bucket_period - since_fill
|
|
@ -12,7 +12,7 @@ import logging
|
||||||
import traceback
|
import traceback
|
||||||
import sys
|
import sys
|
||||||
from inspect import getargspec
|
from inspect import getargspec
|
||||||
from time import sleep
|
from pyircbot.common import burstbucket
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
|
||||||
|
@ -23,7 +23,13 @@ ServerPrefix = namedtuple("ServerPrefix", "hostname")
|
||||||
|
|
||||||
class IRCCore(object):
|
class IRCCore(object):
|
||||||
|
|
||||||
def __init__(self, servers):
|
def __init__(self, servers, loop, rate_limit=True, rate_max=5.0, rate_int=1.1):
|
||||||
|
self._loop = loop
|
||||||
|
|
||||||
|
# rate limiting options
|
||||||
|
self.rate_limit = rate_limit
|
||||||
|
self.rate_max = float(rate_max)
|
||||||
|
self.rate_int = float(rate_int)
|
||||||
|
|
||||||
self.connected = False
|
self.connected = False
|
||||||
"""If we're connected or not"""
|
"""If we're connected or not"""
|
||||||
|
@ -53,6 +59,9 @@ class IRCCore(object):
|
||||||
# Set up hooks for modules
|
# Set up hooks for modules
|
||||||
self.initHooks()
|
self.initHooks()
|
||||||
|
|
||||||
|
self.outputq = asyncio.Queue()
|
||||||
|
self._loop.call_soon(asyncio.ensure_future, self.outputqueue())
|
||||||
|
|
||||||
async def loop(self, loop):
|
async def loop(self, loop):
|
||||||
while self.alive:
|
while self.alive:
|
||||||
try:
|
try:
|
||||||
|
@ -88,6 +97,26 @@ class IRCCore(object):
|
||||||
logging.info("Reconnecting in 3s...")
|
logging.info("Reconnecting in 3s...")
|
||||||
await asyncio.sleep(3)
|
await asyncio.sleep(3)
|
||||||
|
|
||||||
|
async def outputqueue(self):
|
||||||
|
bucket = burstbucket(self.rate_max, self.rate_int)
|
||||||
|
while True:
|
||||||
|
prio, line = await self.outputq.get()
|
||||||
|
# sleep until the bucket allows us to send
|
||||||
|
if self.rate_limit:
|
||||||
|
while True:
|
||||||
|
s = bucket.get()
|
||||||
|
if s == 0:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(s, loop=self._loop)
|
||||||
|
self.fire_hook('_SEND', args=None, prefix=None, trailing=None)
|
||||||
|
self.log.debug(">>> {}".format(repr(line)))
|
||||||
|
try:
|
||||||
|
self.writer.write((line + "\r\n").encode("UTF-8"))
|
||||||
|
except Exception as e: # Probably fine if we drop messages while offline
|
||||||
|
print(e)
|
||||||
|
print(self.trace())
|
||||||
|
|
||||||
async def kill(self, message="Help! Another thread is killing me :(", forever=True):
|
async def kill(self, message="Help! Another thread is killing me :(", forever=True):
|
||||||
"""Send quit message, flush queue, and close the socket
|
"""Send quit message, flush queue, and close the socket
|
||||||
|
|
||||||
|
@ -137,9 +166,7 @@ class IRCCore(object):
|
||||||
self.fire_hook(command, args=args, prefix=prefix, trailing=trailing)
|
self.fire_hook(command, args=args, prefix=prefix, trailing=trailing)
|
||||||
|
|
||||||
def sendRaw(self, data):
|
def sendRaw(self, data):
|
||||||
self.log.debug(">>> {}".format(repr(data)))
|
asyncio.run_coroutine_threadsafe(self.outputq.put((5, data, )), self._loop)
|
||||||
self.fire_hook('_SEND', args=None, prefix=None, trailing=None)
|
|
||||||
self.writer.write((data + "\r\n").encode("UTF-8"))
|
|
||||||
|
|
||||||
" Module related code "
|
" Module related code "
|
||||||
def initHooks(self):
|
def initHooks(self):
|
||||||
|
|
|
@ -31,6 +31,8 @@ class PyIRCBot(object):
|
||||||
self.log = logging.getLogger('PyIRCBot')
|
self.log = logging.getLogger('PyIRCBot')
|
||||||
"""Reference to logger object"""
|
"""Reference to logger object"""
|
||||||
|
|
||||||
|
self.loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
"""saved copy of the instance config"""
|
"""saved copy of the instance config"""
|
||||||
self.botconfig = botconfig
|
self.botconfig = botconfig
|
||||||
|
|
||||||
|
@ -43,8 +45,14 @@ class PyIRCBot(object):
|
||||||
"""Reference to BotRPC thread"""
|
"""Reference to BotRPC thread"""
|
||||||
self.rpc = BotRPC(self)
|
self.rpc = BotRPC(self)
|
||||||
|
|
||||||
|
ratelimit = self.botconfig["connection"].get("rate_limit", None) or dict(rate_max=5.0, rate_int=1.1)
|
||||||
|
|
||||||
"""IRC protocol handler"""
|
"""IRC protocol handler"""
|
||||||
self.irc = IRCCore(servers=self.botconfig["connection"]["servers"])
|
self.irc = IRCCore(servers=self.botconfig["connection"]["servers"],
|
||||||
|
loop=self.loop,
|
||||||
|
rate_limit=True if ratelimit else False,
|
||||||
|
rate_max=ratelimit["rate_max"],
|
||||||
|
rate_int=ratelimit["rate_int"])
|
||||||
if self.botconfig.get("connection").get("force_ipv6", False):
|
if self.botconfig.get("connection").get("force_ipv6", False):
|
||||||
self.irc.connection_family = AF_INET6
|
self.irc.connection_family = AF_INET6
|
||||||
elif self.botconfig.get("connection").get("force_ipv4", False):
|
elif self.botconfig.get("connection").get("force_ipv4", False):
|
||||||
|
@ -72,8 +80,6 @@ class PyIRCBot(object):
|
||||||
self.irc.addHook("PRIVMSG", self._irchook_internal)
|
self.irc.addHook("PRIVMSG", self._irchook_internal)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
self.client = asyncio.ensure_future(self.irc.loop(self.loop), loop=self.loop)
|
self.client = asyncio.ensure_future(self.irc.loop(self.loop), loop=self.loop)
|
||||||
try:
|
try:
|
||||||
self.loop.set_debug(True)
|
self.loop.set_debug(True)
|
||||||
|
|
Loading…
Reference in New Issue