85 lines
3.3 KiB
Python
85 lines
3.3 KiB
Python
from pymonitor import Backend
|
|
import datetime
|
|
import json
|
|
|
|
|
|
class ESBackend(Backend):
|
|
def __init__(self, master, conf):
|
|
"""
|
|
Init elasticsearch client
|
|
"""
|
|
super().__init__(master, conf)
|
|
self.mapping = {}
|
|
self.current_index = None
|
|
|
|
def connect(self):
|
|
self.logger.debug("connecting to elasticsearch at %s" % self.conf["url"])
|
|
from elasticsearch import Elasticsearch
|
|
self.es = Elasticsearch([self.conf["url"]])
|
|
self.logger.debug("connected to backend")
|
|
|
|
for monitor_thread in self.master.threads:
|
|
self.mapping.update(monitor_thread.imported.mapping)
|
|
self.logger.debug("final mapping: ", self.mapping)
|
|
self.create_mapping_template()
|
|
|
|
self.check_index()
|
|
|
|
def get_index_name(self):
|
|
"""
|
|
Return name of current index such as 'monitor-2015.12.05'
|
|
"""
|
|
return "monitor-%s" % datetime.datetime.now().strftime("%Y.%m.%d")
|
|
|
|
def check_index(self):
|
|
"""
|
|
Called before adding any data to ES. Checks if a new index should be created due to date change
|
|
"""
|
|
indexName = self.get_index_name()
|
|
if indexName != self.current_index:
|
|
self.create_index(indexName)
|
|
|
|
def create_index(self, indexName):
|
|
"""
|
|
Check if current index exists, and if not, create it
|
|
"""
|
|
if not self.es.indices.exists(index=indexName):
|
|
self.es.indices.create(index=indexName, ignore=400) # ignore already exists error
|
|
self.current_index = indexName
|
|
|
|
def create_mapping_template(self):
|
|
default_fields = {"ipaddr": {"type": "ip"}, # TODO i dont like how these default fields are handled in general
|
|
"hostname": {"type": "text"},
|
|
"hostname_raw": {"type": "keyword"},
|
|
"@timestamp": {"type": "date"}} #"field": "@timestamp"
|
|
|
|
fields = dict(**self.mapping)
|
|
fields.update(**default_fields)
|
|
template = {"index_patterns": ["monitor-*"],
|
|
"settings": {"number_of_shards": 1}, # TODO shard info from config file
|
|
"mappings": {"_default_": {"properties": fields}}}
|
|
self.logger.debug("creating template with body %s", json.dumps(template, indent=4))
|
|
self.es.indices.put_template(name="monitor", body=template)
|
|
|
|
def add_data(self, metric):
|
|
"""
|
|
Submit a piece of monitoring data
|
|
"""
|
|
self.check_index()
|
|
|
|
metric.tags.update(**self.sysinfo)
|
|
metric.values["@timestamp"] = datetime.datetime.utcnow().isoformat()
|
|
|
|
metric_dict = {}
|
|
metric_dict.update(metric.values)
|
|
metric_dict.update(metric.tags)
|
|
|
|
# We'll likely group by tags on the eventual frontend, and under elasticsearch this works best if the entire
|
|
# field is handled as a single keyword. Duplicate all tags into ${NAME}_raw fields, expected to be not analyzed
|
|
for k, v in metric.tags.items():
|
|
metric_dict["{}_raw".format(k)] = v
|
|
|
|
self.logger.debug("logging type %s: %s" % (metric.tags["type"], metric))
|
|
res = self.es.index(index=self.current_index, doc_type="monitor_data", body=metric_dict)
|
|
self.logger.debug("%s created %s" % (metric.tags["type"], res["_id"]))
|