From 30c641fbea5b7d5c4f64e9a2022916f912e27a38 Mon Sep 17 00:00:00 2001
From: dave
Date: Mon, 9 Apr 2018 21:52:30 -0700
Subject: [PATCH] podcast downloader features

---
 pysonic/database.py |  61 +++++++++++++++
 pysonic/library.py  |   2 +
 pysonic/podcast.py  | 202 ++++++++++++++++++++++++++++++++++++++++++++++++
 pysonic/schema.py   |  21 ++++-
 requirements.txt    |  15 ++--
 5 files changed, 294 insertions(+), 7 deletions(-)
 create mode 100644 pysonic/podcast.py

diff --git a/pysonic/database.py b/pysonic/database.py
index d9b08db..cce1b52 100644
--- a/pysonic/database.py
+++ b/pysonic/database.py
@@ -434,3 +434,64 @@ class PysonicDatabase(object):
         cursor.execute("INSERT INTO podcasts (title, url) VALUES (?, ?)",
                        (title if title else url, url, ))
         cursor.execute("COMMIT")
+
+    @readcursor
+    def get_podcast_episodes(self, cursor, episode_id=None, podcast_id=None, title=None, status=None,
+                             sortby="pe.date", order="desc", limit=None):
+        q = """
+            SELECT
+                pe.*
+            FROM podcast_episodes as pe
+            INNER JOIN podcasts as p
+                on pe.podcastid == p.id
+            """
+
+        episodes = []
+        params = []
+
+        conditions = []
+        if episode_id:
+            conditions.append("pe.id = ?")
+            params.append(episode_id)
+        if podcast_id:
+            conditions.append("p.id = ?")
+            params.append(podcast_id)
+        if title:
+            conditions.append("pe.title = ?")
+            params.append(title)
+        if status:
+            conditions.append("pe.status = ?")
+            params.append(status)
+        if conditions:
+            q += " WHERE " + " AND ".join(conditions)
+
+        # map the direction through a whitelist BEFORE interpolating it into the query
+        if order:
+            order = {"asc": "ASC", "desc": "DESC"}[order]
+        if sortby:
+            q += " ORDER BY {}".format(sortby)
+            if order:
+                q += " {}".format(order)
+        if limit:
+            q += " LIMIT {}".format(int(limit))
+
+        cursor.execute(q, params)
+        for row in cursor:
+            episodes.append(row)
+        return episodes
+
+    @readcursor
+    def add_podcast_episode(self, cursor, podcast_id, date, title, description, url, mime):
+        cursor.execute("INSERT INTO podcast_episodes (podcastid, date, title, description, url, format, status) "
+                       "VALUES (?, ?, ?, ?, ?, ?, ?)",
+                       (podcast_id, date, title, description, url, mime, "new", ))
+        cursor.execute("COMMIT")
+        return cursor.lastrowid
+
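+    # Episode status lifecycle, as driven by pysonic.podcast: rows start as "new",
+    # are marked "downloading" while being fetched, and end up "completed" on disk.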
WHERE id=?", (status, episode_id, )) + cursor.execute("COMMIT") diff --git a/pysonic/library.py b/pysonic/library.py index 4a4d435..783d65a 100644 --- a/pysonic/library.py +++ b/pysonic/library.py @@ -2,6 +2,7 @@ import os import logging from pysonic.scanner import PysonicFilesystemScanner from pysonic.types import MUSIC_TYPES +from pysonic.podcast import PodcastManager LETTER_GROUPS = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", @@ -31,6 +32,7 @@ class NoDataException(Exception): class PysonicLibrary(object): def __init__(self, database): self.db = database + self.podcastmgr = PodcastManager(database) self.get_libraries = self.db.get_libraries self.get_artists = self.db.get_artists diff --git a/pysonic/podcast.py b/pysonic/podcast.py new file mode 100644 index 0000000..e6ed3f4 --- /dev/null +++ b/pysonic/podcast.py @@ -0,0 +1,194 @@ +from threading import Thread, Lock, Timer +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +from contextlib import closing +import shutil +import logging +import os +import requests +import feedparser +import time + + +class PodcastSettings(object): + """seconds between updating podcasts""" + refresh_interval = 3 #60 * 60 + """how many seconds to wait after initialization to start refreshing podcasts""" + startup_delay = 30 + """how many podcasts can be scanned at once""" + scan_threads = 4 + """root path of downloaded podcasts""" + path = "podcasts" + """how many of the most recent episodes to download""" + download_episodes = 2 + + +class PodcastManager(Thread): + def __init__(self, db): + super().__init__() + self.daemon = True + + self.db = db + self.settings = PodcastSettings + self.q = Queue() + self.start() + + def run(self): + """ + In a loop forever, query for podcasts in need of scanning for new episodes. Wait for a scan being requested (aka + a queue item) as the signal to begin scanning. + """ + self.schedule_rescan() + while True: + self.q.get() + self.refresh_podcasts() + + def interval_scan(self): + """ + Schedule the next automated rescan. Request a scan be executed. 
+ """ + self.request_rescan() + #self.schedule_rescan() + + def schedule_rescan(self): + """ + Call the next interval scan later + """ + t = Timer(self.settings.refresh_interval, self.interval_scan) + t.daemon = True + t.start() + + def request_rescan(self): + """ + Add an item to the queue + """ + self.q.put(None) + + def refresh_podcasts(self): + """ + Refresh all the podcasts + """ + logging.info("rescanning podcasts") + # If any episodes are marked as "downloading", it's a lie and left over from before the crash + # TODO this should happen earlier than the scan + for entry in self.db.get_podcast_episodes(status="downloading"): + self.db.set_podcast_episode_status(entry['id'], "new") + + futures = [] + # TODO the TPE doesn't die as a daemon thread :| + with ThreadPoolExecutor(max_workers=self.settings.scan_threads) as pool: + for item in self.db.get_podcasts(): + futures.append(pool.submit(self.refresh_podcast, item, )) + for item in futures: + e = item.exception() + if e: + raise e + # for item in self.db.get_podcasts(): + # self.refresh_podcast(item) + logging.info("podcast refresh complete") + + def refresh_podcast(self, podcast): + """ + Refresh all metadata and episodes of a single podcast + """ + logging.info("updating podcast %s '%s' ", podcast['id'], podcast['title']) + feed = self.get_feed(podcast['url']) + for entry in feed['entries']: + self.refresh_podcast_entry(podcast['id'], entry) + self.refresh_podcast_episodes(podcast['id']) + + #TODO update the feed's description + # self.udpate_feed_meta(feed['feed']) + # 'image': {'href': 'http://sysadministrivia.com/images/1.jpg', + # 'link': 'http://sysadministrivia.com/', + # 'links': [{'href': 'http://sysadministrivia.com/', + # 'rel': 'alternate', + # 'type': 'text/html'}], + # 'title': 'The Sysadministrivia Podcast', + # 'title_detail': {'base': '', + # 'language': 'en', + # 'type': 'text/plain', + # 'value': 'The Sysadministrivia Podcast'}}, + # 'link': 'http://sysadministrivia.com/', + # 'subtitle': 'We podcast all things system administration/engineering/infosec, ' + # 'with a strong focus on GNU/Linux. We use F/OSS software whenever ' + # 'possible in the production of these podcasts. Please be sure to ' + # 'view our show notes on the site!', + # 'title': 'The Sysadministrivia Podcast', + + def refresh_podcast_episodes(self, podcast_id): + """ + Check that the most recent X episodes are downloaded. Start downloads if not. 
+ """ + for entry in self.db.get_podcast_episodes(podcast_id=podcast_id, limit=self.settings.download_episodes): + if entry["status"] == "new": + self.download_episode(entry) + + def download_episode(self, episode): + """ + Download the episode: + - mark status as downloading + - clean up any tmp files from previous failures + - create the dir + - stream the url to temp file + - rename the temp file to final location + - mark episode as downloaded + """ + self.db.set_podcast_episode_status(episode['id'], "downloading") + + ep_dir = os.path.join(self.settings.path, str(episode['podcastid'])) + ep_path = os.path.join(ep_dir, "{}.mp3".format(episode['id'])) + ep_tmppath = os.path.join(ep_dir, ".{}.mp3".format(episode['id'])) + + os.makedirs(ep_dir, exist_ok=True) + if os.path.exists(ep_path): + os.unlink(ep_path) # previous failed downloads + if os.path.exists(ep_tmppath): + os.unlink(ep_tmppath) # previous failed downloads + + logging.info("fetching %s", episode['url']) + r = requests.get(episode['url'], stream=True) + r.raise_for_status() + with open(ep_tmppath, 'wb') as f: + shutil.copyfileobj(r.raw, f) + os.rename(ep_tmppath, ep_path) + # TODO verify or update MIME from that of the url + self.db.set_podcast_episode_status(episode['id'], "completed") + + def get_feed(self, rss_url): + """ + Download the given URL and return a parsed feed + """ + feed_body = requests.get(rss_url, timeout=30) + return feedparser.parse(feed_body.text) + + def refresh_podcast_entry(self, podcast_id, entry): + """ + Update the database for the given podcast entry. Add it to the database if it doesn't exist. Note: we use the + episode TITLE as the uniqueness check against the database + """ + existing = self.db.get_podcast_episodes(podcast_id=podcast_id, title=entry['title']) + if existing: + return + + # find media file url + url = None + mime = None + for link in entry['links']: + if link['type'] in ["audio/mpeg", "audio/mp3"]: # TODO more formats + url = link['href'] + mime = link['type'] + break + + if not url: + logging.warning("could not find url for episode in podcast %s", podcast_id) + return + + # create entry + ep_id = self.db.add_podcast_episode(podcast_id, + time.mktime(entry['published_parsed']), + entry['title'], + entry['summary'], + url, + mime) + logging.info("added episode %s '%s'", ep_id, entry['title']) diff --git a/pysonic/schema.py b/pysonic/schema.py index 93fb0ba..31f5e16 100644 --- a/pysonic/schema.py +++ b/pysonic/schema.py @@ -2,6 +2,7 @@ table_quers = ["""CREATE TABLE 'libraries' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'name' TEXT, 'path' TEXT UNIQUE);""", + """CREATE TABLE 'dirs' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'library' INTEGER, @@ -9,14 +10,17 @@ table_quers = ["""CREATE TABLE 'libraries' ( 'name' TEXT, UNIQUE(parent, name) )""", + """CREATE TABLE 'genres' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'name' TEXT UNIQUE)""", + """CREATE TABLE 'artists' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'libraryid' INTEGER, 'dir' INTEGER UNIQUE, 'name' TEXT)""", + """CREATE TABLE 'albums' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'artistid' INTEGER, @@ -27,6 +31,7 @@ table_quers = ["""CREATE TABLE 'libraries' ( 'played' INTEGER, 'plays' INTEGER NOT NULL DEFAULT 0, UNIQUE (artistid, dir));""", + """CREATE TABLE 'songs' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'library' INTEGER, @@ -42,22 +47,26 @@ table_quers = ["""CREATE TABLE 'libraries' ( 'track' INTEGER, 'year' INTEGER )""", + """CREATE TABLE 'covers' ( 'id' INTEGER PRIMARY KEY AUTOINCREMENT, 'library' INTEGER, 'type' TEXT, 
+        if not url:
+            logging.warning("could not find url for episode in podcast %s", podcast_id)
+            return
+
+        # create entry
+        ep_id = self.db.add_podcast_episode(podcast_id,
+                                            time.mktime(entry['published_parsed']),
+                                            entry['title'],
+                                            entry['summary'],
+                                            url,
+                                            mime)
+        logging.info("added episode %s '%s'", ep_id, entry['title'])
diff --git a/pysonic/schema.py b/pysonic/schema.py
index 93fb0ba..31f5e16 100644
--- a/pysonic/schema.py
+++ b/pysonic/schema.py
@@ -2,6 +2,7 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'name' TEXT,
                     'path' TEXT UNIQUE);""",
+
               """CREATE TABLE 'dirs' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'library' INTEGER,
@@ -9,14 +10,17 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'name' TEXT,
                     UNIQUE(parent, name)
                 )""",
+
               """CREATE TABLE 'genres' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'name' TEXT UNIQUE)""",
+
               """CREATE TABLE 'artists' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'libraryid' INTEGER,
                     'dir' INTEGER UNIQUE,
                     'name' TEXT)""",
+
               """CREATE TABLE 'albums' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'artistid' INTEGER,
@@ -27,6 +31,7 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'played' INTEGER,
                     'plays' INTEGER NOT NULL DEFAULT 0,
                     UNIQUE (artistid, dir));""",
+
               """CREATE TABLE 'songs' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'library' INTEGER,
@@ -42,22 +47,26 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'track' INTEGER,
                     'year' INTEGER
                 )""",
+
               """CREATE TABLE 'covers' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'library' INTEGER,
                     'type' TEXT,
                     'size' TEXT,
                     'path' TEXT UNIQUE);""",
+
               """CREATE TABLE 'users' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
                     'username' TEXT UNIQUE NOT NULL,
                     'password' TEXT NOT NULL,
                     'admin' BOOLEAN DEFAULT 0,
                     'email' TEXT)""",
+
               """CREATE TABLE 'stars' (
                     'userid' INTEGER,
                     'songid' INTEGER,
                     primary key ('userid', 'songid'))""",
+
               """CREATE TABLE 'playlists' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
                     'ownerid' INTEGER,
@@ -67,10 +76,12 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'changed' INTEGER,
                     'cover' INTEGER,
                     UNIQUE ('ownerid', 'name'))""",
+
               """CREATE TABLE 'playlist_entries' (
                     'playlistid' INTEGER,
                     'songid' INTEGER,
                     'order' FLOAT)""",
+
               """CREATE TABLE 'podcasts' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'lastscan' INTEGER NOT NULL DEFAULT 0,
@@ -81,14 +92,20 @@ table_quers = ["""CREATE TABLE 'libraries' (
                     'cover' INTEGER,
                     'rss_cover' TEXT,
                     'status' TEXT)""",
+
               """CREATE TABLE 'podcast_episodes' (
                     'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                     'podcastid' INTEGER,
-                    'date' INTEGER
+                    'date' INTEGER,
                     'title' TEXT NOT NULL,
                     'description' TEXT,
-                    'status' TEXT)""",
+                    'url' TEXT,
+                    'format' TEXT,
+                    'status' TEXT,
+                    UNIQUE('podcastid', 'title'))""",
+
               """CREATE TABLE 'meta' (
                     'key' TEXT PRIMARY KEY NOT NULL,
                     'value' TEXT);""",
+
               """INSERT INTO meta VALUES ('db_version', '1');"""]
diff --git a/requirements.txt b/requirements.txt
index 2d29bb7..256ad08 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,12 +1,17 @@
+backports.functools-lru-cache==1.5
 beautifulsoup4==4.6.0
-bs4==0.0.1
-cheroot==6.0.0
+certifi==2018.1.18
+chardet==3.0.4
+cheroot==6.2.0
 CherryPy==14.0.1
-lxml==4.2.1
+feedparser==5.2.1
+idna==2.6
+lxml==3.8.0
 more-itertools==4.1.0
-mutagen==1.40.0
+mutagen==1.38
 portend==2.2
-pysonic==0.0.1
 pytz==2018.3
+requests==2.18.4
 six==1.11.0
 tempora==1.11
+urllib3==1.22