diff --git a/photoapp/jobs.py b/photoapp/jobs.py index 6b1269c..2e15bfa 100644 --- a/photoapp/jobs.py +++ b/photoapp/jobs.py @@ -1,3 +1,4 @@ +import queue import cherrypy from photoapp.dbutils import create_db_sessionmaker, cursorwrap from photoapp.types import Job, JobTargetType, JobTarget @@ -58,3 +59,72 @@ class JobsClient(object): """ self.notifier(uuid) + +class PhotoappTask(object): + def get_targets(self): + raise NotImplementedError() + + def run_targets(self): + raise NotImplementedError() + + +class LolTask(PhotoappTask): + pass + + +""" +this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column +""" +task_definitions = { + "loljob": LolTask +} + + +class JobServer(object): + """ + job executor service. This object orchestrates job running by doing: + - in any case, locking the job rows via sql + - listening to notifications about new jobs + - populate JobTargetStatus + - looking for JobTargetStatus that can be claimed and worked on + - executing the work + """ + def __init__(self, dbengine): + self.engine = dbengine + self.session = create_db_sessionmaker(self.engine) + + @cursorwrap + def handle_notify(self, db, job_uuid): + # query the db for the target job, and lock it + + # get the task we're running + task = task_definitions[xxx]() + + # query for Photos targeted by the task and allow the job to filter them + # query... + # for each target + # get photo, but also the associated set (and maybe tag) + # filter + # add a JobTargetStatus if the filter allows it + + # end our transaction, thus unlocking the job + + +class ThreadJobServer(JobServer): + def __init__(self, dbengine): + """ + a version of the jobserver that runs as a thread + """ + super().__init__(dbengine) + self.notifyq = queue.Queue() + + def queue_notify(self, job_uuid): + self.notifyq.put(job_uuid) + + def run(self): + while True: + try: + job_uuid = self.notifyq.get(timeout=5.0) + self.handle_notify(job_uuid) + except queue.Empty: + pass