python-esmonitor/pymonitor/daemon.py

154 lines
4.8 KiB
Python
Executable File

#!/usr/bin/env python3
from threading import Thread
from time import time, sleep
import traceback
import logging
import json
import sys
import os
from pymonitor.elasticsearch import ESBackend
from pymonitor.influxdb import InfluxBackend
class MonitorDaemon(Thread):
def __init__(self, config):
Thread.__init__(self)
self.config = config
self.threads = []
self.backend = {"elasticsearch": ESBackend,
"influxdb": InfluxBackend}[self.config["backend"]["type"]](self, self.config["backend"])
def run(self):
"""
Start all monitoring threads and block until they exit
"""
logger = logging.getLogger("monitordaemon")
checkerPath = os.path.dirname(os.path.realpath(__file__)) + "/monitors/"
sys.path.append(checkerPath)
logger.debug("path %s" % checkerPath)
# Create all monitoring threads
logger.debug("creating monitor threads")
for instance in self.config["monitors"]:
monitor_thread = MonitorThread(instance, self.backend)
self.threads.append(monitor_thread)
# Setup backend
self.backend.connect()
logger.debug("starting monitor threads")
for monitor_thread in self.threads:
monitor_thread.start()
# Tear down all threads
logger.debug("joining monitor threads")
for monitor_thread in self.threads:
monitor_thread.join()
logger.debug("joined monitor threads")
def shutdown(self):
"""
Signal all monitoring threads to stop
"""
for monitor_thread in self.threads:
monitor_thread.shutdown()
class MonitorThread(Thread):
def __init__(self, config, backend):
"""
Load checker function and prepare scheduler
"""
Thread.__init__(self)
self.config = config
self.backend = backend
self.logger = logging.getLogger("monitordaemon.monitorthread.%s" % self.config["type"])
self.logger.debug("initing worker thread with config %s" % self.config)
self.logger.debug("importing %s" % self.config["type"])
self.imported = __import__(self.config["type"])
self.checker_func = getattr(self.imported, self.config["type"])
self.logger.debug("checker func %s" % self.checker_func)
self.alive = True
self.delay = int(self.config["freq"])
self.lastRun = 0
def run(self):
"""
Call execute method every x seconds forever
"""
self.logger.debug("starting scheduler")
while self.alive:
if time() - self.lastRun > self.delay:
self.lastRun = time()
try:
self.execute(self.config["args"])
except:
tb = traceback.format_exc()
self.logger.warning(tb)
sleep(0.5)
self.logger.debug("scheduler exited")
def execute(self, args):
"""
Run the loaded checker function. Pass each Metric object yielded to the backend.
"""
before = time()
for result in self.checker_func(**args):
result.tags.update(type=self.config["type"])
self.logger.debug("result: %s" % (result,))
self.backend.add_data(result)
duration = time() - before
self.logger.info("runtime: %.3f" % duration)
def shutdown(self):
"""
Tell thread to exit
"""
self.logger.debug("canceling scheduler")
self.alive = False
def main():
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-c", "--config", action="store", type="string", dest="config", help="Path to config file")
parser.add_option("-l", "--logging", action="store", dest="logging", help="Logging level", default="INFO",
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'])
(options, args) = parser.parse_args()
logging.basicConfig(level=getattr(logging, options.logging),
format="%(asctime)-15s %(levelname)-8s %(name)s@%(filename)s:%(lineno)d %(message)s")
logger = logging.getLogger("init")
logger.debug("options: %s" % options)
if options.config is None:
parser.print_help()
sys.exit()
with open(options.config, "r") as c:
if options.config.endswith('.json'):
conf = json.load(c)
elif options.config.endswith('.yml'):
from yaml import load as yaml_load
conf = yaml_load(c)
else:
raise Exception("Invalid config format")
logger.debug("starting daemon with conf: %s" % conf)
daemon = MonitorDaemon(conf)
try:
daemon.start()
daemon.join()
except KeyboardInterrupt:
print("")
daemon.shutdown()