mediasort/mediaweb/__init__.py

414 lines
15 KiB
Python

import os
import json
import logging
import cherrypy
from time import sleep
from queue import Queue
from threading import Thread
from urllib.parse import urlparse
from dataclasses import dataclass, field
from deluge_client import DelugeRPCClient
from jinja2 import Environment, FileSystemLoader, select_autoescape
from mediaweb import shows
from enum import Enum
# Project root: the parent of this package's directory. main() resolves the
# "templates" and "assets" directories against it.
_PACKAGE_DIR = os.path.dirname(__file__)
APPROOT = os.path.abspath(os.path.join(_PACKAGE_DIR, "../"))
@dataclass
class Cache:
    """Mutable indexes shared between the background refresher and the web handlers."""
    # client id -> {torrent hash -> deluge status dict}
    torrents: dict = field(default_factory=dict)
    # library shows sorted by name; build_showindex stores a list here, so the
    # default is an (empty) list as well rather than a dict
    shows: list = field(default_factory=list)
    # client id -> {torrent hash -> precomputed sort destination}
    moves: dict = field(default_factory=dict)
@dataclass
class Client:
    """A registered deluge daemon connection."""
    rpc: DelugeRPCClient
    # 2-tuple (deluge-side path prefix, local path to the same location).
    # Used to translate deluge storage paths into paths this process can open.
    # Note that "(str, str)" is not a valid annotation, so plain tuple is used.
    pathmap: tuple
# Outcome of sorting one torrent into the library:
#   OK      - a new hard link was created
#   EXISTED - the library already held a link to the same file
#   FAIL    - the sort could not be performed (a reason accompanies it)
SortResult = Enum(
    "SortResult",
    [("OK", 0), ("EXISTED", 1), ("FAIL", 2)],
    module=__name__,
    qualname="SortResult",
)
class ClientCache(object):
    """Shared, background-refreshed view of the deluge clients and the media library.

    ``background`` runs on its own daemon thread. For every queued refresh
    request it rebuilds the show index, the torrent index and the
    precomputed sort destinations. ``timer`` (another daemon thread)
    requests a refresh every 300 seconds. Web handlers read ``torrents`` and
    ``moves`` and call ``refresh()`` after changing anything.
    """

    def __init__(self, options, libpath):
        """
        :param options: application options; ``options["label"]`` selects the torrents to index.
        :param libpath: library root handed to ``shows.create_index``.
        """
        self.options = options
        self.clients = {}  # client id -> Client; ids are assigned sequentially by add_client
        self.data = Cache()
        self.q = Queue()  # pending refresh requests (the items themselves are just None)
        self.inflight = False  # True while the background thread is running a pass
        self.libpath = libpath
        self.background_t = Thread(target=self.background, daemon=True)
        self.timer_t = Thread(target=self.timer, daemon=True)

    def start(self):
        """Start the background worker and the periodic refresh timer threads."""
        self.background_t.start()
        self.timer_t.start()

    def add_client(self, rpc, pathmap):  # not thread safe
        """Register a deluge client under the next sequential id, with empty indexes."""
        cnum = len(self.clients)
        self.data.torrents[cnum] = {}
        self.data.moves[cnum] = {}
        self.clients[cnum] = Client(rpc, pathmap)

    def refresh(self):
        """Ask the background thread for another pass.

        A request is only queued while at most one is already waiting. This
        still guarantees a pass that starts after this call, while collapsing
        bursts of requests.
        """
        if self.q.qsize() <= 1:
            self.q.put(None)

    def background(self):
        """Worker loop: run one index/sort pass per queued refresh request.

        Errors from a pass (for example an unreachable deluge daemon) are
        logged and the loop continues. One failure therefore does not stop
        all future refreshes or leave ``inflight`` stuck at True.
        """
        while True:
            self.q.get()  # block until we need to do something
            self.inflight = True
            logging.info("performing background tasks...")
            try:
                self.build_showindex()
                self.build_torrentindex()
                self.queue_sorts()
            except Exception:
                # top-level boundary of a long-lived thread: log and keep serving
                logging.exception("background tasks failed")
            else:
                logging.info("background tasks complete")
            finally:
                # clear the flag before task_done() so queue joiners observe it reset
                self.inflight = False
                self.q.task_done()

    def timer(self):
        """Request a refresh immediately and then every 300 seconds, forever."""
        while True:
            self.refresh()
            logging.info("scheduling next loop")
            sleep(300)  # TODO configurable task interval

    def build_torrentindex(self):
        """Fetch labelled torrents from every client and drop sorts for torrents that vanished."""
        logging.info("refreshing torrents")
        for cid, client in self.clients.items():  # TODO parallelize
            self.data.torrents[cid] = client.rpc.core.get_torrents_status(
                {"label": self.options["label"]},
                ['name', 'label', 'save_path', 'is_seed',
                 'is_finished', 'progress', 'files',
                 'paused', 'peers', 'eta'])
            newkeys = self.data.torrents[cid].keys()
            for key in list(self.data.moves[cid].keys()):
                if key not in newkeys:
                    del self.data.moves[cid][key]  # delete precomputed sort operations for torrents that went away

    def build_showindex(self):
        """Rebuild the show index from ``libpath`` and store it sorted by show name."""
        logging.info("updating show index")
        data = shows.create_index([self.libpath])
        self.data.shows = sorted(data, key=lambda x: x.name)

    def queue_sorts(self):
        """Store the first ``shows.match_episode`` candidate for each torrent lacking a sort."""
        logging.info("precomputing sorts")
        for cid, torrents in self.data.torrents.items():
            for thash, torrent in torrents.items():
                if thash in self.data.moves[cid]:
                    continue
                if not torrent.get("files"):
                    # No file list yet (presumably metadata not fetched): there is nothing
                    # to match, so skip it rather than abort the pass for every torrent.
                    continue
                matches = shows.match_episode(get_fname(torrent), self.data.shows)
                if matches:
                    self.data.moves[cid][thash] = matches[0]

    @property
    def torrents(self):
        """All indexed torrents keyed by ``"<client id>:<hash>"``."""
        return {self.format_tkey(cid, thash): torrent
                for cid, torrents in self.data.torrents.items()
                for thash, torrent in torrents.items()}

    @property
    def moves(self):
        """All precomputed sort destinations keyed by ``"<client id>:<hash>"``."""
        # The background thread adds and deletes entries in these dicts in place.
        # Iterate over snapshots so the web threads do not hit
        # "dictionary changed size during iteration".
        return {self.format_tkey(cid, thash): move
                for cid, moves in list(self.data.moves.items())
                for thash, move in list(moves.items())}

    def client(self, tkey):
        """Resolve a ``"<client id>:<hash>"`` key to ``(hash, Client)``."""
        cid, thash = self.extract_key(tkey)
        return thash, self.clients[cid]

    @staticmethod
    def extract_key(tkey):
        """Split ``"<client id>:<hash>"`` into ``(int client id, hash)``."""
        cid, thash = tkey.split(":")
        return (int(cid), thash)

    @staticmethod
    def format_tkey(cid, thash):
        """Build the ``"<client id>:<hash>"`` key used by the web layer."""
        return f"{cid}:{thash}"
class MediaWeb(object):
    """CherryPy application for listing, moving, sorting and managing torrents.

    Reads state from a ``ClientCache`` and asks it to refresh after every
    mutating action.
    """

    def __init__(self, cache, templater, options):
        """
        :param cache: ClientCache providing torrents, moves, shows and RPC clients.
        :param templater: jinja2 Environment used to load templates.
        :param options: application options dict (library_path, stashprefix, trackermap, label_done, ...).
        """
        self.tpl = templater
        self.cache = cache
        self.options = options

    def render(self, template, **kwargs):
        """
        Render a template with the shared context plus ``kwargs``.

        The shared context is options, torrents, moves, shows and user.
        """
        return self.tpl.get_template(template).render(options=self.options,
                                                      torrents=self.cache.torrents,
                                                      moves=self.cache.moves,
                                                      shows=self.cache.data.shows,
                                                      user=cherrypy.request.login,
                                                      **kwargs,
                                                      **self.get_default_vars())

    def get_default_vars(self):
        """Extra template variables merged into every render (currently none)."""
        return {}

    @cherrypy.expose
    def index(self):
        """Render the splash page."""
        return self.render("splash.html")

    @cherrypy.expose
    def home(self, action=None):
        """Render the overview; ``action=update`` queues a cache refresh and redirects back."""
        if action:
            if action == "update":
                self.cache.refresh()
            raise cherrypy.HTTPRedirect("/home")
        return self.render("index.html", inflight=self.cache.inflight)

    @cherrypy.expose
    def move(self, tkey, dest=None, otherdest=None):
        """Show the move form, or on POST move the torrent's storage under the client's pathmap prefix."""
        thash, client = self.cache.client(tkey)
        torrent = client.rpc.core.get_torrent_status(thash, [])
        if cherrypy.request.method == "POST" and (dest or otherdest):  # TODO maybe support otherdest list per client
            # NOTE(review): dest/otherdest come straight from the form. An absolute path or ".."
            # components make os.path.join escape client.pathmap[0]; consider validating.
            target = os.path.join(client.pathmap[0],
                                  otherdest or dest)
            client.rpc.core.move_storage([thash], target)
            self.cache.refresh()
            raise cherrypy.HTTPRedirect("/home")
        return self.render("moveform.html", torrent=torrent, tkey=tkey)

    @cherrypy.expose
    def sort(self, tkey, score=65, dest=None):
        """Show candidate library destinations, or on POST sort the torrent into ``dest``."""
        thash, client = self.cache.client(tkey)
        torrent = client.rpc.core.get_torrent_status(thash, [])
        # find the actual file among the torrent's files - really we just pick the biggest one
        fname = get_fname(torrent)
        # find candidate dest locations
        score = int(score)
        matches = shows.match_episode(fname, self.cache.data.shows, minscore=score)
        if cherrypy.request.method == "POST" and dest:
            # pick the candidate dest the user specified; None makes execute_move report FAIL
            thematch = next((m for m in matches if m.dest.dir == dest), None)
            result, reason = self.execute_move(tkey, torrent, thematch)
            self.cache.refresh()
            row = [torrent, result, reason]
            succeeded = result != SortResult.FAIL
            # split like autosort so failures are not reported as successes
            return self.render("sortform_done.html",
                               success_torrents=[row] if succeeded else [],
                               failed_torrents=[] if succeeded else [row])
        return self.render("sortform.html", torrent=torrent, matches=matches, tkey=tkey, score=score)

    def execute_move(self, tkey, torrent, match):
        """Hard-link a torrent's largest file into the library, then stash and relabel it in deluge.

        :param tkey: ``"<client id>:<hash>"`` key identifying the torrent's client.
        :param torrent: deluge status dict (uses files, save_path, trackers, hash).
        :param match: destination from ``shows.match_episode`` (``.dest.dir``, ``.subdest``), or None.
        :returns: ``(SortResult, reason)``. ``reason`` is a message for FAIL, otherwise None.
        """
        if match is None:
            return SortResult.FAIL, "no matching library destination"
        result = SortResult.OK
        # resolve the pathmap
        _thash, client = self.cache.client(tkey)
        pmap = client.pathmap
        fname = get_fname(torrent)
        in_library_path = os.path.join(match.dest.dir, match.subdest, fname)  # path relative from the library's root
        client_full_path = os.path.join(torrent["save_path"], fname)  # absolute storage path in deluge
        # sanity check: deluge's path must start with our pathmap
        # (an explicit check, because asserts are stripped under python -O)
        if not client_full_path.startswith(pmap[0]):
            return SortResult.FAIL, "torrent storage path is outside the configured pathmap"
        local_torrent_path = os.path.join(pmap[1], client_full_path[len(pmap[0]):].lstrip("/"))  # our perspective's path to the file
        local_library_path = os.path.join(self.options["library_path"], in_library_path)  # where we will place the file in the library
        # check if the dest file already exists:
        if os.path.exists(local_library_path):
            # if the src and dest are already linked to the same file (same device and inode), this is a noop
            if os.path.samefile(local_torrent_path, local_library_path):
                result = SortResult.EXISTED
                logging.info("dest exists, skipping linking %s -> %s", local_torrent_path, local_library_path)
            else:
                return SortResult.FAIL, "destination file already exists and has different contents"
        else:
            # hard link into library
            os.makedirs(os.path.dirname(local_library_path), exist_ok=True)
            os.link(local_torrent_path, local_library_path)
            logging.info("linking %s -> %s", local_torrent_path, local_library_path)
        client_stashdir = os.path.join(pmap[0],
                                       self.options["stashprefix"],
                                       get_mapped_stashdir(self.options["trackermap"], torrent["trackers"]))
        # move deluge path to stash dir
        client.rpc.core.move_storage([torrent["hash"]], client_stashdir)
        # label torrent as sorted
        client.rpc.label.set_torrent(torrent["hash"], self.options["label_done"])
        return result, None

    @cherrypy.expose
    def autosort(self, tkeys):
        """Sort one or more torrents into their precomputed destinations and report per-torrent results."""
        if not isinstance(tkeys, list):
            tkeys = [tkeys]
        moves = self.cache.moves  # build the merged mapping once for the whole batch
        results = []
        for tkey in tkeys:
            thash, client = self.cache.client(tkey)
            torrent = client.rpc.core.get_torrent_status(thash, [])  # TODO reduce to needed fields
            # a torrent without a precomputed sort yields FAIL instead of aborting the batch
            result, reason = self.execute_move(tkey, torrent, moves.get(tkey))
            results.append([torrent, result, reason])
        self.cache.refresh()
        return self.render("sortform_done.html",
                           success_torrents=[r for r in results if r[1] != SortResult.FAIL],
                           failed_torrents=[r for r in results if r[1] == SortResult.FAIL])

    @cherrypy.expose
    def modify(self, action, tkey):
        """Resume, pause or delete a torrent, then redirect to /home."""
        thash, client = self.cache.client(tkey)
        if action == "Resume":
            client.rpc.core.resume_torrent([thash])
        elif action == "Pause":
            client.rpc.core.pause_torrent([thash])
        elif action == "Delete":
            # remove_torrent(..., True) also removes the data (deluge's remove_data flag). Check live
            # progress instead of the background cache, which may be stale or lack this torrent.
            progress = client.rpc.core.get_torrent_status(thash, ["progress"]).get("progress")
            if progress is None:
                raise cherrypy.HTTPError(404, "unknown torrent")
            if progress > 0:
                raise cherrypy.HTTPError(409, "Cowardly refusing to delete a torrent with progress > 0")
            client.rpc.core.remove_torrent(thash, True)
        else:
            raise cherrypy.HTTPError(404)
        self.cache.refresh()
        raise cherrypy.HTTPRedirect("/home")
def get_fname(torrent):
    """Return the path of the largest file in a deluge torrent status dict.

    The largest file is treated as the media file to sort. Ties go to the
    first such file in ``torrent["files"]`` order.

    :param torrent: deluge status dict whose ``files`` entries have ``path`` and ``size`` keys.
    :raises ValueError: if the torrent lists no files.
    """
    files = torrent["files"]
    if not files:
        raise ValueError("torrent has no files")
    # the previous loop never updated its running maximum, so it returned the
    # last non-empty file rather than the biggest one
    return max(files, key=lambda tfile: tfile["size"])["path"]
def get_mapped_stashdir(mapping, trackers):
    """Choose the stash sub-directory name for a torrent from its trackers.

    :param mapping: ``{substring: dirname}``. The first tracker URL, scanned in
        tracker order, that contains a key maps to that key's dirname.
    :param trackers: deluge tracker dicts with a ``url`` key.
    :returns: the mapped dirname if any key matches. Otherwise the last tracker's
        lowercased hostname. ``"other"`` if there are no trackers or that URL has
        no parsable hostname.
    """
    tracker = None
    for tracker in trackers:
        for needle, dirname in mapping.items():
            if needle in tracker["url"]:
                return dirname
    if not tracker:
        return "other"
    # urlparse gives hostname None for URLs without a network location; don't crash on .lower()
    hostname = urlparse(tracker["url"]).hostname
    return hostname.lower() if hostname else "other"
def tsortbyname(dict_items):
    """Jinja filter: sort ``(key, torrent)`` pairs by the torrent's name, case-insensitively."""
    def _name_key(item):
        torrent = item[1]
        return torrent["name"].lower()

    ordered = sorted(dict_items, key=_name_key)
    return ordered
def main():
    """Entry point: parse CLI args, load the JSON config, register deluge clients and serve with CherryPy."""
    import argparse
    import signal

    logging.basicConfig(level=logging.WARNING,
                        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")

    parser = argparse.ArgumentParser(description="mediaweb server")
    parser.add_argument("-c", "--config", required=True, help="config file path")
    parser.add_argument('--debug', action="store_true", help="enable development options")
    args = parser.parse_args()
    if args.debug:
        logging.getLogger().setLevel(logging.INFO)

    # Set up error reporting after the log level is final, so the info message
    # below is actually emitted when running with --debug.
    _sentry_dsn = os.environ.get("SENTRY_DSN")
    if _sentry_dsn:
        try:
            import sentry_sdk
            sentry_sdk.init(_sentry_dsn)
            logging.info("enabled error reporting via sentry")
        except ImportError as ie:
            logging.error(f"SENTRY_DSN set, but sentry_sdk unavailable: {ie}")

    # JSON is UTF-8 by spec; don't depend on the locale's default encoding
    with open(args.config, encoding="utf-8") as f:
        cfg = json.load(f)

    options = {  # :|
        "movedests": cfg["movedests"],
        "library_path": cfg["library_path"],
        "stashprefix": cfg["stashprefix"],
        "trackermap": cfg["trackermap"],
        "label": cfg["label"],
        "label_done": cfg["label_done"]
    }

    tpl_dir = os.path.join(APPROOT, "templates")
    tpl = Environment(loader=FileSystemLoader(tpl_dir),
                      autoescape=select_autoescape(['html', 'xml']))
    tpl.filters.update(tsortbyname=tsortbyname,
                       len=len,
                       jsond=json.dumps)

    def validate_password(realm, user, passw):
        # NOTE(review): accepts any credentials whose password equals the username,
        # so this is effectively no authentication. Confirm this is intended.
        return user == passw  # lol

    rpc_cache = ClientCache(options, cfg["library_path"])
    for client in cfg["deluges"]:
        uri = urlparse(client["uri"])
        if uri.scheme != "deluge":
            # explicit check rather than assert, which is stripped under python -O;
            # only the scheme is reported so credentials in the URI are not leaked
            raise ValueError(f"deluge client uri must use the deluge:// scheme, got {uri.scheme!r}")
        port = uri.port or 58846  # deluged's default daemon port
        rpc_cache.add_client(DelugeRPCClient(uri.hostname, port, uri.username, uri.password, decode_utf8=True),
                             tuple(client["pathmap"]))

    web = MediaWeb(rpc_cache, tpl, options)
    cherrypy.tree.mount(web, '/', {'/': {'tools.auth_basic.on': True,
                                         'tools.auth_basic.realm': 'mediaweb',
                                         'tools.auth_basic.checkpassword': validate_password, },
                                   '/index': {'tools.auth_basic.on': False},
                                   '/static': {'tools.auth_basic.on': False,
                                               'tools.staticdir.on': True,
                                               'tools.staticdir.dir': os.path.join(APPROOT, 'assets')}})

    # General config options
    cherrypy.config.update({
        'tools.sessions.on': False,
        'request.show_tracebacks': True,
        'server.show_tracebacks': True,
        'server.socket_port': cfg["port"],
        'server.thread_pool': 1 if args.debug else 5,
        'server.socket_host': '0.0.0.0',
        'log.screen': False,
        'engine.autoreload.on': args.debug
    })

    # Setup signal handling and run it.
    def signal_handler(signum, stack):
        logging.warning('Got sig {}, exiting...'.format(signum))
        cherrypy.engine.exit()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        rpc_cache.start()
        cherrypy.engine.start()
        cherrypy.engine.block()
    finally:
        logging.info("API has shut down")
        cherrypy.engine.exit()
if __name__ == '__main__':
    # Start the server only when this file is executed directly, not when imported.
    main()
# https://github.com/deluge-torrent/deluge/blob/1.3-stable/deluge/ui/console/commands/info.py#L46
# https://deluge.readthedocs.io/en/latest/reference/api.html?highlight=rpc