job executor progress
All checks were successful
Gitea/photolib/pipeline/head This commit looks good
All checks were successful
Gitea/photolib/pipeline/head This commit looks good
This commit is contained in:
parent
9607708314
commit
22a661dcf7
@ -2,8 +2,10 @@ import queue
|
||||
import logging
|
||||
import cherrypy
|
||||
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
||||
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, Photo, PhotoSet, Tag, TagItem
|
||||
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, JobTargetState, Photo, PhotoSet, \
|
||||
Tag, TagItem
|
||||
from sqlalchemy import func
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
|
||||
logger = logging.getLogger("jobs")
|
||||
@ -132,14 +134,16 @@ class ThreadedJobServer(JobServer):
|
||||
super().__init__(dbengine)
|
||||
self.notifyq = queue.Queue()
|
||||
self.work_notifyq = queue.Queue()
|
||||
# self.watched_jobs = set()
|
||||
self.pool = ThreadPoolExecutor(max_workers=5)
|
||||
|
||||
def run_background(self):
|
||||
from threading import Thread
|
||||
# run_notifier handles incoming notifications of jobs. When we are notified of a job, we start running it
|
||||
# run_notifier handles incoming notifications of jobs. When we are notified of a job, we prepare it for unning
|
||||
Thread(target=self.run_notifier, daemon=True).start()
|
||||
#
|
||||
# run_work_queue periodically searches the database for new work, and starts running it when found
|
||||
Thread(target=self.run_work_queue, daemon=True).start()
|
||||
# pool_watcher watches the ThreadPoolExecutor for completed jobs
|
||||
Thread(target=self.pool_watcher, daemon=True).start()
|
||||
|
||||
def notify(self, job_uuid):
|
||||
self.notifyq.put(job_uuid)
|
||||
@ -149,7 +153,6 @@ class ThreadedJobServer(JobServer):
|
||||
try:
|
||||
job_uuid = self.notifyq.get(timeout=5.0)
|
||||
self.handle_notify(job_uuid)
|
||||
self.work_notifyq.put(job_uuid)
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
@ -184,6 +187,9 @@ class ThreadedJobServer(JobServer):
|
||||
# watch the job which will cause us to begin looking for and executing work within the job
|
||||
# self.watched_jobs.add(job.id)
|
||||
|
||||
# notify the work queue
|
||||
self.work_notifyq.put(None)
|
||||
|
||||
def create_job_statuses(self, c, job):
|
||||
"""
|
||||
populate job statuses, which also serve as the queue of items to work on
|
||||
@ -210,19 +216,39 @@ class ThreadedJobServer(JobServer):
|
||||
]
|
||||
|
||||
def run_work_queue(self):
|
||||
# in a loop:
|
||||
# - poll the database for work, locking and then marking rows we claim
|
||||
# - poll self.work_notifyq
|
||||
# - submit it to the pool
|
||||
# - wait for pool results
|
||||
# - update the db with job results
|
||||
|
||||
while True:
|
||||
try:
|
||||
job_uuid = self.work_notifyq.get(timeout=5.0)
|
||||
self.run_job(job_uuid)
|
||||
self.work_notifyq.get(timeout=30)
|
||||
self.search_work()
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def run_job(self, job_uuid):
|
||||
@cursorwrap
|
||||
def search_work(self, c):
|
||||
# - we "select from JobTargetStatus" to find available work, (and lock it here)
|
||||
# - if no work is found, we read from the "internal queue"
|
||||
# - if we have not been notified of new work, this sleeps for X seconds before
|
||||
# returning Empty and looping again
|
||||
# - if we have, we immediately select for new work again.
|
||||
# - if work is found, it is immediately worked on
|
||||
# - we change the JobTargetStatus Status to running (so other instances of this loop won't pick this item up)
|
||||
# - commit (the row is now unlocked)
|
||||
# - we start the jobstatus in the ThreadPoolExecutor or whatever
|
||||
# - when the job finishes, something watching the finished futures or whatever updates the job status again
|
||||
# - if we query and find that there is no more todo/pending work for this job, we mark the job done
|
||||
|
||||
available = c.query(JobTargetStatus) \
|
||||
.join(Job) \
|
||||
.filter(Job.status.in_((JobStatus.ready, JobStatus.running, ))) \
|
||||
.filter(JobTargetStatus.status == JobTargetState.new) \
|
||||
.order_by(func.rand()) \
|
||||
.limit(5) \
|
||||
.with_for_update(skip_locked=True) \
|
||||
.all()
|
||||
|
||||
# for each available work unit
|
||||
|
||||
# stuff it into the threadpool
|
||||
|
||||
def pool_watcher(self):
|
||||
pass
|
||||
|
Loading…
x
Reference in New Issue
Block a user