309 lines
10 KiB
Python
309 lines
10 KiB
Python
import queue
|
|
import logging
|
|
import traceback
|
|
import cherrypy
|
|
from threading import Thread
|
|
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
|
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, JobTargetState, Photo, PhotoSet, \
|
|
Tag, TagItem
|
|
from sqlalchemy import func
|
|
|
|
|
|
logger = logging.getLogger("jobs")
|
|
|
|
|
|
"""
|
|
How the job system works:
|
|
|
|
|
|
+--------------------+ +-------------------+
|
|
| other applications | notify | Job server |
|
|
| ==============>> |
|
|
| | http/queue | |
|
|
+-------------||-----+              +--------||----------+
|
|
|| ____________ ||
|
|
insert || / Database \\ || check db for work (poll/on notify)
|
|
job, || | | || manage job statuses
|
|
job targets **========> <========** lock the row
|
|
\\___________/ execute the work
|
|
update and unlock the row
|
|
|
|
|
|
job: server-side task that can process one or many photos
|
|
|
|
job target: identifies items the job is expected to be run against. Can target an individual photo, a set, or all within
|
|
a tag.
|
|
|
|
job statuses: the status of one photo targeted by the job. Jobs are executed on a per-photo basis, regardless of how
|
|
they were originally targeted with the above.
|
|
|
|
other applications: any entity that can submit jobs. Needs database access and access to the internal network for
|
|
notify. The http api server is an example.
|
|
|
|
job server: the class defined in this file. the innermost public interface for managing jobs.
|
|
|
|
notify: mechanism to notify the jobserver of work. If the job server runs in-process, this is a python queue that is
|
|
"notified" by pushing an object into it. If the job server runs as a stand-alone process, this is an http endpoint.
|
|
in either case, JobsClient is the client object to this
|
|
"""
|
|
|
|
|
|
class JobSubscriber(object):
    """
    Adapter between the cherrypy message bus and a JobsClient: listens for
    "create-job" messages and relays them to the client.
    """

    def __init__(self, client):
        self.client = client
        cherrypy.engine.subscribe("create-job", self.create_job)  # TODO make "create-job" a const somewhere?

    def create_job(self, name, targets):
        """
        Create the job via the client, then wake the job server so it starts
        processing. Returns the new job's uuid.
        """
        new_uuid = self.client.create_job(name, targets)
        self.client.notify_server(new_uuid)
        return new_uuid
|
|
|
|
|
|
class JobsClient(object):
    """
    Client-side interface to the job system: inserts jobs into the database
    and notifies the job server that work is available.
    """

    def __init__(self, dbengine, notifier):
        self.engine = dbengine
        self.session = create_db_sessionmaker(self.engine)
        self.notifier = notifier

    @cursorwrap
    def create_job(self, c, name, targets):
        """
        create a new job in the database

        targets: list of dict(
            type=JobTargetType.TYPE,
            targets=[1, 2, 3]
        )

        returns the new job's uuid
        """
        # flatten the target groups into one JobTarget row per target id
        job_targets = [
            JobTarget(target_type=group["type"], target=target_id)
            for group in targets
            for target_id in group["targets"]
        ]

        new_job = Job(
            job_name=name,
            targets=job_targets,
        )

        c.add(new_job)
        c.commit()

        return new_job.uuid

    def notify_server(self, uuid):
        """
        notify the job execution server of a job that needs processing because it had been added/updated
        """
        self.notifier(uuid)
|
|
|
|
|
|
def run_task(dbsession, task, job, job_status):
    """
    Execute one job task function against a single job target.

    :param dbsession: database session handed through to the task
    :param task: callable(dbsession, job, job_status) implementing the work
    :param job: the Job row being processed
    :param job_status: the JobTargetStatus row identifying the target photo
    :return: True if the task completed without raising, False otherwise
    """
    try:
        task(dbsession, job, job_status)
        return True
    # catch Exception instead of a bare `except:` so KeyboardInterrupt and
    # SystemExit still propagate and can actually stop the server
    except Exception:
        # TODO save the job error in the database somewhere?
        traceback.print_exc()
        return False
|
|
|
|
|
|
from random import randint
|
|
from time import sleep
|
|
|
|
|
|
def job_noop(dbsession, job, job_status):
    """
    job for testing the ui/api. Takes a few seconds to do nothing, and may pass or fail
    """
    logger.info("job_noop: this example job does nothing")
    logger.info("job_noop: target photo: %s", job_status.photo_id)

    # coin flip decides the outcome; the sleep simulates real work
    should_fail = randint(0, 1) == 0
    sleep(5)

    logger.info("job_noop: this time we %s", "fail" if should_fail else "succeed")

    if should_fail:
        raise Exception("lol")
|
|
|
|
|
|
"""
|
|
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
|
|
"""
|
|
task_definitions = {
|
|
"noop": job_noop
|
|
}
|
|
|
|
|
|
class BaseJobServer(object):
    """
    Base class for job execution servers.

    TODO: subclass this out in a way that supports sqlite - the
    implementation below relies on with_for_update()
    """
    pass
|
|
|
|
|
|
class JobServer(BaseJobServer):
    """
    job executor service
    """

    def __init__(self, dbengine):
        self.engine = dbengine
        self.session = create_db_sessionmaker(self.engine)

    def notify(self, job_uuid):
        """
        notify the jobserver that a job may need to be processed. The job should be processed according to status
        as follows:

        JobStatus.paused: do nothing
        JobStatus.done: do nothing
        JobStatus.error: do nothing
        JobStatus.ready: change to running state, create JobTargetStatus rows, and begin processing
        JobStatus.running: continue processing.

        calls to this function should not block, it is meant to be called, for example behind an api call in an http api
        """
        raise NotImplementedError()
|
|
|
|
|
|
class ThreadedJobServer(JobServer):
    def __init__(self, dbengine):
        """
        a simple version of the jobserver that runs as a single thread

        TODO:
        - does not set jobs to Completed status if we never process work for it
        """
        super().__init__(dbengine)
        # incoming job uuids that may need their state advanced (see notify())
        self.notifyq = queue.Queue()
        # wakeup signal for the work-queue thread; the queued value is ignored
        self.work_notifyq = queue.Queue()

    def run_background(self):
        """Start the notifier and work-queue loops as daemon threads."""
        # run_notifier handles incoming notifications of jobs. When we are notified of a job, we prepare it for running
        Thread(target=self.run_notifier, daemon=True).start()
        # run_work_queue periodically searches the database for new work, and starts running it when found
        Thread(target=self.run_work_queue, daemon=True).start()

    def notify(self, job_uuid):
        # non-blocking, as required by the JobServer.notify contract: just
        # enqueue the uuid for the notifier thread to handle
        self.notifyq.put(job_uuid)

    def run_notifier(self):
        """Thread body: drain the notify queue forever."""
        while True:
            try:
                # the 30s timeout just bounds the blocking get; on timeout we
                # loop around and block again
                job_uuid = self.notifyq.get(timeout=30)
                self.handle_notify(job_uuid)
            except queue.Empty:
                pass

    @cursorwrap
    def handle_notify(self, c, job_uuid):
        """
        Advance a notified job: move a `ready` job to `running` (creating its
        JobTargetStatus rows if needed), then wake the work-queue thread.
        """
        # query the db for the target job, and lock it
        job = c.query(Job).filter(Job.uuid == job_uuid).first()

        if job.status not in (JobStatus.ready, JobStatus.running, ):
            # we don't care about jobs that are already done
            return

        if job.status == JobStatus.ready:
            # check if JobTargetStatus has been populated for this job
            statuses_count = c.query(func.count(JobTargetStatus.id)). \
                join(JobTarget).filter(JobTarget.job_id == job.id).first()[0]

            if statuses_count == 0:
                for status in self.create_job_statuses(c, job):
                    c.add(status)

            job.status = JobStatus.running

            # if this commit fails due to key constraints, somebody else is doing what we're doing in parallel
            # so, we'll just crash here
            c.commit()

        # notify the work queue
        self.work_notifyq.put(None)

    def create_job_statuses(self, c, job):
        """
        populate job statuses, which also serve as the queue of items to work on
        """
        logger.info("preparing statuses for job %s/%s", job.id, job.uuid)
        photo_ids = set()

        # expand every JobTarget into the concrete photo ids it covers;
        # the set de-duplicates photos matched by more than one target
        for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
            if target.target_type == JobTargetType.photo:
                photo_ids.update([target.target])
            elif target.target_type == JobTargetType.photoset:
                for photo in c.query(Photo.id).filter(Photo.set_id == target.target).all():
                    photo_ids.update([photo[0]])
            elif target.target_type == JobTargetType.tag:
                for photo in c.query(Photo.id). \
                        join(PhotoSet).join(TagItem).join(Tag). \
                        filter(Tag.id == target.target). \
                        all():
                    photo_ids.update([photo[0]])

        # NOTE(review): `target` below is whatever JobTarget the loop above
        # finished on, so every status row records that target's id regardless
        # of which target actually contributed the photo — confirm intended
        return [
            JobTargetStatus(target_id=target.id, job_id=job.id, photo_id=photo_id)
            for photo_id in photo_ids
        ]

    def run_work_queue(self):
        """Thread body: execute work until none remains, then wait for a wakeup."""
        while True:
            try:
                # spin on search_work(), which returns true if we found and executed work
                if not self.search_work():
                    # no work found: block until handle_notify signals more,
                    # or 30s passes (periodic re-poll of the database)
                    self.work_notifyq.get(timeout=30)
            except queue.Empty:
                pass

    @cursorwrap
    def search_work(self, c):
        """
        Find, lock, and execute one unit of work. Returns True if a unit was
        processed (caller should immediately search again), False if idle.
        """
        # - we "select from JobTargetStatus" to find available work, (and lock it here)
        # - if work is found, it is executed immediately
        # - commit (the row is now unlocked)
        # - if we query and find that there is no more todo/pending work for this job, we mark the job done

        # skip_locked lets concurrent workers pick different rows instead of
        # blocking on each other's locks; func.rand() spreads them out
        available = c.query(JobTargetStatus, Job) \
            .join(Job) \
            .filter(Job.status.in_((JobStatus.ready, JobStatus.running, ))) \
            .filter(JobTargetStatus.status == JobTargetState.new) \
            .order_by(func.rand()) \
            .limit(1) \
            .with_for_update(skip_locked=True) \
            .first()

        if not available:
            return False

        job_status, job = available

        # get the job function()
        task = task_definitions[job.job_name]

        # run the task and mark the results
        if run_task(c, task, job, job_status):
            job_status.status = JobTargetState.complete
        else:
            job_status.status = JobTargetState.error

        # save and unlock
        c.commit()

        # if there is no more work for the job, mark it done too
        remaining = c.query(func.count(JobTargetStatus.id)) \
            .join(Job) \
            .filter(Job.id == job.id) \
            .filter(JobTargetStatus.status == JobTargetState.new) \
            .order_by(func.rand()) \
            .first()

        if remaining[0] == 0:
            job.status = JobStatus.done
            c.commit()

        return True
|