More refactoring

This commit is contained in:
dave 2016-09-04 20:40:28 -07:00
parent 604379d9b1
commit b6155c3145
3 changed files with 28 additions and 19 deletions

View File

@ -24,7 +24,7 @@ entires sent by containers and writes them to disk on the host.
**Running** **Running**
* `loginjector_daemon -s unix://var/run/docker.sock -o /var/log/container/` * `loginjector -s unix://var/run/docker.sock -o /var/log/container/`
(The above arguments are actually the defaults and need not be specified) (The above arguments are actually the defaults and need not be specified)

View File

@ -7,13 +7,14 @@ import socket
import logging import logging
import argparse import argparse
from time import sleep
from select import select from select import select
from threading import Thread, Lock, Timer from threading import Thread, Lock, Timer
from requests import exceptions as requests_exc
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from docker import tls # NOQA from docker import tls # NOQA
from docker import Client, errors from docker import Client
from docker import errors as docker_exc
from jinja2 import Environment from jinja2 import Environment
@ -82,8 +83,8 @@ class LogInjectorDaemon(object):
self.loggers_lock = Lock() self.loggers_lock = Lock()
self.container_names = {} self.container_names = {}
self.rescan_delay = 10 self.rescan_delay = 30
self.rescans = 1 self.rescans = 2
signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler)
@ -113,7 +114,7 @@ class LogInjectorDaemon(object):
change_listner.join(0.1) change_listner.join(0.1)
message_recvr.join(0.1) message_recvr.join(0.1)
except KeyboardInterrupt: except KeyboardInterrupt:
pass self.signal_handler(None, None)
self.docker.close() self.docker.close()
@ -167,13 +168,15 @@ class LogInjectorDaemon(object):
Thread(target=self.relisten_on, args=(event["id"],)).start() Thread(target=self.relisten_on, args=(event["id"],)).start()
elif event["status"] == "die": elif event["status"] == "die":
Thread(target=self.end_listen_on, args=(event["id"],)).start() # delay ending listener so remaining messages have time to flush
t = Timer(10, self.end_listen_on, args=(event["id"],))
t.daemon = True
t.start()
def end_listen_on(self, container_id): def end_listen_on(self, container_id):
""" """
Kill local listener for container_id Kill local listener for container_id
""" """
sleep(10) # hack: kill some time for any straggling log messages to be flushed out
with self.loggers_lock: with self.loggers_lock:
loggers_to_close = [l for l in self.loggers.keys() if self.loggers[l]["container_id"] == container_id] loggers_to_close = [l for l in self.loggers.keys() if self.loggers[l]["container_id"] == container_id]
@ -208,17 +211,14 @@ class LogInjectorDaemon(object):
self.container_names[container_id] = raw_name self.container_names[container_id] = raw_name
logging.info("{}: is named {}".format(container_id, raw_name)) logging.info("{}: is named {}".format(container_id, raw_name))
except errors.NotFound: except (docker_exc.NotFound, requests_exc.ReadTimeout):
logging.info("{}: no longer exists, aborting".format(container_id)) logging.info("{}: no longer exists, aborting".format(container_id))
return return
logging.info("{}: setup".format(container_id)) logging.info("{}: setup".format(container_id))
# Check for commonly know processes in the container # Check for commonly know processes in the container
ps_output = self.exec_in_container(container_id, ps_lines = self.get_running_procs(container_id)
"ps --ppid 2 -p 2 --deselect -o cmd --no-headers").decode('utf-8')
ps_lines = [line.strip() for line in ps_output.split("\n") if line]
# logging.info("{}: running procs: {}".format(container_id, str(ps_lines)))
# look at ps, see no syslog # look at ps, see no syslog
if any(["rsyslogd" in i for i in ps_lines]): if any(["rsyslogd" in i for i in ps_lines]):
@ -240,9 +240,12 @@ class LogInjectorDaemon(object):
logging.info("{}: no log files found, ignoring".format(container_id)) logging.info("{}: no log files found, ignoring".format(container_id))
return None return None
logging.info("{}: no log files found, scheduling rescan #{} in {} seconds".format(container_id, rescan_num, logging.info("{}: no log files found, scheduling rescan #{} in {} seconds".format(container_id,
rescan_num,
self.rescan_delay)) self.rescan_delay))
Timer(self.rescan_delay, self.relisten_on, args=[container_id, rescan_num - 1]).start() t = Timer(self.rescan_delay, self.relisten_on, args=[container_id, rescan_num - 1])
t.daemon = True
t.start()
return None return None
syslog_conf = self.render_template(container_id, self.template, modules_use) syslog_conf = self.render_template(container_id, self.template, modules_use)
@ -254,6 +257,14 @@ class LogInjectorDaemon(object):
logging.info("{}: spawning rsyslogd".format(container_id)) logging.info("{}: spawning rsyslogd".format(container_id))
self.exec_in_container(container_id, '/usr/sbin/rsyslogd') self.exec_in_container(container_id, '/usr/sbin/rsyslogd')
def get_running_procs(self, container_id):
"""
Runs `ps` in a container to get a list of processes
"""
ps_output = self.exec_in_container(container_id,
"ps --ppid 2 -p 2 --deselect -o cmd --no-headers").decode('utf-8')
return [line.strip() for line in ps_output.split("\n") if line]
def render_template(self, container_id, template_contents, log_modules): def render_template(self, container_id, template_contents, log_modules):
""" """
Create a rsyslog config from template Create a rsyslog config from template

View File

@ -12,8 +12,6 @@ setup(name='loginjector',
packages=['loginjector'], packages=['loginjector'],
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'loginjector_daemon = loginjector.loginjector:shell', 'loginjector = loginjector.loginjector:shell',
] ]
}, })
#install_requires=['requests==2.10.0']
)