z-hypervisor/zhypervisor/daemon.py

296 lines
10 KiB
Python
Raw Permalink Normal View History

2016-12-26 16:42:48 -08:00
import os
import json
import signal
import logging
import argparse
2016-12-28 18:43:07 -08:00
from glob import iglob
2016-12-26 17:14:28 -08:00
from threading import Thread
2016-12-28 18:43:07 -08:00
from concurrent.futures import ThreadPoolExecutor
2016-12-26 16:42:48 -08:00
from zhypervisor.logging import setup_logging
from zhypervisor.machine import MachineSpec
2016-12-28 18:43:07 -08:00
from zhypervisor.clients.qmachine import QDisk, IsoDisk
2017-02-24 18:11:36 -08:00
from zhypervisor.clients.dockermachine import DockerDisk
2016-12-28 18:43:07 -08:00
from zhypervisor.util import ZDisk
2016-12-26 17:14:28 -08:00
from zhypervisor.api.api import ZApi
2016-12-26 16:42:48 -08:00
class ZHypervisorDaemon(object):
    """
    Z Hypervisor main thread. Roles:
    - Load datastores, on-disk state and disks, and create the API on init
    - Load (and autostart) machines and run the API in run()
    - Cleanup on shutdown
    - Committing changes to machines to disk
    - Primary interface to modify machines
    """

    def __init__(self, config):
        """
        Set up datastores, on-disk state, disks, the API object and shutdown signal handlers.

        :param config: parsed JSON config; must contain a "datastores" mapping that includes a
                       "default" datastore, which is used for "State" storage
        """
        self.config = config  # JSON config listing, mainly, datastore paths
        self.datastores = {}  # Mapping of datastore name -> objects
        self.disks = {}  # Mapping of disk name -> objects
        self.machines = {}  # Mapping of machine name -> objects
        self.running = True

        # Set up datastores and use the default datastore for "State" storage
        self.init_datastores()
        self.state = ZConfig(self.datastores["default"])

        # Set up disks
        self.init_disks()

        # start API
        self.api = ZApi(self)

        # Set up shutdown signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)  # ctrl-c
        signal.signal(signal.SIGTERM, self.signal_handler)  # sigterm

    def init_datastores(self):
        """
        Per datastore in the config, create a ZDataStore object
        """
        for name, info in self.config["datastores"].items():
            self.datastores[name] = ZDataStore(name, info["path"], info.get("init", False))

    def init_disks(self):
        """
        Load all disks recorded in the on-disk state and ensure reachability
        """
        for disk in self.state.get_disks():
            self.add_disk(disk["disk_id"], disk["properties"])

    def init_machines(self):
        """
        Per machine in the on-disk state, create a machine object, and start it if its
        "autostart" property is set and it is currently stopped.
        """
        for machine_info in self.state.get_machines():
            machine_id = machine_info["machine_id"]
            self.add_machine(machine_id, machine_info["properties"])

            # Launch if machine is an autostarted machine
            machine = self.machines[machine_id]
            if machine.properties.get("autostart", False) and machine.machine.get_status() == "stopped":
                machine.start()

    def signal_handler(self, signum, frame):
        """
        Handle signals sent to the daemon (SIGINT/SIGTERM, registered in __init__). On any, exit.
        """
        logging.critical("Got signal {}".format(signum))
        self.stop()

    def run(self):
        """
        Main loop of the daemon. Loads & autostarts machines, then runs the API
        (presumably blocking until the API is stopped).
        """
        self.init_machines()
        self.api.run()

    def stop(self):
        """
        Shut down the hypervisor. Stop the API, then stop all machines in parallel.
        Failures while stopping individual machines are logged rather than silently dropped.
        """
        self.running = False
        self.api.stop()
        # Snapshot the ids so concurrent changes to self.machines can't break iteration
        machine_ids = list(self.machines)
        with ThreadPoolExecutor(10) as pool:
            futures = {machine_id: pool.submit(self.forceful_stop, machine_id)
                       for machine_id in machine_ids}
        # The executor swallows exceptions raised in workers; surface them here
        for machine_id, future in futures.items():
            error = future.exception()
            if error is not None:
                logging.error("Failed to stop %s", machine_id, exc_info=error)
        # For easier debugging, replace the pool above with a sequential loop calling
        # self.forceful_stop(machine_id) for each machine id.

    # Below here are methods external forces may use to manipulate disks

    def add_disk(self, disk_id, disk_spec, write=False):
        """
        Create a disk and register it with the hypervisor.

        :param disk_id: id of the disk to create; must not already be registered
        :param disk_spec: dictionary of disk properties; "type" selects the disk class and
                          "datastore" names the datastore the disk lives in
        :param write: commit the disk spec to on-disk state
        :raises AssertionError: if the disk already exists, or its file is still missing after init
        :raises Exception: if the disk type is unknown
        """
        # Explicit raises instead of ``assert`` so validation survives ``python -O``
        if disk_id in self.disks:
            raise AssertionError("Cannot update disks, only create supported")
        disk_type = disk_spec["type"]
        datastore = self.datastores[disk_spec["datastore"]]

        disk = self._create_disk(disk_type, datastore, disk_id, disk_spec)

        if not disk.exists():
            disk.init()
        if not disk.exists():
            raise AssertionError("Disk file path is missing: {}".format(disk.get_path()))

        self.disks[disk_id] = disk
        if write:
            self.state.write_disk(disk_id, disk_spec)

    @staticmethod
    def _create_disk(disk_type, datastore, disk_id, disk_spec):
        """
        Instantiate the disk class matching ``disk_type``.

        :raises Exception: if ``disk_type`` is not one of "qdisk", "iso" or "dockerdisk"
        """
        if disk_type == "qdisk":
            return QDisk(datastore, disk_id, disk_spec)
        if disk_type == "iso":
            return IsoDisk(datastore, disk_id, disk_spec)
        if disk_type == "dockerdisk":
            return DockerDisk(datastore, disk_id, disk_spec)
        raise Exception("Unknown disk type: {}".format(disk_type))

    def remove_disk(self, disk_id):
        """
        Remove a disk from the system: delete it, unregister it, and drop it from on-disk state
        """
        self.disks[disk_id].delete()
        del self.disks[disk_id]
        self.state.remove_disk(disk_id)

    # Below here are methods external forces may use to manipulate machines

    def add_machine(self, machine_id, machine_spec, write=False):
        """
        Create or update a machine.
        :param machine_id: alphanumeric id of machine to modify/create
        :param machine_spec: dictionary of machine properties - see example/ubuntu.json
        :param write: commit machine changes to on-disk state
        """
        # Find / create the machine
        if machine_id in self.machines:
            machine = self.machines[machine_id]
            machine.properties = machine_spec
        else:
            machine = MachineSpec(self, machine_id, machine_spec)
            self.machines[machine_id] = machine
        # Update if necessary
        if write:
            self.state.write_machine(machine_id, machine_spec)

    def forceful_stop(self, machine_id, timeout=30):  # make this timeout longer?
        """
        Gracefully stop a machine by asking it nicely, waiting some time, then forcefully killing it.

        :param machine_id: id of a registered machine
        :param timeout: seconds to wait for the nice stop before killing the machine
        """
        machine_spec = self.machines[machine_id]
        # Run the polite stop in its own thread so we can bound how long we wait for it
        nice_stop = Thread(target=machine_spec.stop)
        nice_stop.start()
        nice_stop.join(timeout)
        if nice_stop.is_alive():
            logging.error("%s did not respond in %s seconds, killing", machine_id, timeout)
            machine_spec.machine.kill_machine()

    def remove_machine(self, machine_id):
        """
        Remove a stopped machine from the system. The machine should already be stopped.

        :raises AssertionError: if the machine is not in the "stopped" state
        """
        # Explicit raise instead of ``assert`` so the check survives ``python -O``
        if self.machines[machine_id].machine.get_status() != "stopped":
            raise AssertionError("Cannot remove machine {}: it is not stopped".format(machine_id))
        self.state.remove_machine(machine_id)
        del self.machines[machine_id]
2016-12-26 16:42:48 -08:00
class ZDataStore(object):
    """
    Helper module representing a data storage location somewhere on disk
    """

    def __init__(self, name, root_path, init_ok=False):
        """
        Open (and optionally initialize) the datastore rooted at ``root_path``.

        :param name: datastore name, used for logging
        :param root_path: directory holding the datastore; created if missing
        :param init_ok: if True, write an empty ".datastore.json" metadata file when it is absent
        :raises AssertionError: if the metadata file is absent and ``init_ok`` is False
        """
        self.name = name
        self.root_path = root_path
        os.makedirs(self.root_path, exist_ok=True)
        metainfo_path = self.get_filepath(".datastore.json")
        # Explicit check instead of assert + bare except: it survives ``python -O`` and
        # does not swallow unrelated errors such as KeyboardInterrupt
        if not os.path.exists(metainfo_path):
            if not init_ok:
                raise AssertionError("Datastore missing or not initialized! "
                                     "File not found: {}".format(metainfo_path))
            with open(metainfo_path, "w") as f:
                json.dump({}, f, sort_keys=True, indent=4)
        logging.info("Initialized datastore %s at %s", name, self.root_path)

    def get_filepath(self, *paths):
        """
        Return ``paths`` joined onto the datastore root directory.
        """
        return os.path.join(self.root_path, *paths)
class ZConfig(object):
    """
    The Z Hypervisor daemon's interface to the on-disk config
    """

    def __init__(self, datastore):
        """
        :param datastore: datastore (anything with ``get_filepath``) in which machine and disk
                          configs are kept, under its "machines" and "disks" subdirectories,
                          which are created if missing
        """
        self.datastore = datastore
        self.machine_data_dir = self.datastore.get_filepath("machines")
        self.disk_data_dir = self.datastore.get_filepath("disks")

        for d in [self.machine_data_dir, self.disk_data_dir]:
            os.makedirs(d, exist_ok=True)

    @staticmethod
    def _load_json_dir(directory):
        """
        Parse and return every ``*.json`` file in ``directory``, in sorted filename order.
        """
        records = []
        for f_name in sorted(iglob(os.path.join(directory, "*.json"))):
            with open(f_name, "r") as f:
                records.append(json.load(f))
        return records

    @staticmethod
    def _write_json(path, obj):
        """
        Write ``obj`` as JSON to ``path`` via a sibling temp file that is then ``os.replace``d
        over the target, so a failed or interrupted dump does not leave a truncated config
        in place (a truncated file would make the loaders fail on the next startup).
        """
        tmp_path = path + ".tmp"  # ".tmp" suffix keeps it out of the "*.json" glob
        try:
            with open(tmp_path, "w") as f:
                json.dump(obj, f, indent=4, sort_keys=True)
            os.replace(tmp_path, path)
        except Exception:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise

    def _machine_path(self, machine_id):
        """
        Path of the JSON config file for ``machine_id``.
        """
        return os.path.join(self.machine_data_dir, "{}.json".format(machine_id))

    def _disk_path(self, disk_id):
        """
        Path of the JSON config file for ``disk_id``.
        """
        return os.path.join(self.disk_data_dir, "{}.json".format(disk_id))

    def get_machines(self):
        """
        Return list of all machines on hypervisor: the parsed contents of each machine config
        file (as written by write_machine, a {"machine_id", "properties"} dict).
        """
        logging.info("Looking for machine configs in %s", self.machine_data_dir)
        return self._load_json_dir(self.machine_data_dir)

    def write_machine(self, machine_id, machine_spec):
        """
        Write a machine's config to the disk. Params similar to elsewhere.
        """
        self._write_json(self._machine_path(machine_id),
                         {"machine_id": machine_id,
                          "properties": machine_spec})

    def write_machine_o(self, machine_obj):
        """
        Similar to write_machine, but accepts a MachineSpec object
        """
        self.write_machine(machine_obj.machine_id, machine_obj.serialize())

    def remove_machine(self, machine_id):
        """
        Remove a machine from the on disk state
        """
        os.unlink(self._machine_path(machine_id))

    def get_disks(self):
        """
        Return list of all disks on the hypervisor: the parsed contents of each disk config
        file (as written by write_disk, a {"disk_id", "properties"} dict).
        """
        logging.info("Looking for disk configs in %s", self.disk_data_dir)
        return self._load_json_dir(self.disk_data_dir)

    def write_disk(self, disk_id, disk_spec):
        """
        Write a disk's config to the disk.
        """
        self._write_json(self._disk_path(disk_id),
                         {"disk_id": disk_id,
                          "properties": disk_spec})

    def remove_disk(self, disk_id):
        """
        Remove a disk from the on disk state
        """
        os.unlink(self._disk_path(disk_id))
2016-12-26 16:42:48 -08:00
def main():
    """
    Command-line entry point for the Z hypervisor daemon.

    Parses ``-c/--config`` (default /etc/zd.json). If that file does not exist, a template
    config is written to it and the function returns without starting the daemon. Otherwise
    the config is loaded and a ZHypervisorDaemon is run until it shuts down.
    """
    setup_logging()

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-c", "--config", default="/etc/zd.json", help="Config file path")
    config_path = arg_parser.parse_args().config

    if os.path.exists(config_path):
        with open(config_path) as config_file:
            loaded_config = json.load(config_file)
        ZHypervisorDaemon(loaded_config).run()
        print("Z has been shut down")
        return

    # No config yet: write a template and exit without starting the daemon
    logging.warning("Config does not exist, attempting to write default config")
    template = {
        "nodename": "examplenode",
        "access": [("root", "toor", 0)],
        "state": "/opt/datastore/state/",
        "datastores": {
            "default": {
                "path": "/opt/z/datastore/machines/"
            }
        },
    }
    with open(config_path, "w") as config_file:
        json.dump(template, config_file, indent=4, sort_keys=True)