diff --git a/pymonitor/builtins/sysinfo.py b/pymonitor/builtins/sysinfo.py index ce6f276..e5056fd 100644 --- a/pymonitor/builtins/sysinfo.py +++ b/pymonitor/builtins/sysinfo.py @@ -1,4 +1,5 @@ -from subprocess import Popen,PIPE +from subprocess import Popen, PIPE + def ipaddr(): """ @@ -6,6 +7,7 @@ def ipaddr(): """ return Popen(["hostname", "--all-ip-addresses"], stdout=PIPE).communicate()[0].decode().split(" ")[0].strip() + def hostname(): """ Return system hostname from hostname -f diff --git a/pymonitor/daemon.py b/pymonitor/daemon.py index 92bb2ad..a1e53b0 100755 --- a/pymonitor/daemon.py +++ b/pymonitor/daemon.py @@ -2,7 +2,7 @@ from threading import Thread from elasticsearch import Elasticsearch -from time import time,sleep +from time import time, sleep from pymonitor.builtins import sysinfo import traceback import datetime @@ -11,43 +11,44 @@ import json import sys import os + class MonitorDaemon(Thread): def __init__(self, config): Thread.__init__(self) self.config = config self.threads = [] self.backend = Backend(self.config["backend"]["url"]) - + def run(self): """ Start all monitoring threads and block until they exit """ logger = logging.getLogger("monitordaemon") - - checkerPath = os.path.dirname(os.path.realpath(__file__))+"/monitors/" + + checkerPath = os.path.dirname(os.path.realpath(__file__)) + "/monitors/" sys.path.append(checkerPath) logger.debug("path %s" % checkerPath) - + # Create/start all monitoring threads logger.debug("creating monitor threads") for instance in self.config["monitors"]: monitor_thread = MonitorThread(instance, self.backend) self.threads.append(monitor_thread) self.backend.mapping.update(monitor_thread.mapping) - + self.backend.connect() - + logger.debug("starting monitor threads") for monitor_thread in self.threads: monitor_thread.start() - + # Tear down all threads logger.debug("joining monitor threads") for monitor_thread in self.threads: monitor_thread.join() - + logger.debug("joined monitor threads") - + def shutdown(self): """ Signal all monitoring threads to stop @@ -64,11 +65,11 @@ class Backend: self.es_url = es_url self.mapping = {} self.logger = logging.getLogger("monitordaemon.backend") - + self.sysinfo = {} self.update_sys_info() self.logger.debug("running on %(hostname)s (%(ipaddr)s)" % self.sysinfo) - + def connect(self): self.logger.debug("final mapping %s" % self.mapping) self.logger.debug("connecting to backend at %s" % self.es_url) @@ -76,7 +77,7 @@ class Backend: self.logger.debug("connected to backend") self.current_index = "" self.check_before_entry() - + def update_sys_info(self): """ Fetch generic system info that is sent with every piece of monitoring data @@ -84,13 +85,13 @@ class Backend: self.sysinfo["hostname"] = sysinfo.hostname() self.sysinfo["hostname_raw"] = self.sysinfo["hostname"] self.sysinfo["ipaddr"] = sysinfo.ipaddr() - + def get_index_name(self): """ Return name of current index such as 'monitor-2015.12.05' """ return "monitor-%s" % datetime.datetime.now().strftime("%Y.%m.%d") - + def create_index(self, indexName): """ Check if current index exists, and if not, create it @@ -98,7 +99,7 @@ class Backend: if not self.es.indices.exists(index=indexName): mapping = { "mappings": { - "_default_":{ + "_default_": { "properties": { "ipaddr": { "type": "ip" @@ -107,8 +108,8 @@ class Backend: "type": "string" }, "hostname_raw": { - "type" : "string", - "index" : "not_analyzed" + "type": "string", + "index": "not_analyzed" } } } @@ -116,9 +117,9 @@ class Backend: } mapping["mappings"].update(self.mapping) self.logger.debug("creating index %s with mapping %s" % (indexName, json.dumps(mapping, indent=4))) - self.es.indices.create(index=indexName, ignore=400, body=mapping)# ignore already exists error + self.es.indices.create(index=indexName, ignore=400, body=mapping) # ignore already exists error self.current_index = indexName - + def check_before_entry(self): """ Called before adding any data to ES. Checks if a new index should be created due to date change @@ -126,21 +127,21 @@ class Backend: indexName = self.get_index_name() if indexName != self.current_index: self.create_index(indexName) - + def add_data(self, data_type, data): """ Submit a piece of monitoring data """ self.check_before_entry() - + doc = self.sysinfo.copy() doc.update(data) doc["@timestamp"] = datetime.datetime.utcnow().isoformat() - + self.logger.debug("logging type %s: %s" % (data_type, doc)) res = self.es.index(index=self.current_index, doc_type=data_type, body=doc) self.logger.debug("%s created %s" % (data_type, res["_id"])) - + class MonitorThread(Thread): def __init__(self, config, backend): @@ -150,24 +151,24 @@ class MonitorThread(Thread): Thread.__init__(self) self.config = config self.backend = backend - self.logger = logging.getLogger("monitordaemon.monitorthread.%s"%self.config["type"]) + self.logger = logging.getLogger("monitordaemon.monitorthread.%s" % self.config["type"]) self.logger.debug("initing worker thread with config %s" % self.config) - + self.logger.debug("importing %s" % self.config["type"]) self.checker_func = getattr(__import__(self.config["type"]), self.config["type"]) self.logger.debug("checker func %s" % self.checker_func) - + self.mapping = {} - #try: + # try: self.mapping.update(__import__(self.config["type"]).mapping) - #except: - # pass + # except: + # pass self.logger.debug("mapping %s" % self.mapping) - + self.alive = True self.delay = int(self.config["freq"]) self.lastRun = 0 - + def run(self): """ Call execute method every x seconds forever @@ -183,7 +184,7 @@ class MonitorThread(Thread): self.logger.warning(tb) sleep(0.5) self.logger.debug("scheduler exited") - + def execute(self, args): """ Run the loaded checker function @@ -194,32 +195,35 @@ class MonitorThread(Thread): self.backend.add_data(self.config["type"], result) duration = time() - before self.logger.info("runtime: %.3f" % duration) - + def shutdown(self): """ Tell thread to exit """ self.logger.debug("cancelling scheduler") - self.alive=False + self.alive = False + def run_cli(): from optparse import OptionParser - + parser = OptionParser() - parser.add_option("-c", "--config", action="store", type="string", dest="config", help="Path to config file") - parser.add_option("-l", "--logging", action="store", dest="logging", help="Logging level", default="INFO", choices=['WARN', 'CRITICAL', 'WARNING', 'INFO', 'ERROR', 'DEBUG']) - + parser.add_option("-c", "--config", action="store", type="string", dest="config", help="Path to config file") + parser.add_option("-l", "--logging", action="store", dest="logging", help="Logging level", default="INFO", + choices=['WARN', 'CRITICAL', 'WARNING', 'INFO', 'ERROR', 'DEBUG']) + (options, args) = parser.parse_args() - - logging.basicConfig(level=getattr(logging, options.logging), format="%(asctime)-15s %(levelname)-8s %(name)s@%(filename)s:%(lineno)d %(message)s") + + logging.basicConfig(level=getattr(logging, options.logging), + format="%(asctime)-15s %(levelname)-8s %(name)s@%(filename)s:%(lineno)d %(message)s") logger = logging.getLogger("init") - + logger.debug("options: %s" % options) - - if options.config == None: + + if options.config is None: parser.print_help() sys.exit() - + with open(options.config, "r") as c: if options.config[-5:] == '.json': conf = json.load(c) @@ -228,9 +232,9 @@ def run_cli(): conf = yaml_load(c) else: raise Exception("Invalid config format") - + logger.debug("starting daemon with conf: %s" % conf) - + daemon = MonitorDaemon(conf) try: daemon.start() @@ -239,5 +243,6 @@ def run_cli(): print("") daemon.shutdown() + if __name__ == '__main__': run_cli() diff --git a/pymonitor/monitors/diskio.py b/pymonitor/monitors/diskio.py index 813f4c9..90c088e 100644 --- a/pymonitor/monitors/diskio.py +++ b/pymonitor/monitors/diskio.py @@ -1,10 +1,11 @@ from psutil import disk_io_counters + def diskio(disks=[]): with open("/proc/uptime", "r") as f: uptime = int(float(f.read().split(" ")[0])) diskinfo = disk_io_counters(perdisk=True) - for disk,stats in diskinfo.items(): + for disk, stats in diskinfo.items(): if disks and disk not in disks: continue if stats.read_count == 0 and disk not in disks: @@ -12,19 +13,20 @@ def diskio(disks=[]): stats = { "disk": disk, "disk_raw": disk, - "reads_ps": round(stats.read_count/uptime, 2), - "writes_ps":round(stats.write_count/uptime, 2), - "read_ps": round(stats.read_bytes/uptime, 2), - "write_ps": round(stats.write_bytes/uptime, 2), - "reads": stats.read_count, - "writes": stats.write_count, - "read": stats.read_bytes, - "written": stats.write_bytes, - "read_size":round(stats.read_bytes/stats.read_count, 2) if stats.read_count > 0 else 0, - "write_size":round(stats.write_bytes/stats.write_count, 2) if stats.write_count > 0 else 0 + "reads_ps": round(stats.read_count / uptime, 2), + "writes_ps": round(stats.write_count / uptime, 2), + "read_ps": round(stats.read_bytes / uptime, 2), + "write_ps": round(stats.write_bytes / uptime, 2), + "reads": stats.read_count, + "writes": stats.write_count, + "read": stats.read_bytes, + "written": stats.write_bytes, + "read_size": round(stats.read_bytes / stats.read_count, 2) if stats.read_count > 0 else 0, + "write_size": round(stats.write_bytes / stats.write_count, 2) if stats.write_count > 0 else 0 } yield(stats) + mapping = { "diskio": { "properties": { @@ -33,7 +35,7 @@ mapping = { }, "disk_raw": { "type": "string", - "index" : "not_analyzed" + "index": "not_analyzed" }, "reads_ps": { "type": "double" @@ -72,4 +74,3 @@ mapping = { if __name__ == '__main__': for item in diskio(): print(item) - diff --git a/pymonitor/monitors/diskspace.py b/pymonitor/monitors/diskspace.py index b2dae50..4845594 100644 --- a/pymonitor/monitors/diskspace.py +++ b/pymonitor/monitors/diskspace.py @@ -1,28 +1,30 @@ from os import statvfs + def diskspace(filesystems=[]): for fs in filesystems: stats = statvfs(fs) - + info = { "fs": fs, "fs_raw": fs, "diskfree": stats.f_bsize * stats.f_bavail, - "diskused": (stats.f_blocks-stats.f_bavail) * stats.f_bsize, + "diskused": (stats.f_blocks - stats.f_bavail) * stats.f_bsize, "disksize": stats.f_bsize * stats.f_blocks, "inodesmax": stats.f_files, "inodesfree": stats.f_favail, "inodesused": stats.f_files - stats.f_favail } - - info["diskpctused"] = round(info["diskused"]/info["disksize"] if info["disksize"] > 0 else 0, 2) - info["diskpctfree"] = round(info["diskfree"]/info["disksize"] if info["disksize"] > 0 else 0, 2) - + + info["diskpctused"] = round(info["diskused"] / info["disksize"] if info["disksize"] > 0 else 0, 2) + info["diskpctfree"] = round(info["diskfree"] / info["disksize"] if info["disksize"] > 0 else 0, 2) + info["inodesused_pct"] = round(info["inodesused"] / info["inodesmax"] if info["inodesmax"] > 0 else 0, 2) info["inodesfree_pct"] = round(info["inodesfree"] / info["inodesmax"] if info["inodesmax"] > 0 else 0, 2) - + yield info + mapping = { "diskspace": { "properties": { @@ -45,8 +47,8 @@ mapping = { "type": "string" }, "fs_raw": { - "type" : "string", - "index" : "not_analyzed" + "type": "string", + "index": "not_analyzed" }, "inodesmax": { "type": "long" @@ -67,6 +69,7 @@ mapping = { } } + if __name__ == '__main__': for item in diskspace(filesystems=["/", "/dev"]): print(item) diff --git a/pymonitor/monitors/load.py b/pymonitor/monitors/load.py index 7e0fe01..c2e4d19 100644 --- a/pymonitor/monitors/load.py +++ b/pymonitor/monitors/load.py @@ -4,9 +4,10 @@ def load(): yield { "load_1m": m1, "load_5m": m5, - "load_15m":m15 + "load_15m": m15 } + mapping = { "load": { "properties": { @@ -23,6 +24,7 @@ mapping = { } } + if __name__ == '__main__': for item in load(): print(item) diff --git a/pymonitor/monitors/meminfo.py b/pymonitor/monitors/meminfo.py index a035f9a..a4cb204 100644 --- a/pymonitor/monitors/meminfo.py +++ b/pymonitor/monitors/meminfo.py @@ -3,65 +3,72 @@ import re memline_pattern = re.compile(r'^(?P[^\\:]+)\:\s+(?P[0-9]+)(\s(?P[a-zA-Z]+))?') computed_fields = { - "mempctused": lambda items: round((items["memtotal"]-items["memfree"])/items["memtotal"], 2), - "mempctfree": lambda items: 1-round((items["memtotal"]-items["memfree"])/items["memtotal"], 2), - "mempctused_nocache": lambda items: round((items["memtotal"]-items["memfree"]-items["cached"])/items["memtotal"], 2), - "mempctfree_nocache": lambda items: 1-round((items["memtotal"]-items["memfree"]-items["cached"])/items["memtotal"], 2), - "swappctused": lambda items: round((items["swaptotal"]-items["swapfree"])/items["swaptotal"] if items["swaptotal"] > 0 else 0, 2), - "swappctfree": lambda items: 1-round((items["swaptotal"]-items["swapfree"])/items["swaptotal"] if items["swaptotal"] > 0 else 0, 2) + "mempctused": lambda items: round((items["memtotal"] - items["memfree"]) / items["memtotal"], 2), + "mempctfree": lambda items: 1 - round((items["memtotal"] - items["memfree"]) / items["memtotal"], 2), + "mempctused_nocache": lambda items: round((items["memtotal"] - items["memfree"] - items["cached"]) / + items["memtotal"], 2), + "mempctfree_nocache": lambda items: 1 - round((items["memtotal"] - items["memfree"] - items["cached"]) / + items["memtotal"], 2), + "swappctused": lambda items: round((items["swaptotal"] - items["swapfree"]) / + items["swaptotal"] if items["swaptotal"] > 0 else 0, 2), + "swappctfree": lambda items: 1 - round((items["swaptotal"] - items["swapfree"]) / + items["swaptotal"] if items["swaptotal"] > 0 else 0, 2) } + def meminfo(whitelist=[]): if not whitelist: - whitelist = ["swaptotal", "swapfree", "swapcached", - "memtotal", "memfree", "cached", + whitelist = ["swaptotal", "swapfree", "swapcached", + "memtotal", "memfree", "cached", "active", "inactive", ] - + result = {} with open("/proc/meminfo", "r") as f: for line in f.read().strip().split("\n"): matches = memline_pattern.match(line) - + value = int(matches.group("value")) unit = matches.group("unit") - + if unit: if unit == "kB": - value*=1024 + value *= 1024 else: raise Exception("Unknown unit") - - name = ''.join(c for c in matches.group("key").lower() if 96name mapping users = {} with open('/etc/passwd', 'r') as passwd: @@ -13,9 +14,9 @@ def procs(): line = passwd.readline() if not line: break - uname,x,uid,gid,opts,home,shell = line.split(":") - users[int(uid)]=uname - + uname, _, uid, gid, opts, home, shell = line.split(":") + users[int(uid)] = uname + # Get gid->groupname mapping groups = {} with open('/etc/group', 'r') as group: @@ -23,13 +24,13 @@ def procs(): line = group.readline() if not line: break - gname,x,gid,y = line.split(":") - groups[int(gid)]=gname - + gname, _, gid, y = line.split(":") + groups[int(gid)] = gname + num_procs = 0 num_threads = 0 num_kthreads = 0 - + for f in glob('/proc/[0-9]*/stat'): try: with open(f, "r") as statfile: @@ -38,21 +39,21 @@ def procs(): # Fix spaces in process names stat = PAT_REMOVE_PROC_SPACES.sub("PROCNAME", stat) stat = stat.split(" ") - - proc_id = int(stat[0]) + proc_parent = int(stat[3]) - + if proc_parent == KTHREADD_PID: - num_kthreads+=1 + num_kthreads += 1 else: - num_procs+=1 + num_procs += 1 num_threads += int(stat[19]) - + except Exception as e: print(e) print("Failed to open %s" % f) - - yield {"procs": num_procs, "threads":num_threads, "kthreads": num_kthreads} + + yield {"procs": num_procs, "threads": num_threads, "kthreads": num_kthreads} + mapping = { "procs": { @@ -70,6 +71,7 @@ mapping = { } } + if __name__ == '__main__': for item in procs(): print(item) diff --git a/pymonitor/monitors/uptime.py b/pymonitor/monitors/uptime.py index 9050151..c4ff45a 100644 --- a/pymonitor/monitors/uptime.py +++ b/pymonitor/monitors/uptime.py @@ -1,6 +1,7 @@ def uptime(): with open("/proc/uptime", "r") as f: - yield {"uptime":int(float(f.read().split(" ")[0]))} + yield {"uptime": int(float(f.read().split(" ")[0]))} + mapping = { "uptime": { @@ -12,6 +13,7 @@ mapping = { } } + if __name__ == '__main__': for item in uptime(): print(item)