From 29c030addd927a28842ce3c1ccdaf092e1e8da24 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 20 Jan 2019 17:53:03 -0800 Subject: [PATCH] Initial commit --- .dockerignore | 1 + Dockerfile | 19 ++++ README.md | 34 ++++++ examples/example.py | 11 ++ examples/example2.py | 20 ++++ examples/example_auth.py | 17 +++ examples/git.py | 11 ++ examples/gitea.py | 10 ++ requirements.txt | 22 ++++ setup.py | 22 ++++ shipper/__init__.py | 160 ++++++++++++++++++++++++++ shipper/lib.py | 239 +++++++++++++++++++++++++++++++++++++++ shipper/runjob.py | 33 ++++++ 13 files changed, 599 insertions(+) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 examples/example.py create mode 100644 examples/example2.py create mode 100644 examples/example_auth.py create mode 100644 examples/git.py create mode 100644 examples/gitea.py create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 shipper/__init__.py create mode 100644 shipper/lib.py create mode 100644 shipper/runjob.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..68090d1 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +testenv diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8afb376 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM ubuntu:bionic + +RUN apt-get update && \ + apt-get install -y python3-pip rsync openssh-client curl wget git && \ + rm -rf /var/lib/apt/lists/* && \ + useradd app + +ADD . /tmp/code + +RUN cd /tmp/code && \ + pip3 install -r requirements.txt + +RUN cd /tmp/code && \ + python3 setup.py install + +EXPOSE 8080 +VOLUME /scripts +USER app +ENTRYPOINT ["shipperd", "-p", "8080", "-t", "/scripts"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..2264633 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +Shipper +======= + +Automation API server + +Jobs are written into individual python files. Their content will look like: + + +``` +# instantiate job object +job = ShipperJob() + +# Set the default information used with making connections (ssh, rsync, git, etc) +job.default_connection(SshConnection("192.168.1.60", "dave", key="foo.pem")) + +# Check out some repo +job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code", branch="testing")) + +# Copy the files to a remote host +job.add_task(RsyncTask("./code/", "/tmp/deploy/")) + +# SSH to the host and run some commands +job.add_task(SshTask("ls -la /tmp/deploy/")) +job.add_task(SshTask("uname -a")) +``` + +For more examples, see `examples/`. + +If the above file is named "foo.py", this job would be triggered by making a request to http://host:port/task/foo. POST, +GET, and JSON Body data is made available to the job. + +To run the server, install this module and execute: + +* `shipperd -t jobfiledir/` diff --git a/examples/example.py b/examples/example.py new file mode 100644 index 0000000..82d93a8 --- /dev/null +++ b/examples/example.py @@ -0,0 +1,11 @@ +from shipper.lib import ShipperJob, SshConnection, SshTask + + +job = ShipperJob() + +# We have a username/password combination we can SSH with +job.default_connection(SshConnection("192.168.1.60", "dave", password="foobar")) + +# Run some commands on a remote host +job.add_task(SshTask("uptime")) +job.add_task(SshTask("uname -a")) diff --git a/examples/example2.py b/examples/example2.py new file mode 100644 index 0000000..a9b64c8 --- /dev/null +++ b/examples/example2.py @@ -0,0 +1,20 @@ +from shipper.lib import ShipperJob, SshConnection, CmdTask, RsyncTask, GitCheckoutTask + + +job = ShipperJob() + +# We have a username and private key file this time. Keyfile paths are relative to the script dir. +job.default_connection(SshConnection("192.168.1.60", "dave", key="keyfile.pem")) + +# Checkout some code +# Note that this will use the keyfile specified above. +# connection=SshConnection(...) can also be specified on GitCheckoutTask. Clone URLs starting with 'ssh' will require +# a private key; urls starting with 'https' will require a username & password. +job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code", branch="master")) + +# Inspect the code locally +job.add_task(CmdTask("ls -la code/")) +job.add_task(CmdTask("du -sh code")) + +# Copy the code to some other host (using the ssh key above and username here for auth) +job.add_task(RsyncTask("./code/", "user@host:/var/deploy/")) diff --git a/examples/example_auth.py b/examples/example_auth.py new file mode 100644 index 0000000..c2144ec --- /dev/null +++ b/examples/example_auth.py @@ -0,0 +1,17 @@ +from shipper.lib import ShipperJob, PythonTask + +# By default, jobs require no auth to run. +# Setting the 'auth' variable to a set of sets containing username and password pairs, passing one of these pairs +# becomes require when triggering the job +auth = (("foo", "password"), + ("dave", "baz")) + + +def localfunc(job): + print("Job props:", job.props) + + +job = ShipperJob() + +# This task accepts a callback that is called during the job's execution. The job is passed as the only parameter +job.add_task(PythonTask(localfunc)) diff --git a/examples/git.py b/examples/git.py new file mode 100644 index 0000000..9f76c4d --- /dev/null +++ b/examples/git.py @@ -0,0 +1,11 @@ +from shipper.lib import ShipperJob, GitCheckoutTask, SshConnection + + +job = ShipperJob() + +# SSH private key files are used to authenticate against a git repo +job.default_connection(SshConnection(None, None, key="/Users/dave/.ssh/id_rsa")) +job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code1", branch="master")) + +# Git clone URLs with a username and password can also be used +job.add_task(GitCheckoutTask("https://dave:mypassword@git.davepedu.com/dave/shipper.git", "code2", branch="master")) diff --git a/examples/gitea.py b/examples/gitea.py new file mode 100644 index 0000000..a44fe9f --- /dev/null +++ b/examples/gitea.py @@ -0,0 +1,10 @@ +from shipper.lib import ShipperJob, SshConnection, GiteaCheckoutTask, CmdTask + + +job = ShipperJob() +job.default_connection(SshConnection(None, None, key="testkey.pem")) + +# GiteaCheckoutTask will check out the repo and branch referenced by Gitea's webhook payload data +# Optionally, the job will be terminated unless the webhook references a branch in the allow_branches list. +job.add_task(GiteaCheckoutTask("code", allow_branches=["master"])) +job.add_task(CmdTask("ls -la")) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..40e684a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,22 @@ +asn1crypto==0.24.0 +backports.functools-lru-cache==1.5 +bcrypt==3.1.6 +cffi==1.11.5 +cheroot==6.5.4 +CherryPy==18.1.0 +cryptography==2.4.2 +gitdb2==2.0.5 +GitPython==2.1.11 +idna==2.8 +jaraco.functools==2.0 +more-itertools==5.0.0 +paramiko==2.4.2 +portend==2.3 +pyasn1==0.4.5 +pycparser==2.19 +PyNaCl==1.3.0 +pytz==2018.9 +six==1.12.0 +smmap2==2.0.5 +tempora==1.14 +zc.lockfile==1.4 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..dc3005b --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +from setuptools import setup +import os + +__version__ = "0.0.1" +reqs = open(os.path.join(os.path.dirname(__file__), "requirements.txt")).read().split() + + +setup(name='shipper', + version=__version__, + description='Modular API server', + url='http://git.davepedu.com/dave/shipper', + author='dpedu', + author_email='dave@davepedu.com', + packages=['shipper'], + install_requires=reqs, + entry_points={ + "console_scripts": [ + "shipperd = shipper:main", + ] + }, + zip_safe=False) diff --git a/shipper/__init__.py b/shipper/__init__.py new file mode 100644 index 0000000..445488c --- /dev/null +++ b/shipper/__init__.py @@ -0,0 +1,160 @@ +import os +import cherrypy +import logging +import json +import queue +import importlib.util +from concurrent.futures import ThreadPoolExecutor +from threading import Thread +import traceback +import base64 +import sys +import subprocess + + +class AppWeb(object): + def __init__(self): + self.task = TaskWeb() + + @cherrypy.expose + def index(self): + yield "Hi! Welcome to the Shipper API server." + + +@cherrypy.popargs("task") +class TaskWeb(object): + def __init__(self): + pass + + @cherrypy.expose + def index(self, task, **kwargs): + cherrypy.engine.publish('task-queue', (task, kwargs)) + yield "OK" + + +class QueuedJob(object): + def __init__(self, name, args): + self.name = name + self.args = args + + +class TaskExecutor(object): + def __init__(self, runnerpath): + self.q = queue.Queue() + self.runnerpath = runnerpath + self.runner = Thread(target=self.run, daemon=True) + self.runner.start() + + def load_task(self, taskname): + srcfile = taskname + ".py" + spec = importlib.util.spec_from_file_location("job", srcfile) + job = importlib.util.module_from_spec(spec) + spec.loader.exec_module(job) + return job + + def enqueue(self, taskname, params): + """ + validate & load task object, append to the work queue + """ + # Load the job object + job = self.load_task(taskname) + + # Extract post body if present - we decode json and pass through other types as-is + payload = None + print("Login: ", cherrypy.request.login) + if cherrypy.request.method == "POST": + cl = cherrypy.request.headers.get('Content-Length', None) + if cl: + payload = cherrypy.request.body.read(int(cl)) + ctype = cherrypy.request.headers.get('Content-Type', None) + if ctype == "application/json": + payload = json.loads(payload) + if payload: + params["payload"] = payload + + # check auth if required by the job + if hasattr(job, "auth"): + auth = None + auth_header = cherrypy.request.headers.get('authorization') + if auth_header: + authtype, rest = auth_header.split(maxsplit=1) + if authtype.lower() == "basic": + auth = tuple(base64.standard_b64decode(rest.encode("ascii")).decode("utf-8").split(":")) + + print("{} not in {}: {}".format(auth, job.auth, auth not in job.auth)) + if auth not in job.auth: + cherrypy.serving.response.headers['www-authenticate'] = 'Basic realm="{}"'.format(taskname) + raise cherrypy.HTTPError(401, 'You are not authorized to access that job') + params["auth"] = auth + + print("Queueing: {} with params {}".format(taskname, params)) + self.q.put(QueuedJob(taskname, params)) + + def run(self): + with ThreadPoolExecutor(max_workers=5) as pool: + while True: + pool.submit(self.run_job, self.q.get()) + + def run_job(self, job): + try: + print("Executing task from {}.py".format(job.name)) + p = subprocess.Popen([sys.executable, self.runnerpath, job.name + ".py", json.dumps(job.args)]) + p.wait() + except: + print(traceback.format_exc()) + # TODO job logging and exception logging + print("Task complete") + + +def main(): + import argparse + import signal + + parser = argparse.ArgumentParser(description="Shipper API server") + + parser.add_argument('-p', '--port', default=8080, type=int, help="tcp port to listen on") + parser.add_argument('-t', '--tasks', default="./", help="dir containing task files") + parser.add_argument('--debug', action="store_true", help="enable development options") + + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO if args.debug else logging.WARNING, + format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s") + + runnerpath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "runjob.py") + os.chdir(args.tasks) + + web = AppWeb() + cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False}}) + cherrypy.config.update({ + 'tools.sessions.on': False, + # 'tools.sessions.locking': 'explicit', + # 'tools.sessions.timeout': 525600, + 'request.show_tracebacks': True, + 'server.socket_port': args.port, + 'server.thread_pool': 5, + 'server.socket_host': '0.0.0.0', + # 'log.screen': False, + 'engine.autoreload.on': args.debug + }) + + executor = TaskExecutor(runnerpath) + cherrypy.engine.subscribe('task-queue', lambda e: executor.enqueue(*e)) + + def signal_handler(signum, stack): + logging.critical('Got sig {}, exiting...'.format(signum)) + cherrypy.engine.exit() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + cherrypy.engine.start() + cherrypy.engine.block() + finally: + logging.info("API has shut down") + cherrypy.engine.exit() + + +if __name__ == '__main__': + main() diff --git a/shipper/lib.py b/shipper/lib.py new file mode 100644 index 0000000..df64263 --- /dev/null +++ b/shipper/lib.py @@ -0,0 +1,239 @@ +from contextlib import closing +import paramiko +import os +import subprocess + + +class ShipperJob(object): + """ + Job representation class + """ + def __init__(self): + self.tasks = [] + self.props = {} + + def default_connection(self, connection): + self.props["connection"] = connection + + def add_task(self, task): + task.validate(self) + self.tasks.append(task) + + def run(self, args): + self.props.update(**args) + while self.tasks: + task = self.tasks.pop(0) + print("******************************************************************************\n" + + "* {: <74} *\n".format(str(task)) + + "******************************************************************************") + task.run(self) + print() + + +class StopJob(Exception): + pass + + +class ShipperConnection(object): + pass + + +class SshConnection(ShipperConnection): + """ + Connection description used for tasks reliant on SSH + """ + def __init__(self, host, username, key=None, password=None, port=22): + self.host = host + self.username = username + self.key = os.path.abspath(key) + if key: + assert os.path.exists(key) + self.paramiko_key = paramiko.RSAKey.from_private_key_file(key) if key else None + self.password = password + self.port = port + + +class ShipperTask(object): + def __init__(self): + pass + + def validate(self, job): + pass + + def run(self, job): + raise NotImplementedError() + + +class CmdTask(ShipperTask): + """ + Execute a command locally + """ + def __init__(self, command): + super().__init__() + self.command = command + + def validate(self, job): + assert self.command + + def run(self, job): + subprocess.check_call(self.command, shell=not isinstance(self.command, list)) + + def __repr__(self): + return "".format(str(self.command)[0:50]) + + +class SshTask(ShipperTask): + """ + Execute a command over SSH + """ + def __init__(self, command, connection=None): + super().__init__() + self.connection = connection + self.command = command + + def validate(self, job): + self.conn = self.connection or job.props.get("connection") + assert self.conn + + def run(self, job): + with closing(paramiko.SSHClient()) as client: + client.set_missing_host_key_policy(paramiko.WarningPolicy) + connargs = {"pkey": self.conn.paramiko_key} if self.conn.paramiko_key else {"password": self.conn.password} + client.connect(hostname=self.conn.host, port=self.conn.port, username=self.conn.username, **connargs) + # stdin, stdout, stderr = client.exec_command('ls -l') + chan = client.get_transport().open_session() + chan.set_combine_stderr(True) + chan.get_pty() + f = chan.makefile() + chan.exec_command(self.command) + print(f.read().decode("utf-8")) + + def __repr__(self): + return "".format(self.command[0:50]) +# TODO something like SshTask that transfers a script file and executes it? A la jenkins ssh + + +from git import Repo + + +class GitCheckoutTask(SshTask): + """ + Check out a git repo to the local disk + """ + def __init__(self, repo, dest, branch="master", connection=None, gitopts=None): + super().__init__(None, connection) + self.repo = repo + self.dest = dest + self.gitopts = gitopts + self.branch = branch + + def validate(self, job): + assert (self.repo.startswith("ssh") and (self.connection or job.props.get("connection"))) \ + or self.repo.startswith("http") + super().validate(job) + + def run(self, job): + os.makedirs(self.dest, exist_ok=True) + repo = Repo.init(self.dest) + origin = repo.create_remote('origin', self.repo) + fetch_env = {"GIT_TERMINAL_PROMPT": "0"} + if self.repo.startswith("ssh"): + fetch_env["GIT_SSH_COMMAND"] = "ssh -i {} -o StrictHostKeyChecking=no".format(self.conn.key) + + with repo.git.custom_environment(**fetch_env): + origin.fetch() + origin.pull(origin.refs[0].remote_head) + + repo.git.checkout(self.branch) + print(repo.git.execute(["git", "log", "-1"])) + print() + print(repo.git.execute(["git", "log", "--pretty=oneline", "-10"])) + + def __repr__(self): + return "".format(self.repo) + + +class RsyncTask(SshTask): + """ + Rsync a file tree from the local disk to some remote system + """ + def __init__(self, src, dest, connection=None, exclude=None, delete=False, flags=None): + super().__init__(None, connection) + self.src = src + self.dest = dest + self.exclude = exclude + self.delete = delete + self.flags = flags + + def run(self, job): + rsync_cmd = ["rsync", "-avzr"] + + if self.conn and self.conn.key: + rsync_cmd += ["-e", "ssh -i '{}' -o StrictHostKeyChecking=no".format(self.conn.key)] + + if self.exclude: + for item in self.exclude: + rsync_cmd += ["--exclude={}".format(item)] + + if self.delete: + rsync_cmd += ["--delete"] + + if self.flags: + rsync_cmd += self.flags + + rsync_cmd += [self.src, self.dest] + + print(' '.join(rsync_cmd)) + subprocess.check_call(rsync_cmd) + + def __repr__(self): + return "".format(self.dest) + + +class PythonTask(ShipperTask): + """ + Call an arbitrary function passing a reference to the ShipperJob being executed + """ + def __init__(self, func): + super().__init__() + self.func = func + + def run(self, job): + self.func(job) + + def __repr__(self): + return "".format(self.func) + + +class GiteaCheckoutTask(GitCheckoutTask): + """ + Check out whatever git repo and branch the incoming data from Gitea referenced + """ + def __init__(self, dest, connection=None, gitopts=None, allow_branches=None): + super().__init__(None, dest, None, None, None) + self.allow_branches = allow_branches + + def validate(self, job): + self.conn = self.connection or job.props.get("connection") + assert self.conn + + def run(self, job): + data = job.props["payload"] + if self.conn.key: + self.repo = data["repository"]["ssh_url"] + else: + self.repo = data["repository"]["clone_url"]. \ + replace("://", "://{}:{}@".format(self.conn.username, self.conn.password)) + self.branch = data["ref"] + if self.allow_branches: + branch = self.branch + if branch.startswith("refs/heads/"): + branch = branch[len("refs/heads/"):] + if branch not in self.allow_branches: + raise StopJob("Branch '{}' is not whitelisted".format(branch)) + + print(self.repo) + super().run(job) + + def __repr__(self): + return "" diff --git a/shipper/runjob.py b/shipper/runjob.py new file mode 100644 index 0000000..0245eef --- /dev/null +++ b/shipper/runjob.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +import os +import importlib +import argparse +import json +from tempfile import TemporaryDirectory + + +def load_task(srcfile): + spec = importlib.util.spec_from_file_location("job", srcfile) + job = importlib.util.module_from_spec(spec) + spec.loader.exec_module(job) + return job + + +def main(): + parser = argparse.ArgumentParser(description="Shipper task runner") + + parser.add_argument('jobfile', help="Job file to run") + parser.add_argument('args', help="JSON args") + + args = parser.parse_args() + + job = load_task(args.jobfile) + params = json.loads(args.args) + + with TemporaryDirectory() as d: + os.chdir(d) + job.job.run(params) + + +if __name__ == '__main__': + main()