photolib/photoapp/jobs.py

191 lines
6.0 KiB
Python

import queue
import logging
import cherrypy
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
from photoapp.types import Job, JobTargetType, JobTarget, JobTargetStatus, Photo, PhotoSet, Tag, TagItem
from sqlalchemy import func
# module logger shared by the job client and job servers in this file
logger = logging.getLogger(name="jobs")
class JobSubscriber:
    """
    adapter between the cherrypy bus and a JobsClient.

    on construction, create_job is registered as the listener for the "create-job" channel of cherrypy.engine.
    """

    def __init__(self, client):
        self.client = client
        # TODO make "create-job" a const somewhere?
        cherrypy.engine.subscribe("create-job", self.create_job)

    def create_job(self, name, targets):
        """
        create a job through the client, tell the job server about it, and return the new job's uuid.

        name/targets are passed straight through to JobsClient.create_job.
        """
        job_uuid = self.client.create_job(name, targets)
        self.client.notify_server(job_uuid)
        return job_uuid
class JobsClient:
    """
    creates jobs in the database and passes notifications about them on to the job execution server.
    """

    def __init__(self, dbengine, notifier):
        self.engine = dbengine
        # note: this holds a sessionmaker (from create_db_sessionmaker), not a live session
        self.session = create_db_sessionmaker(dbengine)
        # callable invoked with a job uuid by notify_server
        self.notifier = notifier

    @cursorwrap
    def create_job(self, c, name, targets):
        """
        insert a new Job called `name`, with one JobTarget per listed target id, and return the job's uuid.

        `c` is injected by the cursorwrap decorator (callers pass only name and targets); presumably a
        SQLAlchemy session - confirm in photoapp.dbutils.
        targets: list of dict(
            type=JobTargetType.<member>,
            targets=[1, 2, 3]
        )
        every id under "targets" becomes its own JobTarget row carrying that entry's "type".
        """
        new_job = Job(
            job_name=name,
            targets=[JobTarget(target_type=spec["type"], target=target_id)
                     for spec in targets
                     for target_id in spec["targets"]],
        )
        c.add(new_job)
        c.commit()
        return new_job.uuid

    def notify_server(self, uuid):
        """
        tell the job execution server that job `uuid` was added/updated and needs processing.
        """
        self.notifier(uuid)
class PhotoappTask(object):
    """
    base class for a job action; subclasses must implement run_targets().
    """

    def run_targets(self):
        raise NotImplementedError()


class LolTask(PhotoappTask):
    """
    placeholder task registered as "loljob"; it does not implement run_targets() yet.
    """


# the job actions the system supports. Each key matches the `job_name` column of a database Job object and maps
# to the PhotoappTask subclass implementing that action.
task_definitions = {
    "loljob": LolTask,
}
class BaseJobServer:
    """
    common base class for job execution servers; currently defines no behavior of its own.
    """
    # TODO subclass this out in a way that supports sqlite - the implementation below relies on
    # with_for_update()
class JobServer(BaseJobServer):
    """
    job executor service. Running jobs is orchestrated by:
      - in every case, locking the job rows via sql
      - listening for notifications about new jobs
      - populating JobTargetStatus rows
      - finding JobTargetStatus rows that can be claimed and worked on
      - executing that work
    """

    def __init__(self, dbengine):
        self.engine = dbengine
        # a sessionmaker bound to the engine, not a live session
        self.session = create_db_sessionmaker(dbengine)

    def handle_notify(self, c, job_uuid):
        """
        tell the jobserver that a job needs processing. The job may be in any state, including locked, and is
        treated according to its status:
            JobStatus.paused: nothing happens
            JobStatus.done: nothing happens
            JobStatus.error: nothing happens
            JobStatus.ready: switch to the running state and start processing
            JobStatus.running: start processing
        processing means:
            - the job joins a list of "watched" jobs
            - a query looks for work
            - if no work is found, the job's status becomes Done (or error if any finished work has error status?)

        subclasses must override this.
        """
        raise NotImplementedError()
class ThreadedJobServer(JobServer):
    """
    a version of the jobserver that runs its loops as background threads.

    queue_notify() feeds job uuids to the notifier loop (run_notifier). That loop prepares each job with
    handle_notify() and then hands the uuid to the work loop (run_work) through work_notifyq.
    """

    def __init__(self, dbengine):
        super().__init__(dbengine)
        self.notifyq = queue.Queue()  # job uuids waiting to be handled by run_notifier
        self.work_notifyq = queue.Queue()  # job uuids whose statuses are prepared, for run_work

    @cursorwrap
    def handle_notify(self, c, job_uuid):
        """
        lock the job identified by `job_uuid`. If the job has no JobTargetStatus rows yet, create one per photo
        covered by the job's targets.

        `c` is supplied by the cursorwrap decorator; callers pass only the job uuid (see run_notifier).
        raises ValueError if no job has this uuid.
        """
        # query the db for the target job and lock its row, so concurrent servers serialize on this job
        job = c.query(Job).filter(Job.uuid == job_uuid).with_for_update().first()
        if job is None:
            raise ValueError("no job with uuid {}".format(job_uuid))

        # TODO pick the task to run: task_definitions[job.job_name]()

        # statuses are only populated the first time a job is handled
        existing = c.query(func.count(JobTargetStatus.id)) \
            .join(JobTarget) \
            .filter(JobTarget.job_id == job.id) \
            .scalar()
        if existing == 0:
            logger.info("preparing statuses for job %s/%s", job.id, job.uuid)
            for photo_id, target_id in self._collect_photo_targets(c, job).items():
                # record the target that actually covered this photo, not whichever target the loop saw last
                c.add(JobTargetStatus(target_id=target_id, job_id=job.id, photo_id=photo_id))
        c.commit()  # if this fails, somebody else is handling job; committing also releases the row lock

    def _collect_photo_targets(self, c, job):
        """
        map each photo id covered by `job`'s JobTargets to the id of the first JobTarget that covers it.

        a photo reached through several targets appears only once, so it gets a single status row.
        targets of any other type are skipped, with a warning.
        """
        photo_targets = {}
        for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
            if target.target_type == JobTargetType.photo:
                photo_ids = [target.target]
            elif target.target_type == JobTargetType.photoset:
                rows = c.query(Photo.id).filter(Photo.set_id == target.target).all()
                photo_ids = [row[0] for row in rows]
            elif target.target_type == JobTargetType.tag:
                rows = c.query(Photo.id) \
                    .join(PhotoSet).join(TagItem).join(Tag) \
                    .filter(Tag.id == target.target) \
                    .all()
                photo_ids = [row[0] for row in rows]
            else:
                logger.warning("job %s: skipping target %s with unsupported type %s",
                               job.id, target.id, target.target_type)
                continue
            for photo_id in photo_ids:
                photo_targets.setdefault(photo_id, target.id)
        return photo_targets

    def queue_notify(self, job_uuid):
        """
        enqueue a job uuid for the notifier loop (queue.Queue is thread-safe).
        """
        self.notifyq.put(job_uuid)

    def run_background(self):
        """
        start the notifier and work loops as daemon threads.
        """
        from threading import Thread
        Thread(target=self.run_notifier, daemon=True).start()
        Thread(target=self.run_work, daemon=True).start()

    def run_notifier(self):
        """
        notifier loop: handle each queued job uuid, then pass it on to the work loop.

        a failure while handling one job (for example the commit in handle_notify failing) is logged and the
        loop keeps going. Otherwise the exception would end this daemon thread and stop all further
        notifications. A job that failed is not forwarded to the work loop.
        """
        while True:
            try:
                job_uuid = self.notifyq.get(timeout=5.0)
            except queue.Empty:
                continue
            try:
                self.handle_notify(job_uuid)
            except Exception:
                logger.exception("failed to handle notification for job %s", job_uuid)
                continue
            self.work_notifyq.put(job_uuid)

    def run_work(self):
        """
        work loop - not implemented yet. Planned steps:
          - start a ThreadPoolExecutor
          - in a loop:
            - poll the database for work, locking and then marking rows we claim
            - poll self.work_notifyq
            - submit it to the pool
            - wait for pool results
            - update the db with job results
        """
        pass