131 lines
3.4 KiB
Python
131 lines
3.4 KiB
Python
import queue
|
|
import cherrypy
|
|
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
|
from photoapp.types import Job, JobTargetType, JobTarget
|
|
|
|
|
|
class JobSubscriber(object):
|
|
"""
|
|
adapter between cherrypy bus and JobsClient
|
|
"""
|
|
def __init__(self, client):
|
|
self.client = client
|
|
cherrypy.engine.subscribe("create-job", self.create_job) # TODO make "create-job" a const somewhere?
|
|
|
|
def create_job(self, name, targets):
|
|
uuid = self.client.create_job(name, targets)
|
|
self.client.notify_server(uuid)
|
|
return uuid
|
|
|
|
|
|
class JobsClient(object):
|
|
def __init__(self, dbengine, notifier):
|
|
self.engine = dbengine
|
|
self.session = create_db_sessionmaker(self.engine)
|
|
self.notifier = notifier
|
|
|
|
@cursorwrap
|
|
def create_job(self, c, name, targets):
|
|
"""
|
|
create a new job in the database
|
|
|
|
targets: list of dict(
|
|
type=JobTargetType.TYPE,
|
|
targets=[1, 2, 3]
|
|
)
|
|
"""
|
|
job_targets = []
|
|
|
|
for target in targets:
|
|
for target_id in target["targets"]:
|
|
job_targets.append(
|
|
JobTarget(target_type=target["type"],
|
|
target=target_id)
|
|
)
|
|
|
|
j = Job(
|
|
job_name=name,
|
|
targets=job_targets,
|
|
)
|
|
|
|
c.add(j)
|
|
c.commit()
|
|
|
|
return j.uuid
|
|
|
|
def notify_server(self, uuid):
|
|
"""
|
|
notify the job execution server of a job that needs processing because it had been added/updated
|
|
"""
|
|
self.notifier(uuid)
|
|
|
|
|
|
class PhotoappTask(object):
|
|
def get_targets(self):
|
|
raise NotImplementedError()
|
|
|
|
def run_targets(self):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class LolTask(PhotoappTask):
|
|
pass
|
|
|
|
|
|
"""
|
|
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
|
|
"""
|
|
task_definitions = {
|
|
"loljob": LolTask
|
|
}
|
|
|
|
|
|
class JobServer(object):
|
|
"""
|
|
job executor service. This object orchestrates job running by doing:
|
|
- in any case, locking the job rows via sql
|
|
- listening to notifications about new jobs
|
|
- populate JobTargetStatus
|
|
- looking for JobTargetStatus that can be claimed and worked on
|
|
- executing the work
|
|
"""
|
|
def __init__(self, dbengine):
|
|
self.engine = dbengine
|
|
self.session = create_db_sessionmaker(self.engine)
|
|
|
|
@cursorwrap
|
|
def handle_notify(self, db, job_uuid):
|
|
# query the db for the target job, and lock it
|
|
|
|
# get the task we're running
|
|
task = task_definitions[xxx]()
|
|
|
|
# query for Photos targeted by the task and allow the job to filter them
|
|
# query...
|
|
# for each target
|
|
# get photo, but also the associated set (and maybe tag)
|
|
# filter
|
|
# add a JobTargetStatus if the filter allows it
|
|
|
|
# end our transaction, thus unlocking the job
|
|
|
|
|
|
class ThreadJobServer(JobServer):
|
|
def __init__(self, dbengine):
|
|
"""
|
|
a version of the jobserver that runs as a thread
|
|
"""
|
|
super().__init__(dbengine)
|
|
self.notifyq = queue.Queue()
|
|
|
|
def queue_notify(self, job_uuid):
|
|
self.notifyq.put(job_uuid)
|
|
|
|
def run(self):
|
|
while True:
|
|
try:
|
|
job_uuid = self.notifyq.get(timeout=5.0)
|
|
self.handle_notify(job_uuid)
|
|
except queue.Empty:
|
|
pass
|