initial refactoring for multi-backend support, Elasticsearch 6

This commit is contained in:
dave 2018-08-19 20:46:21 -07:00
parent 4594014b15
commit a2cfc9059d
10 changed files with 246 additions and 211 deletions

View File

@ -1,4 +1,5 @@
backend:
type: elasticsearch
url: 'http://10.0.3.15:9200/'
monitors:
- type: uptime
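
The new `type` key is what the daemon uses to pick a backend class. A minimal sketch of that dispatch, assuming the config above is loaded with PyYAML and that ESBackend/InfluxBackend (defined later in this diff) are importable; the loader itself is not part of this commit:

import yaml  # assumption: PyYAML is used to parse the config shown above

with open("config.yml") as f:
    config = yaml.safe_load(f)

# Mirrors MonitorDaemon.__init__ below: the 'type' string selects the backend class
backend_cls = {"elasticsearch": ESBackend,
               "influxdb": InfluxBackend}[config["backend"]["type"]]
backend = backend_cls(daemon, config["backend"])  # 'daemon' would be the MonitorDaemon instance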

View File

@ -1 +1,31 @@
__version__ = "0.2.0"
from itertools import chain
class Metric(object):
"""
Wrapper for holding metrics gathered from the system. Monitor modules yield one or more of these objects per check.
"""
def __init__(self, values, tags=None):
"""
:param values: dict of name->value metric data
:param tags: dict of key->value tags associated with the metric data
"""
self.values = values
self.tags = tags or {}
# def as_dict(self):
# """
# Represent the metric as a basic dictionary. Tags are added
# """
# # TODO return the classic elasticsearch format
# import pdb
# pdb.set_trace()
# pass
def __repr__(self):
fields = []
for k, v in chain(self.values.items(), self.tags.items()):
fields.append("{}={}".format(k, v))
return "<{}{{{}}}>".format(self.__class__.__name__, ','.join(fields))

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
from threading import Thread
from elasticsearch import Elasticsearch
from time import time, sleep
from pymonitor.builtins import sysinfo
import traceback
@ -17,7 +16,8 @@ class MonitorDaemon(Thread):
Thread.__init__(self)
self.config = config
self.threads = []
self.backend = Backend(self.config["backend"]["url"])
self.backend = {"elasticsearch": ESBackend,
"influxdb": InfluxBackend}[self.config["backend"]["type"]](self, self.config["backend"])
def run(self):
"""
@ -29,13 +29,13 @@ class MonitorDaemon(Thread):
sys.path.append(checkerPath)
logger.debug("path %s" % checkerPath)
# Create/start all monitoring threads
# Create all monitoring threads
logger.debug("creating monitor threads")
for instance in self.config["monitors"]:
monitor_thread = MonitorThread(instance, self.backend)
self.threads.append(monitor_thread)
self.backend.mapping.update(monitor_thread.mapping)
# Setup backend
self.backend.connect()
logger.debug("starting monitor threads")
@ -57,34 +57,65 @@ class MonitorDaemon(Thread):
monitor_thread.shutdown()
class Backend:
def __init__(self, es_url):
"""
Init elasticsearch client
"""
self.es_url = es_url
self.mapping = {}
class Backend(object):
"""
Base class for data storage backends
"""
def __init__(self, master, conf):
self.master = master
self.conf = conf
self.logger = logging.getLogger("monitordaemon.backend")
self.sysinfo = {}
self.update_sys_info()
self.logger.debug("running on %(hostname)s (%(ipaddr)s)" % self.sysinfo)
def connect(self):
self.logger.debug("final mapping %s" % self.mapping)
self.logger.debug("connecting to backend at %s" % self.es_url)
self.es = Elasticsearch([self.es_url])
self.logger.debug("connected to backend")
self.current_index = ""
self.check_before_entry()
"""
Connect to the backend and do any prep work
"""
raise NotImplementedError()
def update_sys_info(self):
"""
Fetch generic system info that is sent with every piece of monitoring data
"""
self.sysinfo["hostname"] = sysinfo.hostname()
self.sysinfo["hostname_raw"] = self.sysinfo["hostname"]
self.sysinfo["ipaddr"] = sysinfo.ipaddr()
#self.sysinfo["hostname_raw"] = self.sysinfo["hostname"]
#self.sysinfo["ipaddr"] = sysinfo.ipaddr()
def add_data(self, metric):
"""
Accept a Metric() object and send it off to the backend
"""
raise NotImplementedError()
class InfluxBackend(Backend):
pass
class ESBackend(Backend):
def __init__(self, master, conf):
"""
Init elasticsearch client
"""
super().__init__(master, conf)
self.mapping = {}
self.sysinfo = {}
self.update_sys_info()
#self.logger.debug("running on %(hostname)s (%(ipaddr)s)" % self.sysinfo)
def connect(self):
self.logger.debug("connecting to elasticsearch at %s" % self.conf["url"])
from elasticsearch import Elasticsearch
self.es = Elasticsearch([self.conf["url"]])
self.logger.debug("connected to backend")
for monitor_thread in self.master.threads:
self.mapping.update(monitor_thread.imported.mapping)
self.logger.debug("final mapping: ", self.mapping)
self.create_mapping_template()
self.current_index = ""
self.check_index()
def get_index_name(self):
"""
@ -92,35 +123,7 @@ class Backend:
"""
return "monitor-%s" % datetime.datetime.now().strftime("%Y.%m.%d")
def create_index(self, indexName):
"""
Check if current index exists, and if not, create it
"""
if not self.es.indices.exists(index=indexName):
mapping = {
"mappings": {
"_default_": {
"properties": {
"ipaddr": {
"type": "ip"
},
"hostname": {
"type": "string"
},
"hostname_raw": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
mapping["mappings"].update(self.mapping)
self.logger.debug("creating index %s with mapping %s" % (indexName, json.dumps(mapping, indent=4)))
self.es.indices.create(index=indexName, ignore=400, body=mapping) # ignore already exists error
self.current_index = indexName
def check_before_entry(self):
def check_index(self):
"""
Called before adding any data to ES. Checks if a new index should be created due to date change
"""
@ -128,19 +131,48 @@ class Backend:
if indexName != self.current_index:
self.create_index(indexName)
def add_data(self, data_type, data):
def create_index(self, indexName):
"""
Check if current index exists, and if not, create it
"""
if not self.es.indices.exists(index=indexName):
self.es.indices.create(index=indexName, ignore=400) # ignore already exists error
self.current_index = indexName
def create_mapping_template(self):
default_fields = {"ipaddr": {"type": "ip"}, # TODO i dont like how these default fields are handled in general
"hostname": {"type": "text"},
"hostname_raw": {"type": "keyword"},
"@timestamp": {"type": "date"}} #"field": "@timestamp"
fields = dict(**self.mapping, **default_fields)
template = {"index_patterns": ["monitor-*"],
"settings": {"number_of_shards": 1}, # TODO shard info from config file
"mappings": {"_default_": {"properties": fields}}}
self.logger.debug("creating template with body %s", json.dumps(template, indent=4))
self.es.indices.put_template(name="monitor", body=template)
def add_data(self, metric):
"""
Submit a piece of monitoring data
"""
self.check_before_entry()
self.check_index()
doc = self.sysinfo.copy()
doc.update(data)
doc["@timestamp"] = datetime.datetime.utcnow().isoformat()
metric.tags.update(**self.sysinfo)
metric.values["@timestamp"] = datetime.datetime.utcnow().isoformat() # TODO elasticsearch server-side timestamp
self.logger.debug("logging type %s: %s" % (data_type, doc))
res = self.es.index(index=self.current_index, doc_type=data_type, body=doc)
self.logger.debug("%s created %s" % (data_type, res["_id"]))
metric_dict = {}
metric_dict.update(metric.values)
metric_dict.update(metric.tags)
# We'll likely group by tags on the eventual frontend, and under elasticsearch that works best when the whole
# field is indexed as a single keyword. Duplicate every tag into a ${NAME}_raw field that is left unanalyzed.
for k, v in metric.tags.items():
metric_dict["{}_raw".format(k)] = v
self.logger.debug("logging type %s: %s" % (metric.tags["type"], metric))
res = self.es.index(index=self.current_index, doc_type="monitor_data", body=metric_dict)
self.logger.debug("%s created %s" % (metric.tags["type"], res["_id"]))
class MonitorThread(Thread):
@ -155,16 +187,10 @@ class MonitorThread(Thread):
self.logger.debug("initing worker thread with config %s" % self.config)
self.logger.debug("importing %s" % self.config["type"])
self.checker_func = getattr(__import__(self.config["type"]), self.config["type"])
self.imported = __import__(self.config["type"])
self.checker_func = getattr(self.imported, self.config["type"])
self.logger.debug("checker func %s" % self.checker_func)
self.mapping = {}
# try:
self.mapping.update(__import__(self.config["type"]).mapping)
# except:
# pass
self.logger.debug("mapping %s" % self.mapping)
self.alive = True
self.delay = int(self.config["freq"])
self.lastRun = 0
@ -187,12 +213,13 @@ class MonitorThread(Thread):
def execute(self, args):
"""
Run the loaded checker function
Run the loaded checker function. Pass each Metric object yielded to the backend.
"""
before = time()
for result in self.checker_func(**args):
result.tags.update(type=self.config["type"])
self.logger.debug("result: %s" % (result,))
self.backend.add_data(self.config["type"], result)
self.backend.add_data(result)
duration = time() - before
self.logger.info("runtime: %.3f" % duration)
@ -200,7 +227,7 @@ class MonitorThread(Thread):
"""
Tell thread to exit
"""
self.logger.debug("cancelling scheduler")
self.logger.debug("canceling scheduler")
self.alive = False
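
Taken together, the new per-monitor flow is: the checker yields Metric objects, MonitorThread.execute stamps each one with a type tag, and ESBackend.add_data merges in the host info, copies tags into *_raw keyword fields, adds @timestamp, and indexes the flattened document. A compressed sketch of that loop, assuming "uptime" is the checker generator from this diff and "backend" is an already-connected ESBackend:

# Simplified version of MonitorThread.execute's inner loop
for metric in uptime():                   # checker yields Metric objects
    metric.tags.update(type="uptime")     # record which monitor produced it
    backend.add_data(metric)              # flatten values + tags (plus *_raw copies,
                                          # @timestamp, host info) and index the doc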

View File

@ -1,3 +1,4 @@
from pymonitor import Metric
from psutil import disk_io_counters
@ -12,7 +13,6 @@ def diskio(disks=[]):
continue
stats = {
"disk": disk,
"disk_raw": disk,
"reads_ps": round(stats.read_count / uptime, 2),
"writes_ps": round(stats.write_count / uptime, 2),
"read_ps": round(stats.read_bytes / uptime, 2),
@ -24,50 +24,45 @@ def diskio(disks=[]):
"read_size": round(stats.read_bytes / stats.read_count, 2) if stats.read_count > 0 else 0,
"write_size": round(stats.write_bytes / stats.write_count, 2) if stats.write_count > 0 else 0
}
yield(stats)
yield Metric(stats, {"disk": disk})
mapping = {
"diskio": {
"properties": {
"disk": {
"type": "string"
},
"disk_raw": {
"type": "string",
"index": "not_analyzed"
},
"reads_ps": {
"type": "double"
},
"writes_ps": {
"type": "double"
},
"read_ps": {
"type": "double"
},
"write_ps": {
"type": "double"
},
"reads": {
"type": "long"
},
"writes": {
"type": "long"
},
"read": {
"type": "long"
},
"written": {
"type": "long"
},
"read_size": {
"type": "double"
},
"write_size": {
"type": "double"
}
}
"disk": {
"type": "text"
},
"disk_raw": {
"type": "keyword"
},
"reads_ps": {
"type": "double"
},
"writes_ps": {
"type": "double"
},
"read_ps": {
"type": "double"
},
"write_ps": {
"type": "double"
},
"reads": {
"type": "long"
},
"writes": {
"type": "long"
},
"read": {
"type": "long"
},
"written": {
"type": "long"
},
"read_size": {
"type": "double"
},
"write_size": {
"type": "double"
}
}
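
With the nested "diskio"/"properties" wrapper gone, each monitor now exposes a flat field-to-type dict, and ESBackend merges them (plus the default host fields) into one index template at connect time. A condensed sketch of that merge, using field names taken from this commit:

# Flat per-monitor mappings, as exposed by the modules in this diff
diskio_mapping = {"disk": {"type": "text"}, "disk_raw": {"type": "keyword"}}
uptime_mapping = {"uptime": {"type": "integer"}}

combined = {}
combined.update(diskio_mapping)   # ESBackend.connect() does this for each monitor thread
combined.update(uptime_mapping)
# create_mapping_template() then adds ipaddr/hostname/@timestamp and PUTs the
# result as the "monitor-*" index template.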

View File

@ -1,3 +1,4 @@
from pymonitor import Metric
from os import statvfs
import logging
@ -26,12 +27,10 @@ def diskspace(filesystems=[], discover=True, omit=[]):
try:
stats = statvfs(fs)
except FileNotFoundError:
logging.warning("filesystem not found: %s", fs)
logging.warning("filesystem not found: %s", repr(fs))
continue
info = {
"fs": fs,
"fs_raw": fs,
"diskfree": stats.f_bsize * stats.f_bavail,
"diskused": (stats.f_blocks - stats.f_bavail) * stats.f_bsize,
"disksize": stats.f_bsize * stats.f_blocks,
@ -46,50 +45,45 @@ def diskspace(filesystems=[], discover=True, omit=[]):
info["inodesused_pct"] = round(info["inodesused"] / info["inodesmax"] if info["inodesmax"] > 0 else 0, 5)
info["inodesfree_pct"] = round(info["inodesfree"] / info["inodesmax"] if info["inodesmax"] > 0 else 0, 5)
yield info
yield Metric(info, {"fs": fs})
mapping = {
"diskspace": {
"properties": {
"diskfree": {
"type": "long"
},
"diskused": {
"type": "long"
},
"disksize": {
"type": "long"
},
"diskpctused": {
"type": "double"
},
"diskpctfree": {
"type": "double"
},
"fs": {
"type": "string"
},
"fs_raw": {
"type": "string",
"index": "not_analyzed"
},
"inodesmax": {
"type": "long"
},
"inodesfree": {
"type": "long"
},
"inodesused": {
"type": "long"
},
"inodesused_pct": {
"type": "double"
},
"inodesfree_pct": {
"type": "double"
},
}
"diskfree": {
"type": "long"
},
"diskused": {
"type": "long"
},
"disksize": {
"type": "long"
},
"diskpctused": {
"type": "double"
},
"diskpctfree": {
"type": "double"
},
"fs": {
"type": "text"
},
"fs_raw": {
"type": "keyword"
},
"inodesmax": {
"type": "long"
},
"inodesfree": {
"type": "long"
},
"inodesused": {
"type": "long"
},
"inodesused_pct": {
"type": "double"
},
"inodesfree_pct": {
"type": "double"
}
}

View File

@ -1,3 +1,4 @@
from pymonitor import Metric
from collections import defaultdict
from time import time
@ -22,8 +23,7 @@ def ifstats(omit=[]):
for i in range(0, len(fields)):
fields[i] = int(fields[i])
record = {"iface": ifname,
"rx_bytes": fields[rx_bytes],
record = {"rx_bytes": fields[rx_bytes],
"tx_bytes": fields[tx_bytes],
"rx_packets": fields[rx_packets],
"tx_packets": fields[tx_packets],
@ -41,18 +41,17 @@ def ifstats(omit=[]):
if any([ifname.startswith(i) for i in omit or []]):
continue
yield record
yield Metric(record, {"iface": ifname})
mapping = {
"ifstats": {
"properties": {
"iface": {
"type": "string",
"type": "string"
},
"iface_raw": {
"type": "string",
"index": "not_analyzed"
"type": "keyword"
},
"rx_bytes": {
"type": "long"

View File

@ -1,26 +1,22 @@
from pymonitor import Metric
def load():
with open("/proc/loadavg", "r") as f:
m1, m5, m15, procs, pid = f.read().strip().split(" ")
yield {
"load_1m": m1,
"load_5m": m5,
"load_15m": m15
}
yield Metric({"load_1m": m1,
"load_5m": m5,
"load_15m": m15})
mapping = {
"load": {
"properties": {
"load_15m": {
"type": "double"
},
"load_5m": {
"type": "double"
},
"load_1m": {
"type": "double"
}
}
"load_15m": {
"type": "double"
},
"load_5m": {
"type": "double"
},
"load_1m": {
"type": "double"
}
}

View File

@ -1,3 +1,4 @@
from pymonitor import Metric
import re
memline_pattern = re.compile(r'^(?P<key>[^\\:]+)\:\s+(?P<value>[0-9]+)(\s(?P<unit>[a-zA-Z]+))?')
@ -43,29 +44,25 @@ def meminfo(whitelist=[]):
for key in computed_fields:
result[key] = computed_fields[key](result)
yield result
yield Metric(result)
mapping = {
"meminfo": {
"properties": {
"swaptotal": {"type": "long"},
"swapfree": {"type": "long"},
"swapcached": {"type": "long"},
"memtotal": {"type": "long"},
"memfree": {"type": "long"},
"memavailable": {"type": "long"},
"cached": {"type": "long"},
"active": {"type": "long"},
"inactive": {"type": "long"},
"mempctused": {"type": "double"},
"mempctfree": {"type": "double"},
"mempctused_nocache": {"type": "double"},
"mempctfree_nocache": {"type": "double"},
"swappctused": {"type": "double"},
"swappctfree": {"type": "double"}
}
}
"swaptotal": {"type": "long"},
"swapfree": {"type": "long"},
"swapcached": {"type": "long"},
"memtotal": {"type": "long"},
"memfree": {"type": "long"},
"memavailable": {"type": "long"},
"cached": {"type": "long"},
"active": {"type": "long"},
"inactive": {"type": "long"},
"mempctused": {"type": "double"},
"mempctfree": {"type": "double"},
"mempctused_nocache": {"type": "double"},
"mempctfree_nocache": {"type": "double"},
"swappctused": {"type": "double"},
"swappctfree": {"type": "double"}
}

View File

@ -1,3 +1,4 @@
from pymonitor import Metric
from glob import glob
import re
@ -52,7 +53,7 @@ def procs():
print(e)
print("Failed to open %s" % f)
yield {"procs": num_procs, "threads": num_threads, "kthreads": num_kthreads}
yield Metric({"procs": num_procs, "threads": num_threads, "kthreads": num_kthreads})
mapping = {

View File

@ -1,17 +1,12 @@
from pymonitor import Metric
def uptime():
with open("/proc/uptime", "r") as f:
yield {"uptime": int(float(f.read().split(" ")[0]))}
yield Metric({"uptime": int(float(f.read().split(" ")[0]))})
mapping = {
"uptime": {
"properties": {
"uptime": {
"type": "integer"
}
}
}
}
mapping = {"uptime": {"type": "integer"}}
if __name__ == '__main__':