from threading import Thread, Timer
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import logging
import os
import shutil
import time

import feedparser
import requests


class PodcastSettings:
    # seconds between updating podcasts
    refresh_interval = 60 * 60
    # how many seconds to wait after initialization to start refreshing podcasts
    startup_delay = 30
    # how many podcasts can be scanned at once
    scan_threads = 4
    # root path of downloaded podcasts
    path = "podcasts"
    # how many of the most recent episodes to download
    download_episodes = 2


class PodcastManager(Thread):
    def __init__(self, db):
        super().__init__()
        self.daemon = True
        self.db = db
        self.settings = PodcastSettings
        self.q = Queue()
        self.start()

    def run(self):
        """
        Loop forever, waiting for a scan request (i.e. an item on the queue) as the
        signal to begin scanning podcasts for new episodes.
        """
        self.schedule_rescan()
        while True:
            self.q.get()
            self.refresh_podcasts()

    def interval_scan(self):
        """
        Request that a scan be executed, then schedule the next automated rescan.
        """
        self.request_rescan()
        self.schedule_rescan()

    def schedule_rescan(self):
        """
        Call the next interval scan later.
        """
        t = Timer(self.settings.refresh_interval, self.interval_scan)
        t.daemon = True
        t.start()

    def request_rescan(self):
        """
        Add an item to the queue.
        """
        self.q.put(None)

    def refresh_podcasts(self):
        """
        Refresh all the podcasts.
        """
        logging.info("rescanning podcasts")

        # Episodes still marked as "downloading" are stale leftovers from a previous
        # run that crashed mid-download; reset them so they are retried.
        # TODO this should happen earlier than the scan
        for entry in self.db.get_podcast_episodes(status="downloading"):
            self.db.set_podcast_episode_status(entry['id'], "new")

        futures = []
        # TODO the ThreadPoolExecutor doesn't die as a daemon thread :|
        with ThreadPoolExecutor(max_workers=self.settings.scan_threads) as pool:
            for item in self.db.get_podcasts():
                futures.append(pool.submit(self.refresh_podcast, item))
            for future in futures:
                e = future.exception()
                if e:
                    raise e

        # for item in self.db.get_podcasts():
        #     self.refresh_podcast(item)

        logging.info("podcast refresh complete")
        # TODO all episodes still in 'new' status change to 'skipped'

    def refresh_podcast(self, podcast):
        """
        Refresh all metadata and episodes of a single podcast.
        """
        logging.info("updating podcast %s '%s'", podcast['id'], podcast['title'])
        feed = self.get_feed(podcast['url'])
        for entry in feed['entries']:
            self.refresh_podcast_entry(podcast['id'], entry)
        self.refresh_podcast_episodes(podcast['id'])
        # TODO update the feed's description
        # self.update_feed_meta(feed['feed'])
        # Sample of the feed metadata feedparser provides:
        # 'image': {'href': 'http://sysadministrivia.com/images/1.jpg',
        #           'link': 'http://sysadministrivia.com/',
        #           'links': [{'href': 'http://sysadministrivia.com/',
        #                      'rel': 'alternate',
        #                      'type': 'text/html'}],
        #           'title': 'The Sysadministrivia Podcast',
        #           'title_detail': {'base': '',
        #                            'language': 'en',
        #                            'type': 'text/plain',
        #                            'value': 'The Sysadministrivia Podcast'}},
        # 'link': 'http://sysadministrivia.com/',
        # 'subtitle': 'We podcast all things system administration/engineering/infosec, '
        #             'with a strong focus on GNU/Linux. We use F/OSS software whenever '
        #             'possible in the production of these podcasts. Please be sure to '
        #             'view our show notes on the site!',
        # 'title': 'The Sysadministrivia Podcast',

    def refresh_podcast_episodes(self, podcast_id):
        """
        Check that the most recent X episodes are downloaded. Start downloads if not.
        """
        for entry in self.db.get_podcast_episodes(podcast_id=podcast_id,
                                                  limit=self.settings.download_episodes):
            if entry["status"] == "new":
                self.download_episode(entry)

    def download_episode(self, episode):
        """
        Download the episode:
        - mark its status as downloading
        - clean up any tmp files from previous failures
        - create the destination dir
        - stream the url to a temp file
        - rename the temp file to its final location
        - mark the episode as downloaded
        """
        self.db.set_podcast_episode_status(episode['id'], "downloading")
        ep_dir = os.path.join(self.settings.path, str(episode['podcastid']))
        ep_path = os.path.join(ep_dir, "{}.mp3".format(episode['id']))
        ep_tmppath = os.path.join(ep_dir, ".{}.mp3".format(episode['id']))
        os.makedirs(ep_dir, exist_ok=True)
        if os.path.exists(ep_path):
            os.unlink(ep_path)  # previous failed download
        if os.path.exists(ep_tmppath):
            os.unlink(ep_tmppath)  # previous failed download
        logging.info("fetching %s", episode['url'])
        r = requests.get(episode['url'], stream=True)
        r.raise_for_status()
        with open(ep_tmppath, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
        os.rename(ep_tmppath, ep_path)
        # TODO verify or update MIME from that of the url
        self.db.set_podcast_episode_status(episode['id'], "completed")

    def get_feed(self, rss_url):
        """
        Download the given URL and return a parsed feed.
        """
        feed_body = requests.get(rss_url, timeout=30)
        feed_body.raise_for_status()
        return feedparser.parse(feed_body.text)

    def refresh_podcast_entry(self, podcast_id, entry):
        """
        Update the database for the given podcast entry, adding it if it doesn't exist.

        Note: the episode TITLE is used as the uniqueness check against the database.
        """
        existing = self.db.get_podcast_episodes(podcast_id=podcast_id, title=entry['title'])
        if existing:
            return

        # find the media file url
        url = None
        mime = None
        for link in entry['links']:
            if link['type'] in ["audio/mpeg", "audio/mp3"]:  # TODO more formats
                url = link['href']
                mime = link['type']
                break
        if not url:
            logging.warning("could not find url for episode in podcast %s", podcast_id)
            return

        # create the entry
        ep_id = self.db.add_podcast_episode(podcast_id,
                                            time.mktime(entry['published_parsed']),
                                            entry['title'],
                                            entry['summary'],
                                            url,
                                            mime)
        logging.info("added episode %s '%s'", ep_id, entry['title'])