diff --git a/photoapp/daemon.py b/photoapp/daemon.py index 7b98532..cd4150e 100644 --- a/photoapp/daemon.py +++ b/photoapp/daemon.py @@ -18,7 +18,7 @@ from photoapp.dbutils import SAEnginePlugin, SATool, db, get_db_engine, date_for from photoapp.utils import auth, require_auth, photoset_auth_filter, slugify, cherryparam, number_format from photoapp.storage import uri_to_storage from photoapp.webutils import validate_password, serve_thumbnail_placeholder -from photoapp.jobs import JobsClient, JobSubscriber, ThreadJobServer +from photoapp.jobs import JobsClient, JobSubscriber, ThreadedJobServer from jinja2 import Environment, FileSystemLoader, select_autoescape from sqlalchemy import desc, asc, func, and_, or_ @@ -721,7 +721,7 @@ def main(): # set up jobs system - this is temporary, this will be something pluggable later jobs_db_engine = get_db_engine(args.database, debug=args.db_debug) - jobs_server = ThreadJobServer(jobs_db_engine) + jobs_server = ThreadedJobServer(jobs_db_engine) jobs_server.run_background() jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.queue_notify) JobSubscriber(jobs_client) diff --git a/photoapp/jobs.py b/photoapp/jobs.py index 2e15bfa..554a6d7 100644 --- a/photoapp/jobs.py +++ b/photoapp/jobs.py @@ -1,9 +1,13 @@ import queue +import logging import cherrypy from photoapp.dbutils import create_db_sessionmaker, cursorwrap from photoapp.types import Job, JobTargetType, JobTarget +logger = logging.getLogger("jobs") + + class JobSubscriber(object): """ adapter between cherrypy bus and JobsClient @@ -80,7 +84,12 @@ task_definitions = { } -class JobServer(object): +class BaseJobServer(object): + #TODO subclass this out in way that supports sqlite - below implementation's logic relies on with_for_update() + pass + + +class JobServer(BaseJobServer): """ job executor service. This object orchestrates job running by doing: - in any case, locking the job rows via sql @@ -94,11 +103,14 @@ class JobServer(object): self.session = create_db_sessionmaker(self.engine) @cursorwrap - def handle_notify(self, db, job_uuid): + def handle_notify(self, c, job_uuid): # query the db for the target job, and lock it + job = c.query(Job).filter(Job.uuid == job_uuid).first() + logging.info("handle_notify for job %s: %s", job_uuid, job) # get the task we're running - task = task_definitions[xxx]() + task = task_definitions[job.job_name]() + logging.info("task: %s", task) # query for Photos targeted by the task and allow the job to filter them # query... @@ -110,21 +122,38 @@ class JobServer(object): # end our transaction, thus unlocking the job -class ThreadJobServer(JobServer): +class ThreadedJobServer(JobServer): def __init__(self, dbengine): """ a version of the jobserver that runs as a thread """ super().__init__(dbengine) self.notifyq = queue.Queue() + self.work_notifyq = queue.Queue() def queue_notify(self, job_uuid): self.notifyq.put(job_uuid) - def run(self): + def run_background(self): + from threading import Thread + Thread(target=self.run_notifier, daemon=True).start() + Thread(target=self.run_work, daemon=True).start() + + def run_notifier(self): while True: try: job_uuid = self.notifyq.get(timeout=5.0) self.handle_notify(job_uuid) + self.self.work_notifyq.put(job_uuid) except queue.Empty: pass + + def run_work(self): + # start a ThreadPoolExecutor + # in a loop: + # - poll the database for work, locking and then marking rows we claim + # - poll self.work_notifyq + # - submit it to the pool + # - wait for pool results + # - update the db with job results + pass