python-esmonitor/pymonitor/daemon.py

154 lines
4.8 KiB
Python
Raw Permalink Normal View History

2015-12-04 22:41:57 -08:00
#!/usr/bin/env python3
from threading import Thread
2017-05-15 17:29:20 -07:00
from time import time, sleep
2015-12-27 15:16:18 -08:00
import traceback
2015-12-04 22:41:57 -08:00
import logging
import json
import sys
import os
2018-10-04 18:50:34 -07:00
from pymonitor.elasticsearch import ESBackend
from pymonitor.influxdb import InfluxBackend
2015-12-04 22:41:57 -08:00
2017-05-15 17:29:20 -07:00
2015-12-04 22:41:57 -08:00
class MonitorDaemon(Thread):
    """
    Supervisor thread that owns the metrics backend and one MonitorThread per
    entry in config["monitors"]
    """

    def __init__(self, config):
        """
        Store the config and instantiate the backend selected by
        config["backend"]["type"] ("elasticsearch" or "influxdb")
        """
        Thread.__init__(self)
        self.config = config
        self.threads = []

        backend_classes = {"elasticsearch": ESBackend,
                           "influxdb": InfluxBackend}
        backend_cls = backend_classes[self.config["backend"]["type"]]
        # The backend receives this daemon and its own section of the config
        self.backend = backend_cls(self, self.config["backend"])

    def run(self):
        """
        Create the monitor threads, connect the backend, start every thread
        and wait until all of them have finished
        """
        log = logging.getLogger("monitordaemon")

        # Make the bundled "monitors" directory importable
        monitors_dir = os.path.dirname(os.path.realpath(__file__)) + "/monitors/"
        sys.path.append(monitors_dir)
        log.debug("path %s", monitors_dir)

        # One worker per configured monitor, all sharing the same backend
        log.debug("creating monitor threads")
        self.threads.extend(MonitorThread(monitor_conf, self.backend)
                            for monitor_conf in self.config["monitors"])

        # The backend is connected before any worker starts
        self.backend.connect()

        log.debug("starting monitor threads")
        for worker in self.threads:
            worker.start()

        # Block here until every worker has exited
        log.debug("joining monitor threads")
        for worker in self.threads:
            worker.join()
        log.debug("joined monitor threads")

    def shutdown(self):
        """
        Ask every monitor thread to stop
        """
        for worker in self.threads:
            worker.shutdown()
class MonitorThread(Thread):
    """
    Worker thread that calls one checker function on a fixed interval and
    hands every result it yields to the backend
    """

    def __init__(self, config, backend):
        """
        Load checker function and prepare scheduler

        :param config: monitor config; the "type", "freq" and "args" keys are read
        :param backend: object whose add_data() is called with every result
        """
        Thread.__init__(self)
        self.config = config
        self.backend = backend

        self.logger = logging.getLogger("monitordaemon.monitorthread.%s" % self.config["type"])
        self.logger.debug("initing worker thread with config %s", self.config)

        # The checker module and the function inside it share the name config["type"]
        self.logger.debug("importing %s", self.config["type"])
        self.imported = __import__(self.config["type"])
        self.checker_func = getattr(self.imported, self.config["type"])
        self.logger.debug("checker func %s", self.checker_func)

        self.alive = True
        self.delay = int(self.config["freq"])
        # 0 makes the first check fire on the first pass through the loop
        self.lastRun = 0

    def run(self):
        """
        Call execute() roughly every self.delay seconds until shutdown() is called
        """
        self.logger.debug("starting scheduler")
        while self.alive:
            if time() - self.lastRun > self.delay:
                self.lastRun = time()
                try:
                    self.execute(self.config["args"])
                except Exception:
                    # A failing check is logged and retried on the next cycle.
                    # Only Exception is caught so SystemExit and other
                    # BaseExceptions are not silently swallowed.
                    self.logger.warning(traceback.format_exc())
            # Short poll interval keeps shutdown() responsive
            sleep(0.5)
        self.logger.debug("scheduler exited")

    def execute(self, args):
        """
        Run the loaded checker function. Pass each Metric object yielded to the backend.

        :param args: keyword arguments passed to the checker function
        """
        before = time()
        for result in self.checker_func(**args):
            # Tag every result with the monitor type it came from
            result.tags.update(type=self.config["type"])
            self.logger.debug("result: %s", result)
            self.backend.add_data(result)
        duration = time() - before
        self.logger.info("runtime: %.3f", duration)

    def shutdown(self):
        """
        Tell thread to exit
        """
        self.logger.debug("canceling scheduler")
        self.alive = False
2015-12-04 22:41:57 -08:00
2018-10-04 18:50:34 -07:00
def main():
    """
    Command-line entry point

    Parses the -c/--config and -l/--logging options, loads the JSON or YAML
    config file, then runs a MonitorDaemon until it exits or the user
    interrupts it. Prints usage and exits when no config path is given.

    Raises Exception("Invalid config format") when the config file name does
    not end in .json, .yml or .yaml.
    """
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-c", "--config", action="store", type="string", dest="config", help="Path to config file")
    parser.add_option("-l", "--logging", action="store", dest="logging", help="Logging level", default="INFO",
                      choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'])

    options, _ = parser.parse_args()

    logging.basicConfig(level=getattr(logging, options.logging),
                        format="%(asctime)-15s %(levelname)-8s %(name)s@%(filename)s:%(lineno)d %(message)s")
    logger = logging.getLogger("init")

    logger.debug("options: %s", options)

    if options.config is None:
        parser.print_help()
        sys.exit()

    with open(options.config, "r") as c:
        if options.config.endswith('.json'):
            conf = json.load(c)
        elif options.config.endswith(('.yml', '.yaml')):
            # safe_load instead of load: yaml.load() without a Loader raises
            # TypeError on PyYAML >= 6, and on older releases it can construct
            # arbitrary Python objects from the file.
            from yaml import safe_load
            conf = safe_load(c)
        else:
            raise Exception("Invalid config format")

    logger.debug("starting daemon with conf: %s", conf)

    daemon = MonitorDaemon(conf)
    try:
        daemon.start()
        daemon.join()
    except KeyboardInterrupt:
        # Ctrl-C: emit a newline after the "^C" echo, then stop the workers
        print("")
        daemon.shutdown()