Compare commits
19 Commits
Author | SHA1 | Date |
---|---|---|
|
97805cf12d | |
|
891bcd4181 | |
|
db86cf3392 | |
|
778905b799 | |
|
39e04a9c7b | |
|
22a661dcf7 | |
|
9607708314 | |
|
7996c829d0 | |
|
e649c988cf | |
|
ab2858cb04 | |
|
62190f357f | |
|
b582101543 | |
|
1e0c323b67 | |
|
f4c708cc82 | |
|
39a7ba4b02 | |
|
d8cac3f641 | |
|
cb328659cc | |
|
6958040ad5 | |
|
7b487e562b |
|
@ -2,8 +2,8 @@ import os
|
|||
import cherrypy
|
||||
import json
|
||||
from datetime import datetime
|
||||
from photoapp.types import Photo, PhotoSet, Tag, TagItem, PhotoStatus, User, known_extensions, known_mimes, \
|
||||
genuuid, generate_storage_path
|
||||
from photoapp.types import Photo, PhotoSet, Tag, TagItem, PhotoStatus, User, JobTargetType, known_extensions, \
|
||||
known_mimes, genuuid, generate_storage_path
|
||||
from photoapp.utils import copysha, get_extension, slugify
|
||||
from photoapp.image import special_magic_fobj
|
||||
from photoapp.storage import StorageAdapter
|
||||
|
@ -38,6 +38,7 @@ class PhotosApiV1(object):
|
|||
self.stats = PhotosApiV1Stats()
|
||||
self.photos = PhotosApiV1Photos()
|
||||
self.tags = PhotosApiV1PhotoTags()
|
||||
self.jobs = JobsApiV1(library)
|
||||
|
||||
def GET(self):
|
||||
cherrypy.response.headers["Content-type"] = "text/plain"
|
||||
|
@ -284,3 +285,76 @@ class PhotosApiV1PhotoTags(object):
|
|||
slug=slugify(tagname)))
|
||||
db.commit()
|
||||
return {}
|
||||
|
||||
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
@cherrypy.popargs("uuid")
|
||||
class JobsApiV1(object):
|
||||
def __init__(self, library):
|
||||
self.library = library
|
||||
|
||||
def GET(self, uuid=None):
|
||||
"""
|
||||
show the list of jobs or the specified job
|
||||
"""
|
||||
return "jobs get " + str(uuid)
|
||||
|
||||
def POST(self):
|
||||
"""
|
||||
create a new job or update an existing one
|
||||
|
||||
JSON body should look like:
|
||||
|
||||
{
|
||||
"name": "blah",
|
||||
"targets": [
|
||||
{
|
||||
"type": "photo",
|
||||
"ids": [
|
||||
1,
|
||||
2,
|
||||
3
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
POSTing without the job_args field will tell you the list of job args you need to submit
|
||||
POSTing with job_args will actually create the job
|
||||
"""
|
||||
|
||||
body = json.loads(cherrypy.request.body.read().decode()) # TODO max size
|
||||
|
||||
# translate target UUIDs to IDs
|
||||
target_ids = []
|
||||
for target_number, target in enumerate(body["targets"]):
|
||||
typ = JobTargetType[target["type"]]
|
||||
|
||||
if typ == JobTargetType.photo:
|
||||
query = db.query(Photo.id).filter(Photo.uuid.in_(target["uuids"]))
|
||||
elif typ == JobTargetType.photoset:
|
||||
query = db.query(PhotoSet.id).filter(PhotoSet.uuid.in_(target["uuids"]))
|
||||
elif typ == JobTargetType.tag:
|
||||
query = db.query(Tag.id).filter(Tag.uuid.in_(target["uuids"]))
|
||||
else:
|
||||
raise Exception()
|
||||
|
||||
ids = [r[0] for r in query.all()]
|
||||
|
||||
if len(target["uuids"]) != len(ids): # TODO would be nice if we would say exactly which
|
||||
raise cherrypy.HTTPError(400, "missing or duplicate UUIDs in target {}".format(target_number))
|
||||
|
||||
target_ids.append(dict(
|
||||
type=JobTargetType[target["type"]],
|
||||
targets=[r[0] for r in query.all()]
|
||||
))
|
||||
|
||||
# output the job's UUID
|
||||
return cherrypy.engine.publish("create-job", body["name"], target_ids)[0]
|
||||
|
||||
def DELETE(self, uuid):
|
||||
"""
|
||||
delete an existing job
|
||||
"""
|
||||
return "jobs delete"
|
||||
|
|
|
@ -9,8 +9,8 @@ from collections import defaultdict
|
|||
from urllib.parse import urlparse
|
||||
from datetime import datetime, timedelta
|
||||
from photoapp.thumbtool import ThumbGenerator
|
||||
from photoapp.types import APPROOT, Photo, PhotoSet, Tag, TagItem, PhotoStatus, User, mime2ext, \
|
||||
regular_mimes, video_mimes, ftypes
|
||||
from photoapp.types import APPROOT, Photo, PhotoSet, Tag, TagItem, PhotoStatus, User, Job, JobTarget, JobTargetStatus, \
|
||||
JobTargetState, mime2ext, regular_mimes, video_mimes, ftypes
|
||||
from photoapp.dbsession import DatabaseSession
|
||||
from photoapp.common import pwhash
|
||||
from photoapp.api import PhotosApi, LibraryManager
|
||||
|
@ -18,6 +18,7 @@ from photoapp.dbutils import SAEnginePlugin, SATool, db, get_db_engine, date_for
|
|||
from photoapp.utils import auth, require_auth, photoset_auth_filter, slugify, cherryparam, number_format
|
||||
from photoapp.storage import uri_to_storage
|
||||
from photoapp.webutils import validate_password, serve_thumbnail_placeholder
|
||||
from photoapp.jobs import JobsClient, JobSubscriber, ThreadedJobServer
|
||||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||||
from sqlalchemy import desc, asc, func, and_, or_
|
||||
|
||||
|
@ -44,6 +45,7 @@ class PhotosWeb(object):
|
|||
self.tag = TagView(self)
|
||||
self.album = self.tag
|
||||
self.search = SearchView(self)
|
||||
self.jobs = JobsView(self)
|
||||
|
||||
def render(self, template, **kwargs):
|
||||
"""
|
||||
|
@ -619,25 +621,72 @@ class SearchView(object):
|
|||
)
|
||||
|
||||
|
||||
def setup_webapp(database_url, library_url, cache_url, thumb_service_url, debug=False, max_upload=1024**3):
|
||||
@cherrypy.popargs('uuid')
|
||||
class JobsView(object):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
|
||||
# Get database connection
|
||||
engine = get_db_engine(database_url)
|
||||
@cherrypy.expose
|
||||
def index(self, uuid=None):
|
||||
if uuid is None:
|
||||
return self.index_alljobs()
|
||||
else:
|
||||
return self.index_onejob(uuid)
|
||||
|
||||
def index_alljobs(self):
|
||||
jobs = db.query(Job) \
|
||||
.order_by(Job.created.desc()) \
|
||||
.all()
|
||||
|
||||
yield self.master.render("jobs.html", jobs=jobs)
|
||||
|
||||
def index_onejob(self, uuid):
|
||||
job = db.query(Job).filter(Job.uuid == uuid).first()
|
||||
if not job:
|
||||
raise cherrypy.HTTPError(404)
|
||||
|
||||
statuses = db.query(Photo, PhotoSet, JobTargetStatus, JobTarget) \
|
||||
.join(PhotoSet) \
|
||||
.join(JobTargetStatus) \
|
||||
.join(JobTarget) \
|
||||
.filter(JobTarget.job_id == job.id) \
|
||||
.all()
|
||||
|
||||
status_totals = {i: 0 for i in JobTargetState}
|
||||
for state, count in db.query(JobTargetStatus.status, func.count(JobTargetStatus.status)) \
|
||||
.filter(JobTargetStatus.job_id == job.id) \
|
||||
.group_by(JobTargetStatus.status) \
|
||||
.all():
|
||||
status_totals[state] = count
|
||||
|
||||
yield self.master.render("job.html", job=job, statuses=statuses, status_totals=status_totals)
|
||||
|
||||
|
||||
def setup_webapp(
|
||||
database_engine,
|
||||
library_client,
|
||||
cache_client,
|
||||
thumb_service_url,
|
||||
debug=False,
|
||||
db_debug=False,
|
||||
max_upload=1024**3
|
||||
):
|
||||
# Setup database in web framework
|
||||
cherrypy.tools.db = SATool()
|
||||
SAEnginePlugin(cherrypy.engine, engine).subscribe()
|
||||
SAEnginePlugin(cherrypy.engine, database_engine).subscribe()
|
||||
|
||||
cherrypy.tools.user = UserTool()
|
||||
|
||||
# Create various internal tools
|
||||
library_storage = uri_to_storage(library_url)
|
||||
library_manager = LibraryManager(library_storage)
|
||||
thumbnail_tool = ThumbGenerator(library_manager, uri_to_storage(cache_url), thumb_service_url)
|
||||
#TODO move thumbtool outside of setup_webapp and use the cherrypy bus to connect with it
|
||||
thumbnail_tool = ThumbGenerator(library_client, cache_client, thumb_service_url)
|
||||
|
||||
# Setup and mount web ui
|
||||
tpl_dir = os.path.join(APPROOT, "templates") if not debug else "templates"
|
||||
web = PhotosWeb(library_manager, thumbnail_tool, tpl_dir)
|
||||
web = PhotosWeb(library_client, thumbnail_tool, tpl_dir)
|
||||
cherrypy.tree.mount(web, '/', {'/': {'tools.trailing_slash.on': False,
|
||||
'tools.db.on': True,
|
||||
'tools.user.on': True,
|
||||
'error_page.403': web.error,
|
||||
'error_page.404': web.error},
|
||||
'/static': {"tools.staticdir.on": True,
|
||||
|
@ -650,7 +699,7 @@ def setup_webapp(database_url, library_url, cache_url, thumb_service_url, debug=
|
|||
'tools.auth_basic.checkpassword': validate_password}})
|
||||
|
||||
# Setup and mount API
|
||||
api = PhotosApi(library_manager)
|
||||
api = PhotosApi(library_client)
|
||||
cherrypy.tree.mount(api, '/api', {'/': {'tools.sessions.on': False,
|
||||
'tools.trailing_slash.on': False,
|
||||
'tools.auth_basic.on': True,
|
||||
|
@ -684,6 +733,7 @@ def main():
|
|||
parser.add_argument('-s', '--database', help="sqlalchemy database connection uri",
|
||||
default=os.environ.get("DATABASE_URL")),
|
||||
parser.add_argument('--debug', action="store_true", help="enable development options")
|
||||
parser.add_argument('--db-debug', action="store_true", help="print sql statements")
|
||||
|
||||
tunables = parser.add_argument_group(title="tunables")
|
||||
tunables.add_argument('--max-upload', help="maximum file upload size accepted in bytes",
|
||||
|
@ -706,12 +756,26 @@ def main():
|
|||
if not args.thumb_service:
|
||||
logging.warning("THUMB_SERVICE_URL not set. Video thumbnails will be unavailable")
|
||||
|
||||
# Get database connections
|
||||
library_storage = uri_to_storage(args.library)
|
||||
library_manager = LibraryManager(library_storage)
|
||||
cache_manager = uri_to_storage(args.cache)
|
||||
|
||||
# set up jobs system - this is temporary, this will be something pluggable later
|
||||
jobs_db_engine = get_db_engine(args.database, debug=args.db_debug)
|
||||
jobs_server = ThreadedJobServer(jobs_db_engine)
|
||||
jobs_server.run_background()
|
||||
jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.notify)
|
||||
JobSubscriber(jobs_client)
|
||||
|
||||
# set up webapp
|
||||
setup_webapp(
|
||||
args.database,
|
||||
args.library,
|
||||
args.cache,
|
||||
get_db_engine(args.database, debug=args.db_debug),
|
||||
library_manager,
|
||||
cache_manager,
|
||||
args.thumb_service,
|
||||
debug=args.debug,
|
||||
db_debug=args.db_debug,
|
||||
max_upload=args.max_upload
|
||||
)
|
||||
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
from contextlib import closing
|
||||
import sqlalchemy
|
||||
import cherrypy
|
||||
from cherrypy.process import plugins
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.pool import NullPool
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy import func
|
||||
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
|
@ -29,7 +32,7 @@ def get_db_engine(uri, debug=False):
|
|||
|
||||
|
||||
def create_db_sessionmaker(engine):
|
||||
session = sessionmaker()
|
||||
session = sqlalchemy.orm.scoped_session(sessionmaker(autoflush=True, autocommit=False))
|
||||
session.configure(bind=engine)
|
||||
return session
|
||||
|
||||
|
@ -95,8 +98,7 @@ class SATool(cherrypy.Tool):
|
|||
self.bind_session,
|
||||
priority=49) # slightly earlier than Sessions tool, which is 50 or 60
|
||||
|
||||
self.session = sqlalchemy.orm.scoped_session(
|
||||
sqlalchemy.orm.sessionmaker(autoflush=True, autocommit=False))
|
||||
self.session = sqlalchemy.orm.scoped_session(sessionmaker(autoflush=True, autocommit=False))
|
||||
|
||||
def _setup(self):
|
||||
cherrypy.Tool._setup(self)
|
||||
|
@ -115,3 +117,20 @@ class SATool(cherrypy.Tool):
|
|||
raise
|
||||
finally:
|
||||
self.session.remove()
|
||||
|
||||
|
||||
def cursorwrap(func):
|
||||
"""
|
||||
Provides a cursor to the wrapped method as the first arg. This assumes that the wrapped function belongs to an
|
||||
object because the cursor is sourced from the object's session attribute which is assumed to be a
|
||||
sessionmaker callable.
|
||||
"""
|
||||
def wrapped(*args, **kwargs):
|
||||
self = args[0]
|
||||
# passthru if someone already passed a session
|
||||
if len(args) >= 2 and isinstance(args[1], (Session, sqlalchemy.orm.scoping.scoped_session, DbAlias)):
|
||||
return func(*args, **kwargs)
|
||||
else:
|
||||
with closing(self.session()) as c:
|
||||
return func(self, c, *args[1:], **kwargs)
|
||||
return wrapped
|
||||
|
|
|
@ -0,0 +1,308 @@
|
|||
import queue
|
||||
import logging
|
||||
import traceback
|
||||
import cherrypy
|
||||
from threading import Thread
|
||||
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
||||
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, JobTargetState, Photo, PhotoSet, \
|
||||
Tag, TagItem
|
||||
from sqlalchemy import func
|
||||
|
||||
|
||||
logger = logging.getLogger("jobs")
|
||||
|
||||
|
||||
"""
|
||||
How the job system works:
|
||||
|
||||
|
||||
+--------------------+ +-------------------+
|
||||
| other applications | notify | Job server |
|
||||
| ==============>> |
|
||||
| | http/queue | |
|
||||
+-------------||-----+ +--------||---------s+
|
||||
|| ____________ ||
|
||||
insert || / Database \\ || check db for work (poll/on notify)
|
||||
job, || | | || manage job statuses
|
||||
job targets **========> <========** lock the row
|
||||
\\___________/ execute the work
|
||||
update and unlock the row
|
||||
|
||||
|
||||
job: server-side task that can process one or many photos
|
||||
|
||||
job target: identifies items the job is expected to be ran against. Can target an individual photo, a set, or all within
|
||||
a tag.
|
||||
|
||||
job statuses: the status of one photo targeted by the job. Jobs are executed on a per-photo bases, regardless of how
|
||||
they were originally targeted with the above.
|
||||
|
||||
other applications: any entity that can submit jobs. Needs database access and access to the internal network for
|
||||
notify. The http api server is an example.
|
||||
|
||||
job server: the class defined in this file. the innermost public interface for managing jobs.
|
||||
|
||||
notify: mechanism to notify the jobserver of work. If the job server runs in-process, this is a python queue that is
|
||||
"notified" by pushing an object into it. If the job server runs as a stand-alone process, this is an http endpoint.
|
||||
in either case, JobsClient is the client object to this
|
||||
"""
|
||||
|
||||
|
||||
class JobSubscriber(object):
|
||||
"""
|
||||
adapter between cherrypy bus and JobsClient
|
||||
"""
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
cherrypy.engine.subscribe("create-job", self.create_job) # TODO make "create-job" a const somewhere?
|
||||
|
||||
def create_job(self, name, targets):
|
||||
uuid = self.client.create_job(name, targets)
|
||||
self.client.notify_server(uuid)
|
||||
return uuid
|
||||
|
||||
|
||||
class JobsClient(object):
|
||||
def __init__(self, dbengine, notifier):
|
||||
self.engine = dbengine
|
||||
self.session = create_db_sessionmaker(self.engine)
|
||||
self.notifier = notifier
|
||||
|
||||
@cursorwrap
|
||||
def create_job(self, c, name, targets):
|
||||
"""
|
||||
create a new job in the database
|
||||
|
||||
targets: list of dict(
|
||||
type=JobTargetType.TYPE,
|
||||
targets=[1, 2, 3]
|
||||
)
|
||||
"""
|
||||
job_targets = []
|
||||
|
||||
for target in targets:
|
||||
for target_id in target["targets"]:
|
||||
job_targets.append(
|
||||
JobTarget(target_type=target["type"],
|
||||
target=target_id)
|
||||
)
|
||||
|
||||
j = Job(
|
||||
job_name=name,
|
||||
targets=job_targets,
|
||||
)
|
||||
|
||||
c.add(j)
|
||||
c.commit()
|
||||
|
||||
return j.uuid
|
||||
|
||||
def notify_server(self, uuid):
|
||||
"""
|
||||
notify the job execution server of a job that needs processing because it had been added/updated
|
||||
"""
|
||||
self.notifier(uuid)
|
||||
|
||||
|
||||
def run_task(dbsession, task, job, job_status):
|
||||
try:
|
||||
task(dbsession, job, job_status)
|
||||
return True
|
||||
except:
|
||||
#TODO save the job error in the database somewhere?
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
|
||||
from random import randint
|
||||
from time import sleep
|
||||
|
||||
|
||||
def job_noop(dbsession, job, job_status):
|
||||
"""
|
||||
job for testing the ui/api. Takes a few seconds to do nothing, and may pass or fail
|
||||
"""
|
||||
logger.info("job_noop: this example job does nothing")
|
||||
logger.info("job_noop: target photo: %s", job_status.photo_id)
|
||||
|
||||
fail = randint(0, 1) == 0
|
||||
sleep(5)
|
||||
|
||||
logger.info("job_noop: this time we %s", "fail" if fail else "succeed")
|
||||
|
||||
if fail:
|
||||
raise Exception("lol")
|
||||
|
||||
|
||||
"""
|
||||
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
|
||||
"""
|
||||
task_definitions = {
|
||||
"noop": job_noop
|
||||
}
|
||||
|
||||
|
||||
class BaseJobServer(object):
|
||||
#TODO subclass this out in way that supports sqlite - below implementation's logic relies on with_for_update()
|
||||
pass
|
||||
|
||||
|
||||
class JobServer(BaseJobServer):
|
||||
def __init__(self, dbengine):
|
||||
"""
|
||||
job executor service
|
||||
"""
|
||||
self.engine = dbengine
|
||||
self.session = create_db_sessionmaker(self.engine)
|
||||
|
||||
def notify(self, job_uuid):
|
||||
"""
|
||||
notify the jobserver that a job may need to be processed. The job should be processed according to status
|
||||
as follows:
|
||||
JobStatus.paused: do nothing
|
||||
JobStatus.done: do nothing
|
||||
JobStatus.error: do nothing
|
||||
JobStatus.ready: change to running state, create JobTargetStatus rows, and begin processing
|
||||
JobStatus.running: continue processing.
|
||||
|
||||
calls to this function should not block, it is meant to be called, for example behind an api call in an http api
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class ThreadedJobServer(JobServer):
|
||||
def __init__(self, dbengine):
|
||||
"""
|
||||
a simple version of the jobserver that runs as a single thread
|
||||
|
||||
TODO:
|
||||
- does not set jobs to Completed status if we never process work for it
|
||||
"""
|
||||
super().__init__(dbengine)
|
||||
self.notifyq = queue.Queue()
|
||||
self.work_notifyq = queue.Queue()
|
||||
|
||||
def run_background(self):
|
||||
# run_notifier handles incoming notifications of jobs. When we are notified of a job, we prepare it for running
|
||||
Thread(target=self.run_notifier, daemon=True).start()
|
||||
# run_work_queue periodically searches the database for new work, and starts running it when found
|
||||
Thread(target=self.run_work_queue, daemon=True).start()
|
||||
|
||||
def notify(self, job_uuid):
|
||||
self.notifyq.put(job_uuid)
|
||||
|
||||
def run_notifier(self):
|
||||
while True:
|
||||
try:
|
||||
job_uuid = self.notifyq.get(timeout=30)
|
||||
self.handle_notify(job_uuid)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
@cursorwrap
|
||||
def handle_notify(self, c, job_uuid):
|
||||
# query the db for the target job, and lock it
|
||||
job = c.query(Job).filter(Job.uuid == job_uuid).first()
|
||||
|
||||
if job.status not in (JobStatus.ready, JobStatus.running, ):
|
||||
# we don't care about jobs that are already done
|
||||
return
|
||||
|
||||
if job.status == JobStatus.ready:
|
||||
# check if JobTargetStatus has been populated for this job
|
||||
statuses_count = c.query(func.count(JobTargetStatus.id)). \
|
||||
join(JobTarget).filter(JobTarget.job_id == job.id).first()[0]
|
||||
|
||||
if statuses_count == 0:
|
||||
for status in self.create_job_statuses(c, job):
|
||||
c.add(status)
|
||||
|
||||
job.status = JobStatus.running
|
||||
|
||||
# if this commit fails due to key constraints, somebody else is doing what we're doing in parallel
|
||||
# so, we'll just crash here
|
||||
c.commit()
|
||||
|
||||
# notify the work queue
|
||||
self.work_notifyq.put(None)
|
||||
|
||||
def create_job_statuses(self, c, job):
|
||||
"""
|
||||
populate job statuses, which also serve as the queue of items to work on
|
||||
"""
|
||||
logger.info("preparing statuses for job %s/%s", job.id, job.uuid)
|
||||
photo_ids = set()
|
||||
|
||||
for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
|
||||
if target.target_type == JobTargetType.photo:
|
||||
photo_ids.update([target.target])
|
||||
elif target.target_type == JobTargetType.photoset:
|
||||
for photo in c.query(Photo.id).filter(Photo.set_id == target.target).all():
|
||||
photo_ids.update([photo[0]])
|
||||
elif target.target_type == JobTargetType.tag:
|
||||
for photo in c.query(Photo.id). \
|
||||
join(PhotoSet).join(TagItem).join(Tag). \
|
||||
filter(Tag.id == target.target). \
|
||||
all():
|
||||
photo_ids.update([photo[0]])
|
||||
|
||||
return [
|
||||
JobTargetStatus(target_id=target.id, job_id=job.id, photo_id=photo_id)
|
||||
for photo_id in photo_ids
|
||||
]
|
||||
|
||||
def run_work_queue(self):
|
||||
while True:
|
||||
try:
|
||||
# spin on search_work(), which returns true if we found and executed work
|
||||
if not self.search_work():
|
||||
self.work_notifyq.get(timeout=30)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
@cursorwrap
|
||||
def search_work(self, c):
|
||||
# - we "select from JobTargetStatus" to find available work, (and lock it here)
|
||||
# - if work is found, it is executed immediately
|
||||
# - commit (the row is now unlocked)
|
||||
# - if we query and find that there is no more todo/pending work for this job, we mark the job done
|
||||
|
||||
available = c.query(JobTargetStatus, Job) \
|
||||
.join(Job) \
|
||||
.filter(Job.status.in_((JobStatus.ready, JobStatus.running, ))) \
|
||||
.filter(JobTargetStatus.status == JobTargetState.new) \
|
||||
.order_by(func.rand()) \
|
||||
.limit(1) \
|
||||
.with_for_update(skip_locked=True) \
|
||||
.first()
|
||||
|
||||
if not available:
|
||||
return False
|
||||
|
||||
job_status, job = available
|
||||
|
||||
# get the job function()
|
||||
task = task_definitions[job.job_name]
|
||||
|
||||
# run the task and mark the results
|
||||
if run_task(c, task, job, job_status):
|
||||
job_status.status = JobTargetState.complete
|
||||
else:
|
||||
job_status.status = JobTargetState.error
|
||||
|
||||
# save and unlock
|
||||
c.commit()
|
||||
|
||||
# if there is no more work for the job, mark it done too
|
||||
remaining = c.query(func.count(JobTargetStatus.id)) \
|
||||
.join(Job) \
|
||||
.filter(Job.id == job.id) \
|
||||
.filter(JobTargetStatus.status == JobTargetState.new) \
|
||||
.order_by(func.rand()) \
|
||||
.first()
|
||||
|
||||
if remaining[0] == 0:
|
||||
job.status = JobStatus.done
|
||||
c.commit()
|
||||
|
||||
return True
|
|
@ -158,6 +158,8 @@ class Photo(Base):
|
|||
format = Column(String(length=32)) # TODO how long can a mime string be
|
||||
fname = Column(String(length=64)) # seems generous enough
|
||||
|
||||
job_statuses = relationship("JobTargetStatus", back_populates="photo")
|
||||
|
||||
def to_json(self):
|
||||
j = {attr: getattr(self, attr) for attr in
|
||||
{"uuid", "size", "width", "height", "orientation", "format", "hash", "fname"}}
|
||||
|
@ -220,3 +222,85 @@ class User(Base):
|
|||
|
||||
def to_json(self):
|
||||
return dict(name=self.name, status=self.status.name)
|
||||
|
||||
|
||||
class JobStatus(enum.Enum):
|
||||
paused = 0
|
||||
ready = 1
|
||||
running = 2
|
||||
done = 3
|
||||
error = 4
|
||||
|
||||
|
||||
class Job(Base):
|
||||
__tablename__ = 'jobs'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
uuid = Column(Unicode(length=36), unique=True, nullable=False, default=lambda: str(uuid.uuid4()))
|
||||
created = Column(DateTime, nullable=False, default=lambda: datetime.now())
|
||||
|
||||
job_name = Column(String(length=64), unique=True, nullable=False)
|
||||
status = Column(Enum(JobStatus), nullable=False, default=JobStatus.paused)
|
||||
|
||||
targets = relationship("JobTarget", back_populates="job")
|
||||
target_statuses = relationship("JobTargetStatus", back_populates="job")
|
||||
|
||||
def to_json(self):
|
||||
return {attr: getattr(self, attr) for attr in
|
||||
{"uuid", "created", "job_name", "status"}}
|
||||
|
||||
|
||||
class JobTargetType(enum.Enum):
|
||||
photo = 0
|
||||
photoset = 1
|
||||
tag = 2
|
||||
|
||||
|
||||
class JobTarget(Base):
|
||||
__tablename__ = 'job_targets'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False)
|
||||
job = relationship("Job", back_populates="targets", foreign_keys=[job_id])
|
||||
target_type = Column(Enum(JobTargetType), nullable=False)
|
||||
target = Column(Integer, nullable=False) #TODO should we refactor this to multiple columns with null and proper constraints?
|
||||
|
||||
statuses = relationship("JobTargetStatus", back_populates="job_target")
|
||||
|
||||
def to_json(self):
|
||||
return {attr: getattr(self, attr) for attr in
|
||||
{"target_type", "target"}}
|
||||
|
||||
|
||||
class JobTargetState(enum.Enum):
|
||||
new = 0
|
||||
running = 1
|
||||
complete = 2
|
||||
error = 3
|
||||
cancelled = 4
|
||||
|
||||
|
||||
class JobTargetStatus(Base):
|
||||
__tablename__ = 'job_target_status'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
job_id = Column(Integer, ForeignKey("jobs.id"), nullable=False)
|
||||
job = relationship("Job", back_populates="target_statuses", foreign_keys=[job_id])
|
||||
|
||||
target_id = Column(Integer, ForeignKey("job_targets.id"), nullable=False)
|
||||
job_target = relationship("JobTarget", back_populates="statuses", foreign_keys=[target_id])
|
||||
|
||||
"""
|
||||
all jobs are progressed by photo_id. PhotoSets can have many Photos so this is the best logical unit. How this works
|
||||
for the different `JobTargetType`s is broken down as follows:
|
||||
- photo: (obvious)
|
||||
- photoset: the job planner is given the opportunity to create JobTargetStatus for zero-all Photos under
|
||||
a PhotoSet
|
||||
- tag: same as photoset but iterated across all photosets with the tag
|
||||
"""
|
||||
photo_id = Column(Integer, ForeignKey("files.id"), nullable=False)
|
||||
photo = relationship(Photo, back_populates="job_statuses", foreign_keys=[photo_id])
|
||||
|
||||
status = Column(Enum(JobTargetState), nullable=False, default=JobTargetState.new)
|
||||
|
||||
UniqueConstraint(job_id, target_id, photo_id)
|
||||
|
|
|
@ -24,6 +24,15 @@ def delete_user(s, username):
|
|||
print("Deleted user {}".format(u.id))
|
||||
|
||||
|
||||
def update_password(s, username, password):
|
||||
u = s.query(User).filter(User.name == username).first()
|
||||
if not u:
|
||||
raise Exception("user doesn't exist")
|
||||
u.password = pwhash(password)
|
||||
s.commit()
|
||||
print("Updated password for user {}".format(u.id))
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="User manipulation tool")
|
||||
parser.add_argument('-s', '--database', help="sqlalchemy database connection uri",
|
||||
|
@ -40,7 +49,9 @@ def main():
|
|||
p_delete = p_mode.add_parser('delete', help='delete users')
|
||||
p_delete.add_argument("-u", "--username", help="username", required=True)
|
||||
|
||||
#TODO add mode to change password
|
||||
p_pass = p_mode.add_parser('changepassword', help='change user\'s password')
|
||||
p_pass.add_argument("-u", "--username", help="username", required=True)
|
||||
p_pass.add_argument("-p", "--password", help="password", required=True)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -55,6 +66,8 @@ def main():
|
|||
list_users(session)
|
||||
elif args.action == "delete":
|
||||
delete_user(session, args.username)
|
||||
elif args.action == "changepassword":
|
||||
update_password(session, args.username, args.password)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
|
|
@ -412,3 +412,50 @@ span.coords {
|
|||
text-align: center;
|
||||
display: block;
|
||||
}
|
||||
|
||||
|
||||
/* progress bars */
|
||||
.progress {
|
||||
display: flex;
|
||||
height: 40px;
|
||||
overflow: hidden;
|
||||
border: 0.5px solid #222;
|
||||
border-radius: 10px;
|
||||
|
||||
text-align: center;
|
||||
background: #666;
|
||||
color: #fff;
|
||||
|
||||
&>div {
|
||||
flex-grow: 1;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
padding: 0px 0px 0px 15px;
|
||||
|
||||
a {
|
||||
color: #fff;
|
||||
}
|
||||
|
||||
&.new {
|
||||
|
||||
}
|
||||
&.running {
|
||||
background-color: #993;
|
||||
}
|
||||
&.complete {
|
||||
background-color: #397;
|
||||
}
|
||||
&.error {
|
||||
background-color: #933;
|
||||
}
|
||||
&.cancelled {
|
||||
background-color: #939;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// .progress .step:not(:last-child) {
|
||||
// border-right: 1px solid rgba(0,0,0,0.8);
|
||||
// }
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
{% set title = job.uuid %}
|
||||
{% extends "page.html" %}
|
||||
{% block title %}{{ job.uuid }}{% endblock %}
|
||||
{% block subtitle %}{{ job.status.name | title }}{% endblock %}
|
||||
|
||||
{% block buttons %}
|
||||
<a href="#"><button class="secondary-button pure-button">Pause</button></a>
|
||||
<a href="#"><button class="secondary-button pure-button">Cancel</button></a>
|
||||
<a href="/jobs"><button class="secondary-button pure-button">All Jobs</button></a>
|
||||
{% endblock %}
|
||||
|
||||
{% block head %}
|
||||
lol head
|
||||
{% endblock %}
|
||||
|
||||
{% block body %}
|
||||
{{ job }}<br>
|
||||
|
||||
<h2>Progress</h2>
|
||||
|
||||
<table>
|
||||
{% set ns = namespace(sum=0) %}
|
||||
{% for status, count in status_totals.items(): %}
|
||||
<tr>
|
||||
{%- set ns.sum = ns.sum + count %}
|
||||
<td>{{ status }}</td>
|
||||
<td>{{ count }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
totalcount: {{ ns.sum }}<br>
|
||||
|
||||
|
||||
<div class="progress">
|
||||
{% for status, count in status_totals.items(): %}
|
||||
{% if count > 0 %}
|
||||
<div style="flex-grow:{{ count }}" class="{{ status.name }}">
|
||||
<a href="#targets-{{ status.name }}">{{ status.name }}: {{ count }}</a>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</div>
|
||||
|
||||
<h2>Targets</h2>
|
||||
|
||||
Total targets: {{ status_totals.values() | sum }}<br>
|
||||
|
||||
Table of target -> how many photos it selected
|
||||
|
||||
<h2>Target photos</h2>
|
||||
|
||||
{% for status, count in status_totals.items(): %}
|
||||
{% if count > 0 %}
|
||||
<a name="targets-{{ status.name }}"></a>
|
||||
<h3>{{ status.name | title }}</h3>
|
||||
|
||||
<ul>
|
||||
{% for photo, photoset, target_status, target in statuses %}
|
||||
{% if target_status.status == status %}
|
||||
<li>
|
||||
target: {{ target.id }}<br>
|
||||
target_status: {{ target_status.id }}<br>
|
||||
photo: <a href="/thumb/one/big/{{ photo.uuid }}.jpg">{{ photo.uuid }}</a><br />
|
||||
set: <a href="/photo/{{ photoset.uuid }}">{{ photoset.uuid }}</a><br />
|
||||
status: {{ target_status.status }}
|
||||
</li>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</ul>
|
||||
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
|
||||
{% endblock %}
|
|
@ -0,0 +1,25 @@
|
|||
{% set title = "title todo" %}
|
||||
{% extends "page.html" %}
|
||||
{% block title %}Jobs{% endblock %}
|
||||
{% block subtitle %}subtitle todo{% endblock %}
|
||||
|
||||
{% block buttons %}
|
||||
<a href="/jobs"><button class="secondary-button pure-button">New</button></a>
|
||||
{% endblock %}
|
||||
|
||||
{% block head %}
|
||||
lol head
|
||||
{% endblock %}
|
||||
|
||||
{% block body %}
|
||||
|
||||
<ul>
|
||||
{% for job in jobs %}
|
||||
<li>
|
||||
uuid: <a href="jobs/{{ job.uuid }}">{{ job.uuid }}</a><br />
|
||||
status: {{ job.status }}
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
|
||||
{% endblock %}
|
|
@ -18,6 +18,7 @@
|
|||
<li class="pure-menu-item"><a href="/search" class="pure-menu-link">Search</a></li>
|
||||
<li class="pure-menu-item"><a href="/date" class="pure-menu-link">Dates</a></li>
|
||||
<li class="pure-menu-item"><a href="/map" class="pure-menu-link">Map</a></li>
|
||||
<li class="pure-menu-item"><a href="/jobs" class="pure-menu-link">Jobs</a></li>
|
||||
<li class="pure-menu-item"><a href="/stats" class="pure-menu-link">Stats</a></li>
|
||||
<li class="pure-menu-heading">Albums</li>
|
||||
{% for tag in all_tags %}{% if tag.is_album %}
|
||||
|
|
|
@ -6,6 +6,9 @@ import cherrypy
|
|||
from cherrypy.test import helper
|
||||
|
||||
from photoapp.daemon import setup_webapp
|
||||
from photoapp.dbutils import get_db_engine
|
||||
from photoapp.api import LibraryManager
|
||||
from photoapp.storage import uri_to_storage
|
||||
|
||||
|
||||
class MockAuth:
|
||||
|
@ -24,9 +27,9 @@ class PhotolibTest(helper.CPWebCase):
|
|||
os.mkdir(cached)
|
||||
|
||||
setup_webapp(
|
||||
"sqlite:///{}".format(os.path.join(cls.tmpd.name, "testing.db")),
|
||||
"file://{}".format(libd),
|
||||
"file://{}".format(cached),
|
||||
get_db_engine("sqlite:///{}".format(os.path.join(cls.tmpd.name, "testing.db"))),
|
||||
LibraryManager(uri_to_storage("file://{}".format(libd))),
|
||||
uri_to_storage("file://{}".format(cached)),
|
||||
None,
|
||||
debug=True,
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue