refactor jobs a bit and plan out the basic executor
All checks were successful
Gitea/photolib/pipeline/head This commit looks good
All checks were successful
Gitea/photolib/pipeline/head This commit looks good
This commit is contained in:
parent
7996c829d0
commit
9607708314
124
photoapp/jobs.py
124
photoapp/jobs.py
@ -2,7 +2,7 @@ import queue
|
||||
import logging
|
||||
import cherrypy
|
||||
from photoapp.dbutils import create_db_sessionmaker, cursorwrap
|
||||
from photoapp.types import Job, JobTargetType, JobTarget, JobTargetStatus, Photo, PhotoSet, Tag, TagItem
|
||||
from photoapp.types import Job, JobTargetType, JobTarget, JobStatus, JobTargetStatus, Photo, PhotoSet, Tag, TagItem
|
||||
from sqlalchemy import func
|
||||
|
||||
|
||||
@ -100,6 +100,12 @@ class JobServer(BaseJobServer):
|
||||
self.engine = dbengine
|
||||
self.session = create_db_sessionmaker(self.engine)
|
||||
|
||||
def notify(self, job_uuid):
    """
    notify the jobserver about the job identified by job_uuid

    This implementation does no work: it always raises NotImplementedError.
    """
    raise NotImplementedError
|
||||
|
||||
def handle_notify(self, c, job_uuid):
|
||||
"""
|
||||
notify the jobserver that a job needs to be processed. The job may be in any state, including locked. The job
|
||||
@ -113,7 +119,7 @@ class JobServer(BaseJobServer):
|
||||
processing:
|
||||
- the job is added to a list of "watched" jobs
|
||||
- we issue a query looking for work
|
||||
- if no work is found, the job status is changed to Done (or error if any done work has error status?)
|
||||
- if no work is found, the job status is changed to Done (or error if any done work has error status?) and stop watching it
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@ -126,49 +132,17 @@ class ThreadedJobServer(JobServer):
|
||||
super().__init__(dbengine)
|
||||
self.notifyq = queue.Queue()
|
||||
self.work_notifyq = queue.Queue()
|
||||
|
||||
@cursorwrap
def handle_notify(self, c, job_uuid):
    """
    look up the job identified by job_uuid and, if it has no JobTargetStatus rows yet, create them

    Every JobTarget of the job is expanded into the photos it refers to (a single photo, the photos of a
    photoset, or the photos reached through a tag). One JobTargetStatus is added per distinct photo, carrying
    the id of the first target that produced that photo.

    c is the database session used for all queries (presumably supplied by cursorwrap - confirm).
    job_uuid is the uuid of the job to prepare; an unknown uuid is logged and ignored.
    """
    # query the db for the target job
    # NOTE(review): the original comment also says "and lock it", but no explicit row lock is requested
    # by this query - confirm whether cursorwrap or the session configuration provides one
    job = c.query(Job).filter(Job.uuid == job_uuid).first()

    if job is None:
        # first() yields None when nothing matches; without this guard job.id below raises AttributeError
        logging.warning("notified about unknown job %s", job_uuid)
        return

    # get the task we're running
    # task = task_definitions[job.job_name]()
    # logging.info("task: %s", task)

    # check if JobTargetStatus has been populated for this job
    statuses = c.query(func.count(JobTargetStatus.id)).join(JobTarget).filter(JobTarget.job_id == job.id).first()[0]

    if statuses == 0:
        # populate statuses
        logging.info("preparing statuses for job %s/%s", job.id, job.uuid)

        # photo id -> id of the first target that yielded it. Remembering the target per photo keeps each
        # status attached to a target that really contains the photo, instead of to whichever target the
        # loop happened to visit last.
        target_by_photo = {}

        for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
            if target.target_type == JobTargetType.photo:
                found = [target.target]
            elif target.target_type == JobTargetType.photoset:
                found = [row[0] for row in c.query(Photo.id).filter(Photo.set_id == target.target).all()]
            elif target.target_type == JobTargetType.tag:
                found = [row[0] for row in c.query(Photo.id).
                         join(PhotoSet).join(TagItem).join(Tag).
                         filter(Tag.id == target.target).
                         all()]
            else:
                # other target types contribute no photos, as before
                found = []

            for photo_id in found:
                target_by_photo.setdefault(photo_id, target.id)

        for photo_id, target_id in target_by_photo.items():
            c.add(JobTargetStatus(target_id=target_id, job_id=job.id, photo_id=photo_id))

        c.commit()  # if this fails, somebody else is handling job
|
||||
|
||||
def queue_notify(self, job_uuid):
    """
    put job_uuid on the notification queue (self.notifyq)
    """
    pending = self.notifyq
    pending.put(job_uuid)
|
||||
# self.watched_jobs = set()
|
||||
|
||||
def run_background(self):
    """
    start the jobserver's background threads

    One daemon thread is started for each of run_notifier, run_work and run_work_queue, in that order. The
    threads are not joined; this method returns as soon as the last one has been started.
    """
    from threading import Thread
    # run_notifier handles incoming notifications of jobs. When we are notified of a job, we start running it
    Thread(target=self.run_notifier, daemon=True).start()
    # NOTE(review): run_work and run_work_queue look like the old and new name of the same worker loop -
    # confirm that both thread starts are meant to be here
    Thread(target=self.run_work, daemon=True).start()
    # run_work_queue takes job uuids from self.work_notifyq and passes them to run_job
    Thread(target=self.run_work_queue, daemon=True).start()
|
||||
|
||||
def notify(self, job_uuid):
    """
    put job_uuid on self.notifyq, the queue of pending job notifications
    """
    inbox = self.notifyq
    inbox.put(job_uuid)
|
||||
|
||||
def run_notifier(self):
|
||||
while True:
|
||||
@ -179,12 +153,76 @@ class ThreadedJobServer(JobServer):
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
def run_work(self):
|
||||
# start a ThreadPoolExecutor
|
||||
@cursorwrap
def handle_notify(self, c, job_uuid):
    """
    prepare the job identified by job_uuid for execution

    Jobs whose status is neither ready nor running are ignored. For a ready job, the JobTargetStatus rows
    are created through create_job_statuses (unless some already exist), the job is switched to running and
    the change is committed. A job that is already running is left untouched.

    c is the database session used for all queries (presumably supplied by cursorwrap - confirm).
    job_uuid is the uuid of the job to prepare; an unknown uuid is logged and ignored.
    """
    # query the db for the target job
    # NOTE(review): the original comment also says "and lock it", but no explicit row lock is requested
    # by this query - confirm whether cursorwrap or the session configuration provides one
    job = c.query(Job).filter(Job.uuid == job_uuid).first()

    if job is None:
        # first() yields None when nothing matches; without this guard job.status raises AttributeError
        logging.warning("notified about unknown job %s", job_uuid)
        return

    if job.status not in (JobStatus.ready, JobStatus.running, ):
        # we don't care about jobs that are already done
        return

    # get the task we're running
    # task = task_definitions[job.job_name]()
    # logging.info("task: %s", task)

    if job.status == JobStatus.ready:
        # check if JobTargetStatus has been populated for this job
        statuses_count = c.query(func.count(JobTargetStatus.id)). \
            join(JobTarget).filter(JobTarget.job_id == job.id).first()[0]

        if statuses_count == 0:
            for status in self.create_job_statuses(c, job):
                c.add(status)

        job.status = JobStatus.running

        # if this commit fails due to key constraints, somebody else is doing what we're doing in parallel
        # so, we'll just crash here
        c.commit()

    # watch the job which will cause us to begin looking for and executing work within the job
    # self.watched_jobs.add(job.id)
|
||||
|
||||
def create_job_statuses(self, c, job):
    """
    populate job statuses, which also serve as the queue of items to work on

    Every JobTarget of the job is expanded into the photos it refers to (a single photo, the photos of a
    photoset, or the photos reached through a tag). One JobTargetStatus is built per distinct photo, carrying
    the id of the first target that produced that photo.

    c is the database session used to run the queries.
    job is the Job whose targets are expanded; job.id and job.uuid are read.
    Returns a list of new JobTargetStatus objects (empty when the job has no targets). They are not added to
    the session here; that is left to the caller.
    """
    logging.info("preparing statuses for job %s/%s", job.id, job.uuid)

    # photo id -> id of the first target that yielded it. Remembering the target per photo keeps each status
    # attached to a target that really contains the photo. Reading the loop variable after the loop would tag
    # every status with the last target and raise NameError for a job without targets.
    target_by_photo = {}

    for target in c.query(JobTarget).filter(JobTarget.job_id == job.id).all():
        if target.target_type == JobTargetType.photo:
            found = [target.target]
        elif target.target_type == JobTargetType.photoset:
            found = [row[0] for row in c.query(Photo.id).filter(Photo.set_id == target.target).all()]
        elif target.target_type == JobTargetType.tag:
            found = [row[0] for row in c.query(Photo.id).
                     join(PhotoSet).join(TagItem).join(Tag).
                     filter(Tag.id == target.target).
                     all()]
        else:
            # other target types contribute no photos, as before
            found = []

        for photo_id in found:
            target_by_photo.setdefault(photo_id, target.id)

    return [
        JobTargetStatus(target_id=target_id, job_id=job.id, photo_id=photo_id)
        for photo_id, target_id in target_by_photo.items()
    ]
|
||||
|
||||
def run_work_queue(self):
    """
    consume job uuids from self.work_notifyq forever, passing each one to run_job

    Planned shape of the loop (only the work_notifyq polling and the run_job call exist so far):
    - poll the database for work, locking and then marking rows we claim
    - poll self.work_notifyq
    - submit it to the pool
    - wait for pool results
    - update the db with job results
    """
    while True:
        try:
            next_uuid = self.work_notifyq.get(timeout=5.0)
            self.run_job(next_uuid)
        except queue.Empty:
            # nothing arrived within the timeout; go around and poll again
            continue
|
||||
|
||||
def run_job(self, job_uuid):
    """
    placeholder for executing the job identified by job_uuid; currently does nothing
    """
    return None
|
||||
|
Loading…
x
Reference in New Issue
Block a user