simple job loop
Gitea/photolib/pipeline/head This commit looks good Details

This commit is contained in:
dave 2023-07-05 22:36:26 -07:00
parent 39e04a9c7b
commit 778905b799
2 changed files with 101 additions and 22 deletions

View File

@ -723,7 +723,7 @@ def main():
jobs_db_engine = get_db_engine(args.database, debug=args.db_debug)
jobs_server = ThreadedJobServer(jobs_db_engine)
jobs_server.run_background()
jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.queue_notify)
jobs_client = JobsClient(jobs_db_engine, notifier=jobs_server.notify)
JobSubscriber(jobs_client)
# set up webapp

View File

@ -1,11 +1,12 @@
import queue
import logging
import traceback
import cherrypy
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, JobTargetState, Photo, PhotoSet, \
Tag, TagItem
from sqlalchemy import func
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger("jobs")
@ -67,20 +68,46 @@ class JobsClient(object):
self.notifier(uuid)
class PhotoappTask(object):
    """Abstract base for task implementations run by the job server."""

    def run_targets(self):
        """Process this task's targets; concrete subclasses must override."""
        raise NotImplementedError()
def run_task(dbsession, task, job, job_status):
    """Execute one task callable against a single job work unit.

    :param dbsession: database session handed straight through to the task
    :param task: callable taking (dbsession, job, job_status)
    :param job: the Job row being worked on
    :param job_status: the JobTargetStatus row identifying this work unit
    :return: True if the task completed, False if it raised
    """
    try:
        task(dbsession, job, job_status)
        return True
    except Exception:
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit;
        # catch Exception so process-control exceptions still propagate.
        # TODO save the job error in the database somewhere?
        traceback.print_exc()
        return False
class LolTask(PhotoappTask):
    """Placeholder example task; adds no behavior over the base class."""
    pass
from random import randint
def job_noop(dbsession, job, job_status):
    """Example job action: does no real work and fails at random.

    Logs the target photo id, flips a coin, and raises on the losing side so
    the job machinery's error path can be exercised.
    """
    should_fail = randint(0, 1) == 0
    logger.info("job_noop: this example job does nothing")
    logger.info("job_noop: target photo: %s", job_status.photo_id)
    logger.info("job_noop: this time we %s", "fail" if should_fail else "succeed")
    if should_fail:
        raise Exception("lol")
# class PhotoappTask(object):
# def run(self):
# raise NotImplementedError()
# class LolTask(PhotoappTask):
# def run(self):
# pass
"""
this is the list of job actions the system supports. The dict's keys match with the database Job objs' `job_name` column
"""
# This is the list of job actions the system supports. The dict's keys match
# the database Job rows' `job_name` column.
task_definitions = {
    # NOTE(review): both entries appear in this view — confirm whether the
    # "loljob"/LolTask entry is still meant to ship alongside "noop".
    "loljob": LolTask,
    "noop": job_noop,
}
@ -135,15 +162,16 @@ class ThreadedJobServer(JobServer):
self.notifyq = queue.Queue()
self.work_notifyq = queue.Queue()
self.pool = ThreadPoolExecutor(max_workers=5)
self.futures = set()
def run_background(self):
    """Start the job server's background worker threads (all daemonized)."""
    from threading import Thread
    # run_notifier handles incoming notifications of jobs. When we are notified of a job, we prepare it for running
    Thread(target=self.run_notifier, daemon=True).start()
    # run_work_queue periodically searches the database for new work, and starts running it when found
    Thread(target=self.run_work_queue, daemon=True).start()
    # pool_watcher watches the ThreadPoolExecutor for completed jobs
    # NOTE(review): pool_watcher is disabled — work now runs inline in
    # run_work_queue rather than through the thread pool.
    # Thread(target=self.pool_watcher, daemon=True).start()
def notify(self, job_uuid):
self.notifyq.put(job_uuid)
@ -165,10 +193,6 @@ class ThreadedJobServer(JobServer):
# we don't care about jobs that are already done
return
# get the task we're running
# task = task_definitions[job.job_name]()
# logging.info("task: %s", task)
if job.status == JobStatus.ready:
# check if JobTargetStatus has been populated for this job
statuses_count = c.query(func.count(JobTargetStatus.id)). \
@ -194,7 +218,7 @@ class ThreadedJobServer(JobServer):
"""
populate job statuses, which also serve as the queue of items to work on
"""
logging.info("preparing statuses for job %s/%s", job.id, job.uuid)
logger.info("preparing statuses for job %s/%s", job.id, job.uuid)
photo_ids = set()
for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
@ -218,8 +242,9 @@ class ThreadedJobServer(JobServer):
def run_work_queue(self):
    """Worker loop: drain available work, then block waiting for a nudge.

    search_work() returning truthy means a work unit was processed and more
    may be pending, so loop immediately; otherwise block up to 30s on
    work_notifyq before polling the database again. A queue.Empty timeout is
    the normal idle path, not an error.
    """
    while True:
        try:
            if self.search_work():
                continue
            self.work_notifyq.get(timeout=30)
        except queue.Empty:
            pass
@ -237,18 +262,72 @@ class ThreadedJobServer(JobServer):
# - when the job finishes, something watching the finished futures or whatever updates the job status again
# - if we query and find that there is no more todo/pending work for this job, we mark the job done
available = c.query(JobTargetStatus) \
available = c.query(JobTargetStatus, Job) \
.join(Job) \
.filter(Job.status.in_((JobStatus.ready, JobStatus.running, ))) \
.filter(JobTargetStatus.status == JobTargetState.new) \
.order_by(func.rand()) \
.limit(5) \
.limit(1) \
.with_for_update(skip_locked=True) \
.all()
.first()
# for each available work unit
if not available:
return False
# stuff it into the threadpool
job_status, job = available
# get the job function()
task = task_definitions[job.job_name]
# run the task and mark the results
if run_task(c, task, job, job_status):
job_status.status = JobTargetState.complete
else:
job_status.status = JobTargetState.error
# save and unlock
c.commit()
# if there is no more work for the job, mark it done too
remaining = c.query(func.count(JobTargetStatus.id)) \
.join(Job) \
.filter(Job.id == job.id) \
.filter(JobTargetStatus.status == JobTargetState.new) \
.order_by(func.rand()) \
.first()
if remaining[0] == 0:
job.status = JobStatus.done
c.commit()
return True
"""
# mark as running?
# stuff it into the threadpool
# self.pool.submit(self.run_job, task, job.to_json(), job_status.to_json())
def run_job(self, task, job, job_status):
# TODO execute task
pass
def pool_watcher(self):
pass
while True:
try:
for future in as_completed(self.futures, timeout=5):
self.futures.remove(future)
self.handle_completed(future)
except TimeoutError:
pass
def handle_completed(self, future):
try:
logger.info("handle completed job:", future)
job_result = future.result()
#TODO update the database with the job result status
except Exception as e:
logger.info("handle failed job:", future)
"""