job docs and cleanup
Gitea/photolib/pipeline/head This commit looks good Details

This commit is contained in:
dave 2023-07-05 23:29:15 -07:00
parent 778905b799
commit db86cf3392
1 changed files with 58 additions and 86 deletions

View File

@ -2,16 +2,52 @@ import queue
import logging
import traceback
import cherrypy
from threading import Thread
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, JobTargetState, Photo, PhotoSet, \
Tag, TagItem
from sqlalchemy import func
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger("jobs")
"""
How the job system works:
+--------------------+ +-------------------+
| other applications | notify | Job server |
| ==============>> |
| | http/queue | |
+-------------||-----+            +--------||---------+
|| ____________ ||
insert || / Database \\ || check db for work (poll/on notify)
job, || | | || manage job statuses
job targets **========> <========** lock the row
\\___________/ execute the work
update and unlock the row
job: server-side task that can process one or many photos
job target: identifies items the job is expected to be run against. Can target an individual photo, a set, or all within
a tag.
job statuses: the status of one photo targeted by the job. Jobs are executed on a per-photo basis, regardless of how
they were originally targeted with the above.
other applications: any entity that can submit jobs. Needs database access and access to the internal network for
notify. The http api server is an example.
job server: the class defined in this file. the innermost public interface for managing jobs.
notify: mechanism to notify the jobserver of work. If the job server runs in-process, this is a python queue that is
"notified" by pushing an object into it. If the job server runs as a stand-alone process, this is an http endpoint.
In either case, JobsClient is the client object for this mechanism.
"""
class JobSubscriber(object):
"""
adapter between cherrypy bus and JobsClient
@ -79,6 +115,7 @@ def run_task(dbsession, task, job, job_status):
from random import randint
from time import sleep
def job_noop(dbsession, job, job_status):
@ -86,6 +123,7 @@ def job_noop(dbsession, job, job_status):
logger.info("job_noop: target photo: %s", job_status.photo_id)
fail = randint(0, 1) == 0
sleep(10)
logger.info("job_noop: this time we %s", "fail" if fail else "succeed")
@ -93,16 +131,6 @@ def job_noop(dbsession, job, job_status):
raise Exception("lol")
# class PhotoappTask(object):
# def run(self):
# raise NotImplementedError()
# class LolTask(PhotoappTask):
# def run(self):
# pass
"""
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
"""
@ -117,38 +145,24 @@ class BaseJobServer(object):
class JobServer(BaseJobServer):
"""
job executor service. This object orchestrates job running by doing:
- in any case, locking the job rows via sql
- listening to notifications about new jobs
- populate JobTargetStatus
- looking for JobTargetStatus that can be claimed and worked on
- executing the work
"""
def __init__(self, dbengine):
"""
job executor service
"""
self.engine = dbengine
self.session = create_db_sessionmaker(self.engine)
def notify(self, job_uuid):
"""
"""
raise NotImplementedError()
def handle_notify(self, c, job_uuid):
"""
notify the jobserver that a job needs to be processed. The job may be in any state, including locked. The job
should be processed as follows:
notify the jobserver that a job may need to be processed. The job should be processed according to status
as follows:
JobStatus.paused: do nothing
JobStatus.done: do nothing
JobStatus.error: do nothing
JobStatus.ready: change to running state and begin processing
JobStatus.running: begin processing.
JobStatus.ready: change to running state, create JobTargetStatus rows, and begin processing
JobStatus.running: continue processing.
processing:
- the job is added to a list of "watched" jobs
- we issue a query looking for work
- if no work is found, the job status is changed to Done (or error if any done work has error status?) and stop watching it
calls to this function should not block, it is meant to be called, for example behind an api call in an http api
"""
raise NotImplementedError()
@ -156,22 +170,20 @@ class JobServer(BaseJobServer):
class ThreadedJobServer(JobServer):
def __init__(self, dbengine):
"""
a version of the jobserver that runs as a thread
a simple version of the jobserver that runs as a single thread
TODO:
- does not set jobs to Completed status if we never process work for it
"""
super().__init__(dbengine)
self.notifyq = queue.Queue()
self.work_notifyq = queue.Queue()
self.pool = ThreadPoolExecutor(max_workers=5)
self.futures = set()
def run_background(self):
from threading import Thread
# run_notifier handles incoming notifications of jobs. When we are notified of a job, we prepare it for running
Thread(target=self.run_notifier, daemon=True).start()
# run_work_queue periodically searches the database for new work, and starts running it when found
Thread(target=self.run_work_queue, daemon=True).start()
# pool_watcher watches the ThreadPoolExecutor for completed jobs
# Thread(target=self.pool_watcher, daemon=True).start()
def notify(self, job_uuid):
self.notifyq.put(job_uuid)
@ -179,7 +191,7 @@ class ThreadedJobServer(JobServer):
def run_notifier(self):
while True:
try:
job_uuid = self.notifyq.get(timeout=5.0)
job_uuid = self.notifyq.get(timeout=30)
self.handle_notify(job_uuid)
except queue.Empty:
pass
@ -208,9 +220,6 @@ class ThreadedJobServer(JobServer):
# so, we'll just crash here
c.commit()
# watch the job which will cause us to begin looking for and executing work within the job
# self.watched_jobs.add(job.id)
# notify the work queue
self.work_notifyq.put(None)
@ -242,25 +251,18 @@ class ThreadedJobServer(JobServer):
def run_work_queue(self):
while True:
try:
if self.search_work():
continue
self.work_notifyq.get(timeout=30)
# spin on search_work(), which returns true if we found and executed work
if not self.search_work():
self.work_notifyq.get(timeout=30)
except queue.Empty:
pass
@cursorwrap
def search_work(self, c):
# - we "select from JobTargetStatus" to find available work, (and lock it here)
# - if no work is found, we read from the "internal queue"
# - if we have not been notified of new work, this sleeps for X seconds before
# returning Empty and looping again
# - if we have, we immediately select for new work again.
# - if work is found, it is immediately worked on
# - we change the JobTargetStatus Status to running (so other instances of this loop won't pick this item up)
# - commit (the row is now unlocked)
# - we start the jobstatus in the ThreadPoolExecutor or whatever
# - when the job finishes, something watching the finished futures or whatever updates the job status again
# - if we query and find that there is no more todo/pending work for this job, we mark the job done
# - if work is found, it is executed immediately
# - commit (the row is now unlocked)
# - if we query and find that there is no more todo/pending work for this job, we mark the job done
available = c.query(JobTargetStatus, Job) \
.join(Job) \
@ -301,33 +303,3 @@ class ThreadedJobServer(JobServer):
c.commit()
return True
"""
# mark as running?
# stuff it into the threadpool
# self.pool.submit(self.run_job, task, job.to_json(), job_status.to_json())
def run_job(self, task, job, job_status):
# TODO execute task
pass
def pool_watcher(self):
while True:
try:
for future in as_completed(self.futures, timeout=5):
self.futures.remove(future)
self.handle_completed(future)
except TimeoutError:
pass
def handle_completed(self, future):
try:
logger.info("handle completed job:", future)
job_result = future.result()
#TODO update the database with the job result status
except Exception as e:
logger.info("handle failed job:", future)
"""