python-esmonitor/pymonitor/elasticsearch.py


from pymonitor import Backend
import datetime
import json


class ESBackend(Backend):
    def __init__(self, master, conf):
        """
        Init elasticsearch client
        """
        super().__init__(master, conf)
        self.mapping = {}
        self.current_index = None

    def connect(self):
        self.logger.debug("connecting to elasticsearch at %s" % self.conf["url"])
        from elasticsearch import Elasticsearch
        self.es = Elasticsearch([self.conf["url"]])
        self.logger.debug("connected to backend")
        # Merge the field mappings declared by every loaded monitor module
        for monitor_thread in self.master.threads:
            self.mapping.update(monitor_thread.imported.mapping)
        self.logger.debug("final mapping: %s", self.mapping)
        self.create_mapping_template()
        self.check_index()

    def get_index_name(self):
        """
        Return name of current index such as 'monitor-2015.12.05'
        """
        return "monitor-%s" % datetime.datetime.now().strftime("%Y.%m.%d")

    def check_index(self):
        """
        Called before adding any data to ES. Checks if a new index should be created due to date change.
        """
        indexName = self.get_index_name()
        if indexName != self.current_index:
            self.create_index(indexName)

    def create_index(self, indexName):
        """
        Check if the given index exists, and if not, create it
        """
        if not self.es.indices.exists(index=indexName):
            self.es.indices.create(index=indexName, ignore=400)  # ignore "already exists" error
        self.current_index = indexName

    def create_mapping_template(self):
        """
        Install an index template for monitor-* indices, combining the merged monitor fields with the defaults
        """
        # TODO I don't like how these default fields are handled in general
        default_fields = {"ipaddr": {"type": "ip"},
                          "hostname": {"type": "text"},
                          "hostname_raw": {"type": "keyword"},
                          "@timestamp": {"type": "date"}}
        fields = dict(**self.mapping)
        fields.update(**default_fields)
        template = {"index_patterns": ["monitor-*"],
                    "settings": {"number_of_shards": 1},  # TODO shard info from config file
                    "mappings": {"_default_": {"properties": fields}}}
        self.logger.debug("creating template with body %s", json.dumps(template, indent=4))
        self.es.indices.put_template(name="monitor", body=template)
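
    # Illustrative sketch of the template body that ends up being sent to put_template
    # (the default fields are shown here; the monitor-specific fields come from each
    # monitor module's mapping merged in connect()):
    #
    #     {
    #         "index_patterns": ["monitor-*"],
    #         "settings": {"number_of_shards": 1},
    #         "mappings": {
    #             "_default_": {
    #                 "properties": {
    #                     "ipaddr": {"type": "ip"},
    #                     "hostname": {"type": "text"},
    #                     "hostname_raw": {"type": "keyword"},
    #                     "@timestamp": {"type": "date"}
    #                 }
    #             }
    #         }
    #     }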

    def add_data(self, metric):
        """
        Submit a piece of monitoring data
        """
        self.check_index()
        metric.tags.update(**self.sysinfo)
        metric.values["@timestamp"] = datetime.datetime.utcnow().isoformat()
        metric_dict = {}
        metric_dict.update(metric.values)
        metric_dict.update(metric.tags)
        # We'll likely group by tags on the eventual frontend, and under elasticsearch this works best if the
        # entire field is handled as a single keyword. Duplicate all tags into ${NAME}_raw fields, expected
        # to be not analyzed.
        for k, v in metric.tags.items():
            metric_dict["{}_raw".format(k)] = v
        self.logger.debug("logging type %s: %s" % (metric.tags["type"], metric))
        res = self.es.index(index=self.current_index, doc_type="monitor_data", body=metric_dict)
        self.logger.debug("%s created %s" % (metric.tags["type"], res["_id"]))