Listen for correct events
This commit is contained in:
parent
2226434f1d
commit
604379d9b1
|
@ -9,11 +9,11 @@ import argparse
|
|||
|
||||
from time import sleep
|
||||
from select import select
|
||||
from threading import Thread, Lock
|
||||
from threading import Thread, Lock, Timer
|
||||
from collections import defaultdict, namedtuple
|
||||
|
||||
from docker import tls # NOQA
|
||||
from docker import Client
|
||||
from docker import Client, errors
|
||||
|
||||
from jinja2 import Environment
|
||||
|
||||
|
@ -55,9 +55,9 @@ def shell():
|
|||
|
||||
class LogInjectorDaemon(object):
|
||||
|
||||
EVENT_FILTERS_STOPSTART = {"type": "container",
|
||||
"event": ["stop",
|
||||
"start"]}
|
||||
EVENT_FILTERS_STARTDIE = {"type": "container",
|
||||
"event": ["start",
|
||||
"die"]}
|
||||
|
||||
detector = namedtuple("Detector", "match paths")
|
||||
|
||||
|
@ -82,6 +82,9 @@ class LogInjectorDaemon(object):
|
|||
self.loggers_lock = Lock()
|
||||
self.container_names = {}
|
||||
|
||||
self.rescan_delay = 10
|
||||
self.rescans = 1
|
||||
|
||||
signal.signal(signal.SIGTERM, self.signal_handler)
|
||||
|
||||
def signal_handler(self, signum, frame):
|
||||
|
@ -150,7 +153,7 @@ class LogInjectorDaemon(object):
|
|||
"""
|
||||
Docker change listener thread. Subscribes to docker's event api and respond to containers stopping/starting
|
||||
"""
|
||||
for e in self.docker.events(filters=LogInjectorDaemon.EVENT_FILTERS_STOPSTART):
|
||||
for e in self.docker.events(filters=LogInjectorDaemon.EVENT_FILTERS_STARTDIE):
|
||||
event = json.loads(e.decode('UTF-8'))
|
||||
self.handle_event(event)
|
||||
|
||||
|
@ -163,14 +166,14 @@ class LogInjectorDaemon(object):
|
|||
if event["status"] == "start":
|
||||
Thread(target=self.relisten_on, args=(event["id"],)).start()
|
||||
|
||||
elif event["status"] == "stop":
|
||||
elif event["status"] == "die":
|
||||
Thread(target=self.end_listen_on, args=(event["id"],)).start()
|
||||
|
||||
def end_listen_on(self, container_id):
|
||||
"""
|
||||
Kill local listener for container_id
|
||||
"""
|
||||
sleep(10) # kill some time for any straggling log messages to be flushed out
|
||||
sleep(10) # hack: kill some time for any straggling log messages to be flushed out
|
||||
with self.loggers_lock:
|
||||
loggers_to_close = [l for l in self.loggers.keys() if self.loggers[l]["container_id"] == container_id]
|
||||
|
||||
|
@ -183,19 +186,33 @@ class LogInjectorDaemon(object):
|
|||
|
||||
del self.container_names[container_id]
|
||||
|
||||
def relisten_on(self, container_id):
|
||||
def relisten_on(self, container_id, rescan_num=None):
|
||||
"""
|
||||
Configure and spawn rsyslog in a container
|
||||
"""
|
||||
sleep(2) # hack
|
||||
|
||||
if container_id not in self.container_names:
|
||||
self.container_names[container_id] = self.get_container_name(container_id)
|
||||
try:
|
||||
container_info = self.docker.inspect_container(container_id)
|
||||
|
||||
container_name = self.container_names[container_id]
|
||||
# hack: lazy loading of bridge ip - we must listen for udp packets on the docker bridge interface, so we
|
||||
# need the IP for binding. Lazily set it after the first container is fetched from the docker host, as this
|
||||
# will always happen before any udp binding
|
||||
if not self.docker_bridge_ip:
|
||||
bridge_ip = container_info["NetworkSettings"]["Networks"]["bridge"]["Gateway"]
|
||||
logging.info("Found bridge ip: {}".format(bridge_ip))
|
||||
self.docker_bridge_ip = bridge_ip
|
||||
|
||||
if container_id not in self.container_names:
|
||||
# strip leading slash
|
||||
raw_name = container_info["Name"][1:]
|
||||
self.container_names[container_id] = raw_name
|
||||
logging.info("{}: is named {}".format(container_id, raw_name))
|
||||
|
||||
except errors.NotFound:
|
||||
logging.info("{}: no longer exists, aborting".format(container_id))
|
||||
return
|
||||
|
||||
logging.info("{}: setup".format(container_id))
|
||||
logging.info("{}: is named {}".format(container_id, container_name))
|
||||
|
||||
# Check for commonly know processes in the container
|
||||
ps_output = self.exec_in_container(container_id,
|
||||
|
@ -215,7 +232,17 @@ class LogInjectorDaemon(object):
|
|||
logging.info("{}: using: {}".format(container_id, str(modules_use)))
|
||||
|
||||
if len(modules_use) == 0:
|
||||
logging.info("{}: no log files found, exiting".format(container_id))
|
||||
|
||||
if rescan_num is None:
|
||||
rescan_num = self.rescans
|
||||
|
||||
if rescan_num == 0:
|
||||
logging.info("{}: no log files found, ignoring".format(container_id))
|
||||
return None
|
||||
|
||||
logging.info("{}: no log files found, scheduling rescan #{} in {} seconds".format(container_id, rescan_num,
|
||||
self.rescan_delay))
|
||||
Timer(self.rescan_delay, self.relisten_on, args=[container_id, rescan_num - 1]).start()
|
||||
return None
|
||||
|
||||
syslog_conf = self.render_template(container_id, self.template, modules_use)
|
||||
|
@ -255,24 +282,6 @@ class LogInjectorDaemon(object):
|
|||
# generate syslog config
|
||||
return Environment().from_string(template_contents).render(logfiles=logfiles)
|
||||
|
||||
def get_container_name(self, container_id):
|
||||
container_info = self.docker.inspect_container(container_id)
|
||||
|
||||
raw_name = container_info["Name"]
|
||||
|
||||
# strip leading slash
|
||||
raw_name = raw_name[1:]
|
||||
|
||||
# hack: lazy loading of bridge ip - we must listen for udp packets on the docker bridge interface, so we need
|
||||
# the IP for binding. Lazily set it after the first container is fetched from the docker host, as this will
|
||||
# always happen before any udp binding
|
||||
if not self.docker_bridge_ip:
|
||||
bridge_ip = container_info["NetworkSettings"]["Networks"]["bridge"]["Gateway"]
|
||||
logging.info("Found bridge ip: {}".format(bridge_ip))
|
||||
self.docker_bridge_ip = bridge_ip
|
||||
|
||||
return raw_name
|
||||
|
||||
def add_udp_listener(self, container_id, program, original_logname):
|
||||
"""
|
||||
Listen on a random UDP socket and create a new listener. A listener is an association between a udp port and
|
||||
|
|
Loading…
Reference in New Issue