Close sockets when a container stops and fix locking
Clean up logging; autodetect the docker0 bridge IP
This commit is contained in:
parent
6214b8220e
commit
a0b18fc417
|
@ -8,7 +8,8 @@ import argparse
|
|||
|
||||
from select import select
|
||||
from collections import defaultdict, namedtuple
|
||||
from threading import Thread, Semaphore
|
||||
from threading import Thread, Lock
|
||||
from time import sleep
|
||||
|
||||
from docker import Client
|
||||
from docker import tls # NOQA
|
||||
|
@ -62,12 +63,13 @@ if ($syslogtag == "{{ logfile.program }}-{{ logfile.logname }}") then {
|
|||
|
||||
|
||||
def shell():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")
|
||||
[logging.getLogger(mute).setLevel(logging.ERROR) for mute in ["docker", "requests"]]
|
||||
|
||||
parser = argparse.ArgumentParser(description="Python logging daemon")
|
||||
parser.add_argument('-s', '--socket', required=True, help="Path or URL to docker daemon socket")
|
||||
parser.add_argument('-t', '--template', required=False, help="Path to syslog template")
|
||||
# parser.add_argument('-t', '--template', required=False, help="Path to syslog template")
|
||||
parser.add_argument('-o', '--output', required=True, help="Path to host log output dir")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
@ -81,7 +83,7 @@ def shell():
|
|||
# test connection
|
||||
docker_c.containers()
|
||||
|
||||
# TODO template file
|
||||
# TODO support template file from arg
|
||||
# with open(args.template) as f:
|
||||
# template_contents = f.read()
|
||||
|
||||
|
@ -107,7 +109,7 @@ class LogInjectorDaemon(object):
|
|||
self.use_builtins = {'nginx', 'php-fpm', 'xxx'}
|
||||
self.output_dir = os.path.abspath(output_dir)
|
||||
|
||||
self.docker_bridge_ip = '172.7.0.1' # TODO autodetect
|
||||
self.docker_bridge_ip = None
|
||||
|
||||
self.detectors = {
|
||||
"nginx": LogInjectorDaemon.detector("nginx",
|
||||
|
@ -118,7 +120,8 @@ class LogInjectorDaemon(object):
|
|||
}
|
||||
|
||||
self.loggers = {}
|
||||
self.loggers_lock = Semaphore(1) # TODO use this
|
||||
self.loggers_lock = Lock()
|
||||
self.container_names = {}
|
||||
|
||||
def run(self):
|
||||
containers = self.docker.containers()
|
||||
|
@ -130,9 +133,7 @@ class LogInjectorDaemon(object):
|
|||
message_recvr.start()
|
||||
|
||||
for container in containers:
|
||||
# TODO swap this in for prod
|
||||
# Thread(target=self.relisten_on, args=(container["Id"],)).start()
|
||||
self.relisten_on(container['Id'])
|
||||
Thread(target=self.relisten_on, args=(container["Id"],)).start()
|
||||
|
||||
try:
|
||||
while self.alive:
|
||||
|
@ -149,10 +150,11 @@ class LogInjectorDaemon(object):
|
|||
Loop through active loggers. If there's data on the line, read it. This is meant to be run as a Thread
|
||||
"""
|
||||
while True:
|
||||
socket_fnos = list(self.loggers.keys())
|
||||
readable, _, dead = select(socket_fnos, [], socket_fnos, 1)
|
||||
for fno in readable:
|
||||
self.read_udp(fno)
|
||||
with self.loggers_lock:
|
||||
socket_fnos = list(self.loggers.keys())
|
||||
readable, _, dead = select(socket_fnos, [], socket_fnos, 0.2)
|
||||
for fno in readable:
|
||||
self.read_udp(fno)
|
||||
|
||||
def read_udp(self, fno):
|
||||
"""
|
||||
|
@ -164,7 +166,7 @@ class LogInjectorDaemon(object):
|
|||
|
||||
data = logger["socket"].recv(1024 * 32)
|
||||
|
||||
logging.info("writing {} bytes to {}".format(len(data), logger["local_logfile"]))
|
||||
logging.info("{}: writing {} bytes to {}".format(logger["container_id"], len(data), logger["local_logfile"]))
|
||||
|
||||
# this seems inefficient
|
||||
# TODO periodically close/open the file
|
||||
|
@ -177,30 +179,58 @@ class LogInjectorDaemon(object):
|
|||
try:
|
||||
for e in self.docker.events(filters=LogInjectorDaemon.EVENT_FILTERS_STOPSTART):
|
||||
event = json.loads(e.decode('UTF-8'))
|
||||
logging.info("event: {}".format(str(event)))
|
||||
# logging.info("event: {}".format(str(event)))
|
||||
if event["status"] == "start":
|
||||
logging.info("Got start on {}".format(event["id"]))
|
||||
self.relisten_on(event["id"])
|
||||
logging.info("{}: got start event".format(event["id"]))
|
||||
Thread(target=self.relisten_on, args=(event["id"],)).start()
|
||||
|
||||
elif event["status"] == "stop":
|
||||
logging.info("Got stop on {}".format(event["id"]))
|
||||
# TODO kill socket listener
|
||||
logging.info("{}: got stop event".format(event["id"]))
|
||||
Thread(target=self.end_listen_on, args=(event["id"],)).start()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.warning("Stopped listening for events")
|
||||
|
||||
def end_listen_on(self, container_id):
    """
    Tear down every local UDP listener that belongs to container_id.

    The event handler starts this in its own thread, because it sleeps before doing any work.

    :param container_id: id of the container that was stopped
    """
    # Grace period so straggling log messages can still be flushed out and read
    sleep(10)

    with self.loggers_lock:
        # Snapshot the matching file numbers first; self.loggers is mutated below
        loggers_to_close = [fno for fno, logger in self.loggers.items()
                            if logger["container_id"] == container_id]

        logging.info("{}: was stopped, closing fnos {}".format(container_id, str(loggers_to_close)))

        for logger_fno in loggers_to_close:
            # Unregister the listener before closing its socket
            logger = self.loggers.pop(logger_fno)
            logger["socket"].close()

        # The name may never have been cached, or an earlier stop event may already have
        # removed it; a missing entry must not raise and kill this cleanup thread
        self.container_names.pop(container_id, None)
|
||||
|
||||
def relisten_on(self, container_id):
|
||||
logging.info("{}: Checking for logs".format(container_id))
|
||||
"""
|
||||
Configure and spawn rsyslog in a container
|
||||
"""
|
||||
sleep(2) # hack
|
||||
|
||||
if container_id not in self.container_names:
|
||||
self.container_names[container_id] = self.get_container_name(container_id)
|
||||
|
||||
container_name = self.container_names[container_id]
|
||||
|
||||
logging.info("{}: setup".format(container_id))
|
||||
logging.info("{}: is named {}".format(container_id, container_name))
|
||||
|
||||
# Check for commonly known processes in the container
|
||||
ps_output = self.exec_in_container(container_id,
|
||||
"ps --ppid 2 -p 2 --deselect -o cmd --no-headers").decode('utf-8')
|
||||
ps_lines = [line.strip() for line in ps_output.split("\n") if line]
|
||||
logging.info("{}: running procs: {}".format(container_id, str(ps_lines)))
|
||||
# logging.info("{}: running procs: {}".format(container_id, str(ps_lines)))
|
||||
|
||||
# look at ps, see no syslog
|
||||
if any(["rsyslogd" in i for i in ps_lines]):
|
||||
logging.warning("{}: Syslog already running... killing it".format(container_id))
|
||||
logging.warning("{}: killing rsyslogd".format(container_id))
|
||||
self.exec_in_container(container_id, "pkill rsyslogd")
|
||||
|
||||
modules_found = self.find_logs(ps_lines)
|
||||
|
@ -215,7 +245,7 @@ class LogInjectorDaemon(object):
|
|||
for path in self.detectors[mod].paths:
|
||||
original_logname = os.path.basename(path["path"])
|
||||
# add local listener
|
||||
new_port = self.add_udp_listener(container_id, mod, original_logname) # TODO use container name
|
||||
new_port = self.add_udp_listener(container_id, mod, original_logname)
|
||||
|
||||
logfiles += [{"program": mod,
|
||||
"path": path["path"],
|
||||
|
@ -232,8 +262,6 @@ class LogInjectorDaemon(object):
|
|||
logging.info("{}: no log files found, exiting".format(container_id))
|
||||
return
|
||||
|
||||
print(json.dumps(logfiles))
|
||||
|
||||
# generate syslog config
|
||||
syslog_conf = Environment().from_string(self.template).render(logfiles=logfiles)
|
||||
|
||||
|
@ -244,6 +272,28 @@ class LogInjectorDaemon(object):
|
|||
logging.info("{}: spawning rsyslogd".format(container_id))
|
||||
self.exec_in_container(container_id, '/usr/sbin/rsyslogd')
|
||||
|
||||
def get_container_name(self, container_id):
    """
    Look up the name of a container via the docker client.

    As a side effect, while self.docker_bridge_ip is still unset, the gateway of the
    container's default "bridge" network (when it has one) is passed to set_bridge_ip.

    :param container_id: id of the container to inspect
    :return: container name with the leading slash removed
    """
    container_info = self.docker.inspect_container(container_id)

    # strip leading slash
    raw_name = container_info["Name"][1:]

    # hacky lazy loading
    if not self.docker_bridge_ip:
        # Containers on the host network or on a user-defined network have no "bridge" entry;
        # that must not raise, so walk the structure defensively
        networks = (container_info.get("NetworkSettings") or {}).get("Networks") or {}
        gateway = (networks.get("bridge") or {}).get("Gateway")
        if gateway:
            self.set_bridge_ip(gateway)
        else:
            logging.warning("{}: no gateway on the default bridge network, bridge ip still unknown"
                            .format(container_id))

    return raw_name
|
||||
|
||||
def set_bridge_ip(self, bridge_ip):
    """
    Record the IP address of the docker bridge interface.

    UDP listeners have to bind on the docker bridge interface, so its IP is needed for binding.
    It is filled in lazily once the first container has been fetched from the docker host,
    which is expected to happen before any udp binding takes place.
    """
    message = "Found bridge ip: {}".format(bridge_ip)
    logging.info(message)
    self.docker_bridge_ip = bridge_ip
|
||||
|
||||
def add_udp_listener(self, container_id, program, original_logname):
|
||||
"""
|
||||
Listen on a random UDP socket and create a new listener. A listener is an association between a udp port and
|
||||
|
@ -254,18 +304,16 @@ class LogInjectorDaemon(object):
|
|||
:return: int port number
|
||||
"""
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.bind((self.docker_bridge_ip, 0)) # TODO autodetect docker bridge IP.
|
||||
s.bind((self.docker_bridge_ip, 0))
|
||||
|
||||
log_path = os.path.join(self.output_dir, container_id, program, original_logname) # TODO container name
|
||||
log_path = os.path.join(self.output_dir, self.container_names[container_id], program, original_logname)
|
||||
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||||
|
||||
self.loggers[s.fileno()] = {"socket": s,
|
||||
"container_id": container_id,
|
||||
"program": program,
|
||||
"logfile": original_logname,
|
||||
"local_logfile": log_path,
|
||||
#"local_fp": open(log_path, 'ab')
|
||||
}
|
||||
"local_logfile": log_path}
|
||||
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
@ -292,7 +340,7 @@ class LogInjectorDaemon(object):
|
|||
This is ugly and sucks
|
||||
"""
|
||||
|
||||
logging.info("{}: writing {} bytes to {}".format(container, len(contents), path))
|
||||
logging.info("{}: writing {} bytes to container's {}".format(container, len(contents), path))
|
||||
|
||||
if type(contents) != bytes:
|
||||
contents = contents.encode('UTF-8')
|
||||
|
|
Loading…
Reference in New Issue