start handling jobs
Gitea/photolib/pipeline/head This commit looks good Details

This commit is contained in:
dave 2023-01-31 22:58:26 -08:00
parent b582101543
commit 62190f357f
2 changed files with 36 additions and 7 deletions

View File

@ -18,7 +18,7 @@ from photoapp.dbutils import SAEnginePlugin, SATool, db, get_db_engine, date_for
from photoapp.utils import auth, require_auth, photoset_auth_filter, slugify, cherryparam, number_format from photoapp.utils import auth, require_auth, photoset_auth_filter, slugify, cherryparam, number_format
from photoapp.storage import uri_to_storage from photoapp.storage import uri_to_storage
from photoapp.webutils import validate_password, serve_thumbnail_placeholder from photoapp.webutils import validate_password, serve_thumbnail_placeholder
from photoapp.jobs import JobsClient, JobSubscriber, ThreadJobServer from photoapp.jobs import JobsClient, JobSubscriber, ThreadedJobServer
from jinja2 import Environment, FileSystemLoader, select_autoescape from jinja2 import Environment, FileSystemLoader, select_autoescape
from sqlalchemy import desc, asc, func, and_, or_ from sqlalchemy import desc, asc, func, and_, or_
@ -721,7 +721,7 @@ def main():
# set up jobs system - this is temporary, this will be something pluggable later # set up jobs system - this is temporary, this will be something pluggable later
jobs_db_engine = get_db_engine(args.database, debug=args.db_debug) jobs_db_engine = get_db_engine(args.database, debug=args.db_debug)
jobs_server = ThreadJobServer(jobs_db_engine) jobs_server = ThreadedJobServer(jobs_db_engine)
jobs_server.run_background() jobs_server.run_background()
jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.queue_notify) jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.queue_notify)
JobSubscriber(jobs_client) JobSubscriber(jobs_client)

View File

@ -1,9 +1,13 @@
import queue import queue
import logging
import cherrypy import cherrypy
from photoapp.dbutils import create_db_sessionmaker, cursorwrap from photoapp.dbutils import create_db_sessionmaker, cursorwrap
from photoapp.types import Job, JobTargetType, JobTarget from photoapp.types import Job, JobTargetType, JobTarget
logger = logging.getLogger("jobs")
class JobSubscriber(object): class JobSubscriber(object):
""" """
adapter between cherrypy bus and JobsClient adapter between cherrypy bus and JobsClient
@ -80,7 +84,12 @@ task_definitions = {
} }
class JobServer(object): class BaseJobServer(object):
#TODO subclass this out in way that supports sqlite - below implementation's logic relies on with_for_update()
pass
class JobServer(BaseJobServer):
""" """
job executor service. This object orchestrates job running by doing: job executor service. This object orchestrates job running by doing:
- in any case, locking the job rows via sql - in any case, locking the job rows via sql
@ -94,11 +103,14 @@ class JobServer(object):
self.session = create_db_sessionmaker(self.engine) self.session = create_db_sessionmaker(self.engine)
@cursorwrap @cursorwrap
def handle_notify(self, db, job_uuid): def handle_notify(self, c, job_uuid):
# query the db for the target job, and lock it # query the db for the target job, and lock it
job = c.query(Job).filter(Job.uuid == job_uuid).first()
logging.info("handle_notify for job %s: %s", job_uuid, job)
# get the task we're running # get the task we're running
task = task_definitions[xxx]() task = task_definitions[job.job_name]()
logging.info("task: %s", task)
# query for Photos targeted by the task and allow the job to filter them # query for Photos targeted by the task and allow the job to filter them
# query... # query...
@ -110,21 +122,38 @@ class JobServer(object):
# end our transaction, thus unlocking the job # end our transaction, thus unlocking the job
class ThreadJobServer(JobServer): class ThreadedJobServer(JobServer):
def __init__(self, dbengine): def __init__(self, dbengine):
""" """
a version of the jobserver that runs as a thread a version of the jobserver that runs as a thread
""" """
super().__init__(dbengine) super().__init__(dbengine)
self.notifyq = queue.Queue() self.notifyq = queue.Queue()
self.work_notifyq = queue.Queue()
def queue_notify(self, job_uuid): def queue_notify(self, job_uuid):
self.notifyq.put(job_uuid) self.notifyq.put(job_uuid)
def run(self): def run_background(self):
from threading import Thread
Thread(target=self.run_notifier, daemon=True).start()
Thread(target=self.run_work, daemon=True).start()
def run_notifier(self):
while True: while True:
try: try:
job_uuid = self.notifyq.get(timeout=5.0) job_uuid = self.notifyq.get(timeout=5.0)
self.handle_notify(job_uuid) self.handle_notify(job_uuid)
self.self.work_notifyq.put(job_uuid)
except queue.Empty: except queue.Empty:
pass pass
def run_work(self):
# start a ThreadPoolExecutor
# in a loop:
# - poll the database for work, locking and then marking rows we claim
# - poll self.work_notifyq
# - submit it to the pool
# - wait for pool results
# - update the db with job results
pass