Initial commit

This commit is contained in:
dave 2019-01-20 17:53:03 -08:00
commit 29c030addd
13 changed files with 599 additions and 0 deletions

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
testenv

19
Dockerfile Normal file
View File

@ -0,0 +1,19 @@
# Shipper API server image: installs the package and runs shipperd on port 8080.
FROM ubuntu:bionic

# Tooling needed at build and run time (rsync/ssh/git are used by job tasks);
# clear the apt cache and create an unprivileged user in the same layer.
RUN apt-get update && \
    apt-get install -y python3-pip rsync openssh-client curl wget git && \
    rm -rf /var/lib/apt/lists/* && \
    useradd app

ADD . /tmp/code

# Install python dependencies, then the shipper package itself.
RUN cd /tmp/code && \
    pip3 install -r requirements.txt

RUN cd /tmp/code && \
    python3 setup.py install

EXPOSE 8080
# Job script files are mounted here and handed to shipperd via -t.
VOLUME /scripts
USER app
ENTRYPOINT ["shipperd", "-p", "8080", "-t", "/scripts"]

34
README.md Normal file
View File

@ -0,0 +1,34 @@
Shipper
=======
Automation API server
Jobs are written into individual python files. Their content will look like:
```
# instantiate job object
job = ShipperJob()
# Set the default information used with making connections (ssh, rsync, git, etc)
job.default_connection(SshConnection("192.168.1.60", "dave", key="foo.pem"))
# Check out some repo
job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code", branch="testing"))
# Copy the files to a remote host
job.add_task(RsyncTask("./code/", "/tmp/deploy/"))
# SSH to the host and run some commands
job.add_task(SshTask("ls -la /tmp/deploy/"))
job.add_task(SshTask("uname -a"))
```
For more examples, see `examples/`.
If the above file is named "foo.py", this job would be triggered by making a request to http://host:port/task/foo. POST
and GET parameters, as well as a JSON request body, are made available to the job.
To run the server, install this module and execute:
* `shipperd -t jobfiledir/`

11
examples/example.py Normal file
View File

@ -0,0 +1,11 @@
# Minimal Shipper job: connect to one host over SSH and run two commands.
# The module-level name "job" is the entry point the Shipper runner executes.
from shipper.lib import ShipperJob, SshConnection, SshTask

job = ShipperJob()
# We have a username/password combination we can SSH with
job.default_connection(SshConnection("192.168.1.60", "dave", password="foobar"))
# Run some commands on a remote host
job.add_task(SshTask("uptime"))
job.add_task(SshTask("uname -a"))

20
examples/example2.py Normal file
View File

@ -0,0 +1,20 @@
# Example: key-based SSH auth, a git checkout, local commands, and an rsync deploy.
from shipper.lib import ShipperJob, SshConnection, CmdTask, RsyncTask, GitCheckoutTask

job = ShipperJob()
# We have a username and private key file this time. Keyfile paths are relative to the script dir.
job.default_connection(SshConnection("192.168.1.60", "dave", key="keyfile.pem"))
# Checkout some code
# Note that this will use the keyfile specified above.
# connection=SshConnection(...) can also be specified on GitCheckoutTask. Clone URLs starting with 'ssh' will require
# a private key; urls starting with 'https' will require a username & password.
job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code", branch="master"))
# Inspect the code locally
job.add_task(CmdTask("ls -la code/"))
job.add_task(CmdTask("du -sh code"))
# Copy the code to some other host (using the ssh key above and username here for auth)
job.add_task(RsyncTask("./code/", "user@host:/var/deploy/"))

17
examples/example_auth.py Normal file
View File

@ -0,0 +1,17 @@
# Example: job-level HTTP Basic auth plus a Python callback task.
from shipper.lib import ShipperJob, PythonTask

# By default, jobs require no auth to run.
# Setting the module-level 'auth' variable to a collection of (username, password)
# pairs makes passing one of those pairs required when triggering the job.
auth = (("foo", "password"),
        ("dave", "baz"))


def localfunc(job):
    # Called during job execution; 'job' is the running ShipperJob.
    print("Job props:", job.props)


job = ShipperJob()
# This task accepts a callback that is called during the job's execution. The job is passed as the only parameter
job.add_task(PythonTask(localfunc))

11
examples/git.py Normal file
View File

@ -0,0 +1,11 @@
# Example: two ways to authenticate a git checkout.
from shipper.lib import ShipperJob, GitCheckoutTask, SshConnection

job = ShipperJob()
# SSH private key files are used to authenticate against a git repo
job.default_connection(SshConnection(None, None, key="/Users/dave/.ssh/id_rsa"))
job.add_task(GitCheckoutTask("ssh://git@git.davepedu.com:223/dave/shipper.git", "code1", branch="master"))
# Git clone URLs with a username and password can also be used
job.add_task(GitCheckoutTask("https://dave:mypassword@git.davepedu.com/dave/shipper.git", "code2", branch="master"))

10
examples/gitea.py Normal file
View File

@ -0,0 +1,10 @@
# Example: react to a Gitea webhook by checking out the pushed repo/branch.
from shipper.lib import ShipperJob, SshConnection, GiteaCheckoutTask, CmdTask

job = ShipperJob()
# Keyfile path is relative to the script dir; used for ssh clone auth
job.default_connection(SshConnection(None, None, key="testkey.pem"))
# GiteaCheckoutTask will check out the repo and branch referenced by Gitea's webhook payload data
# Optionally, the job will be terminated unless the webhook references a branch in the allow_branches list.
job.add_task(GiteaCheckoutTask("code", allow_branches=["master"]))
job.add_task(CmdTask("ls -la"))

22
requirements.txt Normal file
View File

@ -0,0 +1,22 @@
asn1crypto==0.24.0
backports.functools-lru-cache==1.5
bcrypt==3.1.6
cffi==1.11.5
cheroot==6.5.4
CherryPy==18.1.0
cryptography==2.4.2
gitdb2==2.0.5
GitPython==2.1.11
idna==2.8
jaraco.functools==2.0
more-itertools==5.0.0
paramiko==2.4.2
portend==2.3
pyasn1==0.4.5
pycparser==2.19
PyNaCl==1.3.0
pytz==2018.9
six==1.12.0
smmap2==2.0.5
tempora==1.14
zc.lockfile==1.4

22
setup.py Normal file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""Package definition for shipper, the automation API server."""
from setuptools import setup
import os

__version__ = "0.0.1"

# Read install requirements from requirements.txt next to this file.
# Use a context manager so the handle is closed deterministically
# (the original left the file open until interpreter exit).
with open(os.path.join(os.path.dirname(__file__), "requirements.txt")) as f:
    reqs = f.read().split()

setup(name='shipper',
      version=__version__,
      description='Modular API server',
      url='http://git.davepedu.com/dave/shipper',
      author='dpedu',
      author_email='dave@davepedu.com',
      packages=['shipper'],
      install_requires=reqs,
      entry_points={
          "console_scripts": [
              # shipperd maps to shipper.main()
              "shipperd = shipper:main",
          ]
      },
      zip_safe=False)

160
shipper/__init__.py Normal file
View File

@ -0,0 +1,160 @@
import os
import cherrypy
import logging
import json
import queue
import importlib.util
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
import traceback
import base64
import sys
import subprocess
class AppWeb(object):
    """Root of the HTTP API; /task/<name> is served by the nested TaskWeb."""
    def __init__(self):
        self.task = TaskWeb()

    @cherrypy.expose
    def index(self):
        # Simple landing page so a GET / confirms the server is up
        yield "Hi! Welcome to the Shipper API server."
@cherrypy.popargs("task")
class TaskWeb(object):
    """Handles /task/<task>; queues the named job with the request params."""
    def __init__(self):
        pass

    @cherrypy.expose
    def index(self, task, **kwargs):
        # Publish to the 'task-queue' channel; TaskExecutor.enqueue is
        # subscribed to it in main() and performs validation/auth/queueing.
        cherrypy.engine.publish('task-queue', (task, kwargs))
        yield "OK"
class QueuedJob(object):
    """A task accepted for execution and placed on the work queue.

    name -- task name; "<name>.py" in the tasks dir is the job file to run
    args -- dict of request parameters passed through to the job
    """
    def __init__(self, name, args):
        self.name = name
        self.args = args

    def __repr__(self):
        # Added for queue/debug logging; the original class had no repr
        return "<QueuedJob name='{}'>".format(self.name)
class TaskExecutor(object):
    """Background runner: accepts jobs via enqueue() and executes each job
    file in a subprocess through the runjob.py wrapper script."""

    def __init__(self, runnerpath):
        """
        :param runnerpath: absolute path to runjob.py, used to spawn job subprocesses
        """
        self.q = queue.Queue()
        self.runnerpath = runnerpath
        # Daemon thread so a pending queue never blocks interpreter shutdown
        self.runner = Thread(target=self.run, daemon=True)
        self.runner.start()

    def load_task(self, taskname):
        """Import "<taskname>.py" (relative to the cwd, which main() sets to the
        tasks dir) and return the loaded module."""
        srcfile = taskname + ".py"
        spec = importlib.util.spec_from_file_location("job", srcfile)
        job = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(job)
        return job

    def enqueue(self, taskname, params):
        """
        validate & load task object, append to the work queue
        """
        # Load the job object
        job = self.load_task(taskname)
        # Extract post body if present - we decode json and pass through other types as-is
        payload = None
        print("Login: ", cherrypy.request.login)
        if cherrypy.request.method == "POST":
            cl = cherrypy.request.headers.get('Content-Length', None)
            if cl:
                payload = cherrypy.request.body.read(int(cl))
                ctype = cherrypy.request.headers.get('Content-Type', None)
                if ctype == "application/json":
                    payload = json.loads(payload)
        if payload:
            params["payload"] = payload
        # check auth if required by the job (module-level 'auth' collection of
        # (username, password) pairs)
        if hasattr(job, "auth"):
            auth = None
            auth_header = cherrypy.request.headers.get('authorization')
            if auth_header:
                authtype, rest = auth_header.split(maxsplit=1)
                if authtype.lower() == "basic":
                    auth = tuple(base64.standard_b64decode(rest.encode("ascii")).decode("utf-8").split(":"))
            print("{} not in {}: {}".format(auth, job.auth, auth not in job.auth))
            if auth not in job.auth:
                cherrypy.serving.response.headers['www-authenticate'] = 'Basic realm="{}"'.format(taskname)
                raise cherrypy.HTTPError(401, 'You are not authorized to access that job')
            params["auth"] = auth
        print("Queueing: {} with params {}".format(taskname, params))
        self.q.put(QueuedJob(taskname, params))

    def run(self):
        # Consumer loop: up to 5 queued jobs execute concurrently
        with ThreadPoolExecutor(max_workers=5) as pool:
            while True:
                pool.submit(self.run_job, self.q.get())

    def run_job(self, job):
        """Execute one QueuedJob in a subprocess and wait for it to finish."""
        try:
            print("Executing task from {}.py".format(job.name))
            p = subprocess.Popen([sys.executable, self.runnerpath, job.name + ".py", json.dumps(job.args)])
            p.wait()
        # BUG FIX: was a bare "except:", which also swallowed SystemExit and
        # KeyboardInterrupt inside the worker thread
        except Exception:
            print(traceback.format_exc())
        # TODO job logging and exception logging
        print("Task complete")
def main():
    """Entry point for shipperd: parse args, configure cherrypy, wire the
    task queue to a TaskExecutor, and block until shutdown."""
    import argparse
    import signal

    parser = argparse.ArgumentParser(description="Shipper API server")
    parser.add_argument('-p', '--port', default=8080, type=int, help="tcp port to listen on")
    parser.add_argument('-t', '--tasks', default="./", help="dir containing task files")
    parser.add_argument('--debug', action="store_true", help="enable development options")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO if args.debug else logging.WARNING,
                        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")

    # Compute the absolute path to runjob.py before the chdir below
    runnerpath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "runjob.py")
    # Job files are loaded relative to the cwd (see TaskExecutor.load_task)
    os.chdir(args.tasks)

    web = AppWeb()
    cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False}})
    cherrypy.config.update({
        'tools.sessions.on': False,
        # 'tools.sessions.locking': 'explicit',
        # 'tools.sessions.timeout': 525600,
        'request.show_tracebacks': True,
        'server.socket_port': args.port,
        'server.thread_pool': 5,
        'server.socket_host': '0.0.0.0',
        # 'log.screen': False,
        'engine.autoreload.on': args.debug
    })

    # Requests publish (task, kwargs) tuples to 'task-queue'; forward them
    # to the executor, which validates/authorizes and queues the job
    executor = TaskExecutor(runnerpath)
    cherrypy.engine.subscribe('task-queue', lambda e: executor.enqueue(*e))

    def signal_handler(signum, stack):
        # Stop the cherrypy engine so the engine.block() below returns
        logging.critical('Got sig {}, exiting...'.format(signum))
        cherrypy.engine.exit()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        cherrypy.engine.start()
        cherrypy.engine.block()
    finally:
        logging.info("API has shut down")
        cherrypy.engine.exit()


if __name__ == '__main__':
    main()

239
shipper/lib.py Normal file
View File

@ -0,0 +1,239 @@
from contextlib import closing
import paramiko
import os
import subprocess
class ShipperJob(object):
    """
    Job representation class
    """
    def __init__(self):
        # Ordered list of ShipperTask instances; consumed (popped) by run()
        self.tasks = []
        # Shared state visible to tasks; run() merges the request args in here
        self.props = {}

    def default_connection(self, connection):
        # Connection used by tasks that were not given an explicit one
        self.props["connection"] = connection

    def add_task(self, task):
        # Validate eagerly so a misconfigured task fails at definition time
        task.validate(self)
        self.tasks.append(task)

    def run(self, args):
        """Execute all queued tasks in order.

        :param args: dict of request parameters merged into self.props
        """
        self.props.update(**args)
        while self.tasks:
            task = self.tasks.pop(0)
            # Print a banner naming the task about to run
            print("******************************************************************************\n" +
                  "* {: <74} *\n".format(str(task)) +
                  "******************************************************************************")
            task.run(self)
            print()
class StopJob(Exception):
    """Raised to terminate a job early (see GiteaCheckoutTask's branch check)."""
    pass
class ShipperConnection(object):
    """Base marker class for connection descriptions passed to tasks."""
    pass
class SshConnection(ShipperConnection):
    """
    Connection description used for tasks reliant on SSH

    :param host: hostname or address to connect to (may be None for git-only use)
    :param username: ssh username (may be None)
    :param key: optional path to an RSA private key file
    :param password: optional password, used when no key is given
    :param port: ssh port, default 22
    """
    def __init__(self, host, username, key=None, password=None, port=22):
        self.host = host
        self.username = username
        if key:
            # Absolutize before any task chdirs; the key file must exist
            self.key = os.path.abspath(key)
            assert os.path.exists(self.key)
            self.paramiko_key = paramiko.RSAKey.from_private_key_file(self.key)
        else:
            # BUG FIX: the original called os.path.abspath(key) unconditionally,
            # raising TypeError for password-only connections (key=None)
            self.key = None
            self.paramiko_key = None
        self.password = password
        self.port = port
class ShipperTask(object):
    """Base class for all job tasks."""
    def __init__(self):
        pass

    def validate(self, job):
        # Called at add_task() time; raise/assert here to reject a
        # misconfigured task before the job is queued
        pass

    def run(self, job):
        # Subclasses must implement; performs the task's work
        raise NotImplementedError()
class CmdTask(ShipperTask):
    """
    Execute a command locally
    """
    def __init__(self, command):
        super().__init__()
        # Either a shell string or an argv-style list
        self.command = command

    def validate(self, job):
        # An empty/None command is a configuration error
        assert self.command

    def run(self, job):
        # Strings run through the shell; lists are executed directly
        use_shell = not isinstance(self.command, list)
        subprocess.check_call(self.command, shell=use_shell)

    def __repr__(self):
        return "<CmdTask cmd='{}'>".format(str(self.command)[:50])
class SshTask(ShipperTask):
    """
    Execute a command over SSH
    """
    def __init__(self, command, connection=None):
        """
        :param command: command line to run on the remote host
        :param connection: optional SshConnection; falls back to the job default
        """
        super().__init__()
        self.connection = connection
        self.command = command

    def validate(self, job):
        # Resolve the effective connection now so a missing one fails early
        self.conn = self.connection or job.props.get("connection")
        assert self.conn

    def run(self, job):
        with closing(paramiko.SSHClient()) as client:
            # NOTE(review): WarningPolicy accepts unknown host keys (with a warning)
            client.set_missing_host_key_policy(paramiko.WarningPolicy)
            # Prefer key auth when a key was configured, else password auth
            connargs = {"pkey": self.conn.paramiko_key} if self.conn.paramiko_key else {"password": self.conn.password}
            client.connect(hostname=self.conn.host, port=self.conn.port, username=self.conn.username, **connargs)
            # stdin, stdout, stderr = client.exec_command('ls -l')
            chan = client.get_transport().open_session()
            # Merge stderr into stdout and allocate a pty so output interleaves
            chan.set_combine_stderr(True)
            chan.get_pty()
            f = chan.makefile()
            chan.exec_command(self.command)
            # Read to EOF; blocks until the remote command finishes
            print(f.read().decode("utf-8"))

    def __repr__(self):
        return "<SshTask cmd='{}'>".format(self.command[0:50])
# TODO something like SshTask that transfers a script file and executes it? A la jenkins ssh

from git import Repo  # NOTE(review): mid-file import; kept where the original placed it


class GitCheckoutTask(SshTask):
    """
    Check out a git repo to the local disk
    """
    def __init__(self, repo, dest, branch="master", connection=None, gitopts=None):
        # command=None: SshTask machinery is reused only for connection handling
        super().__init__(None, connection)
        self.repo = repo
        self.dest = dest
        self.gitopts = gitopts  # NOTE(review): stored but not used by run() — confirm intent
        self.branch = branch

    def validate(self, job):
        # ssh:// URLs need a connection (for the key). NOTE(review): for http(s)
        # URLs this passes, but super().validate() below still asserts that SOME
        # connection exists — https-without-connection would fail; confirm.
        assert (self.repo.startswith("ssh") and (self.connection or job.props.get("connection"))) \
            or self.repo.startswith("http")
        super().validate(job)

    def run(self, job):
        os.makedirs(self.dest, exist_ok=True)
        # init + add-remote rather than clone, so dest may already exist
        repo = Repo.init(self.dest)
        origin = repo.create_remote('origin', self.repo)
        # Never prompt for credentials; fail instead
        fetch_env = {"GIT_TERMINAL_PROMPT": "0"}
        if self.repo.startswith("ssh"):
            fetch_env["GIT_SSH_COMMAND"] = "ssh -i {} -o StrictHostKeyChecking=no".format(self.conn.key)
        with repo.git.custom_environment(**fetch_env):
            origin.fetch()
            origin.pull(origin.refs[0].remote_head)
        repo.git.checkout(self.branch)
        # Show the checked-out head and recent history in the job log
        print(repo.git.execute(["git", "log", "-1"]))
        print()
        print(repo.git.execute(["git", "log", "--pretty=oneline", "-10"]))

    def __repr__(self):
        return "<GitCheckoutTask repo='{}'>".format(self.repo)
class RsyncTask(SshTask):
    """
    Rsync a file tree from the local disk to some remote system
    """
    def __init__(self, src, dest, connection=None, exclude=None, delete=False, flags=None):
        """
        :param src: local source path
        :param dest: destination path, may be a remote user@host:path spec
        :param connection: optional SshConnection supplying the key for -e ssh
        :param exclude: iterable of patterns passed as --exclude
        :param delete: pass --delete to remove extraneous files at dest
        :param flags: extra argv entries appended to the rsync command
        """
        super().__init__(None, connection)
        self.src = src
        self.dest = dest
        self.exclude = exclude
        self.delete = delete
        self.flags = flags

    def run(self, job):
        rsync_cmd = ["rsync", "-avzr"]
        # Use the configured private key for the ssh transport when present
        if self.conn and self.conn.key:
            rsync_cmd += ["-e", "ssh -i '{}' -o StrictHostKeyChecking=no".format(self.conn.key)]
        if self.exclude:
            for item in self.exclude:
                rsync_cmd += ["--exclude={}".format(item)]
        if self.delete:
            rsync_cmd += ["--delete"]
        if self.flags:
            rsync_cmd += self.flags
        rsync_cmd += [self.src, self.dest]
        # Echo the full command line into the job log before running it
        print(' '.join(rsync_cmd))
        subprocess.check_call(rsync_cmd)

    def __repr__(self):
        return "<RsyncTask dest='{}'>".format(self.dest)
class PythonTask(ShipperTask):
    """
    Call an arbitrary function passing a reference to the ShipperJob being executed
    """
    def __init__(self, func):
        super().__init__()
        # Callback invoked as func(job) when the task runs
        self.func = func

    def run(self, job):
        callback = self.func
        callback(job)

    def __repr__(self):
        return f"<PythonTask func='{self.func}'>"
class GiteaCheckoutTask(GitCheckoutTask):
    """
    Check out whatever git repo and branch the incoming data from Gitea referenced

    :param dest: local directory to check the code out into
    :param connection: optional SshConnection; falls back to the job default
    :param gitopts: passed through to GitCheckoutTask
    :param allow_branches: if set, abort the job unless the pushed branch is listed
    """
    def __init__(self, dest, connection=None, gitopts=None, allow_branches=None):
        # BUG FIX: the original passed literal Nones for connection and gitopts,
        # silently discarding the caller's arguments. repo/branch stay None here;
        # they are filled in from the webhook payload in run().
        super().__init__(None, dest, branch=None, connection=connection, gitopts=gitopts)
        self.allow_branches = allow_branches

    def validate(self, job):
        # Skip GitCheckoutTask.validate (repo is unknown until the webhook
        # arrives); just require that some connection is available
        self.conn = self.connection or job.props.get("connection")
        assert self.conn

    def run(self, job):
        data = job.props["payload"]
        # Key auth -> clone over ssh; otherwise embed user:password in the https URL
        if self.conn.key:
            self.repo = data["repository"]["ssh_url"]
        else:
            self.repo = data["repository"]["clone_url"]. \
                replace("://", "://{}:{}@".format(self.conn.username, self.conn.password))
        self.branch = data["ref"]
        if self.allow_branches:
            branch = self.branch
            # The webhook ref looks like refs/heads/<branch>; strip the prefix
            if branch.startswith("refs/heads/"):
                branch = branch[len("refs/heads/"):]
            if branch not in self.allow_branches:
                raise StopJob("Branch '{}' is not whitelisted".format(branch))
        print(self.repo)
        super().run(job)

    def __repr__(self):
        return "<GiteaCheckoutTask>"

33
shipper/runjob.py Normal file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
import argparse
import importlib
# Explicitly import the submodule: "import importlib" alone does not guarantee
# that importlib.util (used by load_task) is loaded.
import importlib.util
import json
import os
from tempfile import TemporaryDirectory
def load_task(srcfile):
    """Import a job file by path and return the resulting module object.

    The module is registered under the name "job"; its module-level
    definitions (including the ShipperJob named "job") become attributes.
    """
    module_spec = importlib.util.spec_from_file_location("job", srcfile)
    module = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(module)
    return module
def main():
    """CLI wrapper run per job by the server: load the job file, then run it
    with JSON-decoded parameters inside a throwaway working directory."""
    parser = argparse.ArgumentParser(description="Shipper task runner")
    parser.add_argument('jobfile', help="Job file to run")
    parser.add_argument('args', help="JSON args")
    args = parser.parse_args()

    # Load before chdir so a relative jobfile path resolves in the tasks dir
    job = load_task(args.jobfile)
    params = json.loads(args.args)
    # Run with cwd set to a temp dir so task outputs (checkouts etc) are
    # isolated and cleaned up afterwards.
    with TemporaryDirectory() as d:
        os.chdir(d)
        # The job module must define a module-level ShipperJob named "job"
        job.job.run(params)
    # NOTE(review): cwd points at the now-deleted temp dir after this block —
    # harmless for a process that exits immediately, but worth confirming.


if __name__ == '__main__':
    main()