From 6214b8220e8d8730f64507af592e75a1a520ee95 Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 3 Sep 2016 22:37:20 -0700 Subject: [PATCH] Logging over udp works --- loginjector/loginjector.py | 130 ++++++++++++++++++++++++++++--------- requirements.txt | 1 - 2 files changed, 101 insertions(+), 30 deletions(-) diff --git a/loginjector/loginjector.py b/loginjector/loginjector.py index d9824ef..6834e58 100644 --- a/loginjector/loginjector.py +++ b/loginjector/loginjector.py @@ -1,16 +1,19 @@ -import logging -import argparse -from docker import Client, tls -from threading import Thread -from collections import defaultdict, namedtuple -from jinja2 import Environment import math import json import os +import socket +import logging +import argparse -from time import sleep, time # NOQA -import pdb # NOQA +from select import select +from collections import defaultdict, namedtuple +from threading import Thread, Semaphore + +from docker import Client +from docker import tls # NOQA + +from jinja2 import Environment DEFAULT_TEMPLATE = """ @@ -43,15 +46,14 @@ module(load="imfile") input(type="imfile" File="{{ logfile.path }}" statefile="{{ logfile.statefile }}" - Tag="{{ logfile.program }}" + Tag="{{ logfile.program }}-{{ logfile.logname }}" Severity="{{ logfile.program }}" facility="local0") -if ($syslogtag == "{{ logfile.program }}") then { +if ($syslogtag == "{{ logfile.program }}-{{ logfile.logname }}") then { local0.* @{{ logfile.dest_ip }}:{{ logfile.dest_port }};myFormat } - {% endfor %} *.* /var/log/syslog @@ -66,23 +68,24 @@ def shell(): parser = argparse.ArgumentParser(description="Python logging daemon") parser.add_argument('-s', '--socket', required=True, help="Path or URL to docker daemon socket") parser.add_argument('-t', '--template', required=False, help="Path to syslog template") - #parser.add_argument('-d', '--dest', required=True, help="Logs destination IP 1.2.3.4:xxxx", type=lambda x: x.split(":")) + parser.add_argument('-o', '--output', required=True, help="Path to host log output dir") args = parser.parse_args() - # TODO fixme - client_certs = tls.TLSConfig(client_cert=('/Users/dave/.docker/machine/machines/digio/cert.pem', - '/Users/dave/.docker/machine/machines/digio/key.pem'), - verify='/Users/dave/.docker/machine/machines/digio/ca.pem') - docker_c = Client(base_url=args.socket, tls=client_certs) + # TODO support ssl + # client_certs = tls.TLSConfig(client_cert=('/Users/dave/.docker/machine/machines/digio/cert.pem', + # '/Users/dave/.docker/machine/machines/digio/key.pem'), + # verify='/Users/dave/.docker/machine/machines/digio/ca.pem') + docker_c = Client(base_url=args.socket, version='auto') # , tls=client_certs) # test connection docker_c.containers() - #with open(args.template) as f: + # TODO template file + # with open(args.template) as f: # template_contents = f.read() - daemon = LogInjectorDaemon(docker_c, dest=args.dest) + daemon = LogInjectorDaemon(docker_c, output_dir=args.output) daemon.run() @@ -97,12 +100,14 @@ class LogInjectorDaemon(object): detector = namedtuple("Detector", "match paths") - def __init__(self, docker_client, dest, syslog_template=DEFAULT_TEMPLATE): + def __init__(self, docker_client, output_dir, syslog_template=DEFAULT_TEMPLATE): self.docker = docker_client self.alive = True self.template = syslog_template - self.dest = dest self.use_builtins = {'nginx', 'php-fpm', 'xxx'} + self.output_dir = os.path.abspath(output_dir) + + self.docker_bridge_ip = '172.7.0.1' # TODO autodetect self.detectors = { "nginx": LogInjectorDaemon.detector("nginx", @@ -113,6 +118,7 @@ class LogInjectorDaemon(object): } self.loggers = {} + self.loggers_lock = Semaphore(1) # TODO use this def run(self): containers = self.docker.containers() @@ -120,9 +126,13 @@ class LogInjectorDaemon(object): change_listner = Thread(target=self.listen_events, daemon=True) change_listner.start() + message_recvr = Thread(target=self.listen_udp, daemon=True) + message_recvr.start() + for container in containers: - Thread(target=self.relisten_on, args=(container["Id"],)).start() - # self.relisten_on(container['Id']) + # TODO swap this in for prod + # Thread(target=self.relisten_on, args=(container["Id"],)).start() + self.relisten_on(container['Id']) try: while self.alive: @@ -134,6 +144,35 @@ class LogInjectorDaemon(object): logging.warning("Main thread exiting") + def listen_udp(self): + """ + Loop through active loggers. If there's data on the line, read it. This is meant to be ran as a Thread + """ + while True: + socket_fnos = list(self.loggers.keys()) + readable, _, dead = select(socket_fnos, [], socket_fnos, 1) + for fno in readable: + self.read_udp(fno) + + def read_udp(self, fno): + """ + Called when there's data on the line on one of the incoming log data sockets. Read the data and write to the + logger's logfile. + :param fno: file number of the socket with waiting data. also keys of self.loggers + """ + logger = self.loggers[fno] + + data = logger["socket"].recv(1024 * 32) + + logging.info("writing {} bytes to {}".format(len(data), logger["local_logfile"])) + + # this seems inefficient + # TODO periodically close/open the file + with open(logger["local_logfile"], 'ab') as f: + f.write(data) + f.flush() + os.fsync(f.fileno()) # is this necessary since we're closing the file?l + def listen_events(self): try: for e in self.docker.events(filters=LogInjectorDaemon.EVENT_FILTERS_STOPSTART): @@ -145,12 +184,12 @@ class LogInjectorDaemon(object): elif event["status"] == "stop": logging.info("Got stop on {}".format(event["id"])) + # TODO kill socket listener except KeyboardInterrupt: logging.warning("Stopped listening for events") def relisten_on(self, container_id): - sleep(2) logging.info("{}: Checking for logs".format(container_id)) # Check for commonly know processes in the container @@ -161,8 +200,8 @@ class LogInjectorDaemon(object): # look at ps, see no syslog if any(["rsyslogd" in i for i in ps_lines]): - logging.warning("{}: Syslog already running,..".format(container_id)) - return + logging.warning("{}: Syslog already running... killing it".format(container_id)) + self.exec_in_container(container_id, "pkill rsyslogd") modules_found = self.find_logs(ps_lines) logging.info("{}: logs detected: {}".format(container_id, str(modules_found))) @@ -174,19 +213,27 @@ class LogInjectorDaemon(object): for mod in modules_use: for path in self.detectors[mod].paths: + original_logname = os.path.basename(path["path"]) + # add local listener + new_port = self.add_udp_listener(container_id, mod, original_logname) # TODO use container name + logfiles += [{"program": mod, "path": path["path"], "level": path["level"], + "logname": original_logname, "statefile":"mod-{}-{}-{}.state".format(mod, - os.path.basename(path["path"]), + original_logname, path["level"]), - "dest_ip": self.dest[0], # TODO different dest per log - "dest_port": self.dest[1]}] # TODO different port per log + "dest_ip": self.docker_bridge_ip, + "dest_port": new_port, + "container_id": container_id}] if len(logfiles) == 0: logging.info("{}: no log files found, exiting".format(container_id)) return + print(json.dumps(logfiles)) + # generate syslog config syslog_conf = Environment().from_string(self.template).render(logfiles=logfiles) @@ -197,6 +244,31 @@ class LogInjectorDaemon(object): logging.info("{}: spawning rsyslogd".format(container_id)) self.exec_in_container(container_id, '/usr/sbin/rsyslogd') + def add_udp_listener(self, container_id, program, original_logname): + """ + Listen on a random UDP socket and create a new listener. A listener is an association between a udp port and + a log file source. Return the port number + :param container_name: container name + :param program: program name in the container + :param container_id: should be obvious + :return: int port number + """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.bind((self.docker_bridge_ip, 0)) # TODO autodetect docker bridge IP. + + log_path = os.path.join(self.output_dir, container_id, program, original_logname) # TODO container name + os.makedirs(os.path.dirname(log_path), exist_ok=True) + + self.loggers[s.fileno()] = {"socket": s, + "container_id": container_id, + "program": program, + "logfile": original_logname, + "local_logfile": log_path, + #"local_fp": open(log_path, 'ab') + } + + return s.getsockname()[1] + def find_logs(self, process_names): """ Given a list of process names, guess common places for their logs to be diff --git a/requirements.txt b/requirements.txt index cd20092..32435f6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ backports.ssl-match-hostname==3.5.0.1 docker-py==1.9.0 Jinja2==2.8 -loginjector==0.0.0 MarkupSafe==0.23 requests==2.11.1 six==1.10.0