183 lines
5.6 KiB
Python
183 lines
5.6 KiB
Python
import queue
|
|
import logging
|
|
import cherrypy
|
|
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
|
from photoapp.types import Job, JobTargetType, JobTarget, JobTargetStatus, Photo, PhotoSet, Tag, TagItem
|
|
from sqlalchemy import func
|
|
|
|
|
|
logger = logging.getLogger("jobs")
|
|
|
|
|
|
class JobSubscriber(object):
|
|
"""
|
|
adapter between cherrypy bus and JobsClient
|
|
"""
|
|
def __init__(self, client):
|
|
self.client = client
|
|
cherrypy.engine.subscribe("create-job", self.create_job) # TODO make "create-job" a const somewhere?
|
|
|
|
def create_job(self, name, targets):
|
|
uuid = self.client.create_job(name, targets)
|
|
self.client.notify_server(uuid)
|
|
return uuid
|
|
|
|
|
|
class JobsClient(object):
|
|
def __init__(self, dbengine, notifier):
|
|
self.engine = dbengine
|
|
self.session = create_db_sessionmaker(self.engine)
|
|
self.notifier = notifier
|
|
|
|
@cursorwrap
|
|
def create_job(self, c, name, targets):
|
|
"""
|
|
create a new job in the database
|
|
|
|
targets: list of dict(
|
|
type=JobTargetType.TYPE,
|
|
targets=[1, 2, 3]
|
|
)
|
|
"""
|
|
job_targets = []
|
|
|
|
for target in targets:
|
|
for target_id in target["targets"]:
|
|
job_targets.append(
|
|
JobTarget(target_type=target["type"],
|
|
target=target_id)
|
|
)
|
|
|
|
j = Job(
|
|
job_name=name,
|
|
targets=job_targets,
|
|
)
|
|
|
|
c.add(j)
|
|
c.commit()
|
|
|
|
return j.uuid
|
|
|
|
def notify_server(self, uuid):
|
|
"""
|
|
notify the job execution server of a job that needs processing because it had been added/updated
|
|
"""
|
|
self.notifier(uuid)
|
|
|
|
|
|
class PhotoappTask(object):
|
|
def run_targets(self):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class LolTask(PhotoappTask):
|
|
pass
|
|
|
|
|
|
"""
|
|
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
|
|
"""
|
|
task_definitions = {
|
|
"loljob": LolTask
|
|
}
|
|
|
|
|
|
class BaseJobServer(object):
|
|
#TODO subclass this out in way that supports sqlite - below implementation's logic relies on with_for_update()
|
|
pass
|
|
|
|
|
|
class JobServer(BaseJobServer):
|
|
"""
|
|
job executor service. This object orchestrates job running by doing:
|
|
- in any case, locking the job rows via sql
|
|
- listening to notifications about new jobs
|
|
- populate JobTargetStatus
|
|
- looking for JobTargetStatus that can be claimed and worked on
|
|
- executing the work
|
|
"""
|
|
def __init__(self, dbengine):
|
|
self.engine = dbengine
|
|
self.session = create_db_sessionmaker(self.engine)
|
|
|
|
@cursorwrap
|
|
def handle_notify(self, c, job_uuid):
|
|
# query the db for the target job, and lock it
|
|
job = c.query(Job).filter(Job.uuid == job_uuid).first()
|
|
|
|
# get the task we're running
|
|
# task = task_definitions[job.job_name]()
|
|
# logging.info("task: %s", task)
|
|
|
|
# check if JobTargetStatus has been populated for this job
|
|
statuses = c.query(func.count(JobTargetStatus.id)).join(JobTarget).filter(JobTarget.job_id == job.id).first()[0]
|
|
|
|
if statuses == 0:
|
|
# populate statuses
|
|
logging.info("preparing statuses for job %s/%s", job.id, job.uuid)
|
|
photo_ids = set()
|
|
|
|
for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
|
|
if target.target_type == JobTargetType.photo:
|
|
photo_ids.update([target.target])
|
|
elif target.target_type == JobTargetType.photoset:
|
|
for photo in c.query(Photo.id).filter(Photo.set_id == target.target).all():
|
|
photo_ids.update([photo[0]])
|
|
elif target.target_type == JobTargetType.tag:
|
|
for photo in c.query(Photo.id). \
|
|
join(PhotoSet).join(TagItem).join(Tag). \
|
|
filter(Tag.id == target.target). \
|
|
all():
|
|
photo_ids.update([photo[0]])
|
|
|
|
for photo_id in photo_ids:
|
|
c.add(JobTargetStatus(target_id=target.id, job_id=job.id, photo_id=photo_id))
|
|
|
|
c.commit() # if this fails, somebody else is handling the job
|
|
|
|
# query for Photos targeted by the task and allow the job to filter them
|
|
# query...
|
|
# for each target
|
|
# get photo, but also the associated set (and maybe tag)
|
|
# filter
|
|
# add a JobTargetStatus if the filter allows it
|
|
|
|
# end our transaction, thus unlocking the job
|
|
|
|
|
|
class ThreadedJobServer(JobServer):
|
|
def __init__(self, dbengine):
|
|
"""
|
|
a version of the jobserver that runs as a thread
|
|
"""
|
|
super().__init__(dbengine)
|
|
self.notifyq = queue.Queue()
|
|
self.work_notifyq = queue.Queue()
|
|
|
|
def queue_notify(self, job_uuid):
|
|
self.notifyq.put(job_uuid)
|
|
|
|
def run_background(self):
|
|
from threading import Thread
|
|
Thread(target=self.run_notifier, daemon=True).start()
|
|
Thread(target=self.run_work, daemon=True).start()
|
|
|
|
def run_notifier(self):
|
|
while True:
|
|
try:
|
|
job_uuid = self.notifyq.get(timeout=5.0)
|
|
self.handle_notify(job_uuid)
|
|
self.work_notifyq.put(job_uuid)
|
|
except queue.Empty:
|
|
pass
|
|
|
|
def run_work(self):
|
|
# start a ThreadPoolExecutor
|
|
# in a loop:
|
|
# - poll the database for work, locking and then marking rows we claim
|
|
# - poll self.work_notifyq
|
|
# - submit it to the pool
|
|
# - wait for pool results
|
|
# - update the db with job results
|
|
pass
|