New database and scanner

This commit is contained in:
dave 2018-04-02 21:58:48 -07:00
parent 7d727c832d
commit afd5476ea8
5 changed files with 332 additions and 411 deletions

View File

@ -3,8 +3,8 @@ import logging
import cherrypy import cherrypy
from sqlite3 import IntegrityError from sqlite3 import IntegrityError
from pysonic.api import PysonicApi from pysonic.api import PysonicApi
from pysonic.library import PysonicLibrary, DuplicateRootException from pysonic.library import PysonicLibrary
from pysonic.database import PysonicDatabase from pysonic.database import PysonicDatabase, DuplicateRootException
def main(): def main():

View File

@ -21,12 +21,26 @@ class NotFoundError(Exception):
pass pass
class DuplicateRootException(Exception):
pass
def readcursor(func):
"""
Provides a cursor to the wrapped method as the first arg
"""
def wrapped(*args, **kwargs):
self = args[0]
with closing(self.db.cursor()) as cursor:
return func(*[self, cursor], *args[1:], **kwargs)
return wrapped
class PysonicDatabase(object): class PysonicDatabase(object):
def __init__(self, path): def __init__(self, path):
self.sqlite_opts = dict(check_same_thread=False, cached_statements=0, isolation_level=None) self.sqlite_opts = dict(check_same_thread=False)
self.path = path self.path = path
self.db = None self.db = None
self.open() self.open()
self.migrate() self.migrate()
@ -36,212 +50,92 @@ class PysonicDatabase(object):
def migrate(self): def migrate(self):
# Create db # Create db
queries = ["""CREATE TABLE 'meta' ( queries = ["""CREATE TABLE 'libraries' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'name' TEXT,
'path' TEXT UNIQUE);""",
"""CREATE TABLE 'artists' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'libraryid' INTEGER,
'dir' TEXT UNIQUE,
'name' TEXT)""",
"""CREATE TABLE 'albums' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'artistid' INTEGER,
'coverid' INTEGER,
'dir' TEXT,
'name' TEXT,
UNIQUE (artistid, dir));""",
"""CREATE TABLE 'songs' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'albumid' BOOLEAN,
'file' TEXT UNIQUE, -- path from the library root
'size' INTEGER NOT NULL DEFAULT -1,
'title' TEXT NOT NULL,
'lastscan' INTEGER NOT NULL DEFAULT -1,
'format' TEXT,
'length' INTEGER,
'bitrate' INTEGER,
'track' INTEGER,
'year' INTEGER
)""",
"""CREATE TABLE 'covers' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'type' TEXT,
'size' TEXT,
'path' TEXT UNIQUE);""",
"""CREATE TABLE 'users' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'username' TEXT UNIQUE NOT NULL,
'password' TEXT NOT NULL,
'admin' BOOLEAN DEFAULT 0,
'email' TEXT)""",
"""CREATE TABLE 'stars' (
'userid' INTEGER,
'songid' INTEGER,
primary key ('userid', 'songid'))""",
"""CREATE TABLE 'meta' (
'key' TEXT PRIMARY KEY NOT NULL, 'key' TEXT PRIMARY KEY NOT NULL,
'value' TEXT);""", 'value' TEXT);""",
"""INSERT INTO meta VALUES ('db_version', '3');""", """INSERT INTO meta VALUES ('db_version', '1');"""]
"""CREATE TABLE 'nodes' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'parent' INTEGER NOT NULL,
'isdir' BOOLEAN NOT NULL,
'size' INTEGER NOT NULL DEFAULT -1,
'name' TEXT NOT NULL,
'type' TEXT,
'title' TEXT,
'album' TEXT,
'artist' TEXT,
'metadata' TEXT
)""",
"""CREATE TABLE 'users' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'username' TEXT UNIQUE NOT NULL,
'password' TEXT NOT NULL,
'admin' BOOLEAN DEFAULT 0,
'email' TEXT)""",
"""CREATE TABLE 'stars' (
'userid' INTEGER,
'nodeid' INTEGER,
primary key ('userid', 'nodeid'))"""]
with closing(self.db.cursor()) as cursor: with closing(self.db.cursor()) as cursor:
cursor.execute("SELECT * FROM sqlite_master WHERE type='table' AND name='meta';") cursor.execute("SELECT * FROM sqlite_master WHERE type='table' AND name='meta'")
# Initialize DB # Initialize DB
if len(cursor.fetchall()) == 0: if len(cursor.fetchall()) == 0:
logging.warning("Initializing database") logging.warning("Initializing database")
for query in queries: for query in queries:
print(query)
cursor.execute(query) cursor.execute(query)
cursor.execute("COMMIT")
else: else:
# Migrate if old db exists # Migrate if old db exists
version = int(cursor.execute("SELECT * FROM meta WHERE key='db_version';").fetchone()['value']) # cursor.execute("""UPDATE meta SET value=? WHERE key="db_version";""", (str(version), ))
if version < 1: # logging.warning("db schema is version {}".format(version))
logging.warning("migrating database to v1 from %s", version)
users_table = """CREATE TABLE 'users' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'username' TEXT UNIQUE NOT NULL,
'password' TEXT NOT NULL,
'admin' BOOLEAN DEFAULT 0,
'email' TEXT)"""
cursor.execute(users_table)
version = 1
if version < 2:
logging.warning("migrating database to v2 from %s", version)
stars_table = """CREATE TABLE 'stars' (
'userid' INTEGER,
'nodeid' INTEGER,
primary key ('userid', 'nodeid'))"""
cursor.execute(stars_table)
version = 2
if version < 3:
logging.warning("migrating database to v3 from %s", version)
size_col = """ALTER TABLE nodes ADD 'size' INTEGER NOT NULL DEFAULT -1;"""
cursor.execute(size_col)
version = 3
cursor.execute("""UPDATE meta SET value=? WHERE key="db_version";""", (str(version), ))
logging.warning("db schema is version {}".format(version))
# Virtual file tree
def getnode(self, node_id):
return self.getnodes(node_id=node_id)[0]
def _populate_meta(self, node):
node['metadata'] = self.decode_metadata(node['metadata'])
return node
def getnodes(self, *parent_ids, node_id=None, types=None, limit=None, order=None):
"""
Find nodes that match the passed paramters.
:param parent_ids: one or more parents to find children of
:type parent_ids: int
:param node_id: single node id to return
:type node_id: int
:param types: filter by type column
:type types: list
:param limit: number of records to limit to
:param order: one of ("rand") to select ordering mode
"""
query = "SELECT * FROM nodes WHERE "
qargs = []
def add_filter(name, values):
nonlocal query
nonlocal qargs
query += "{} in (".format(name)
for value in (values if type(values) in [list, tuple] else [values]):
query += "?, "
qargs += [value]
query = query.rstrip(", ")
query += ") AND"
if node_id:
add_filter("id", node_id)
if parent_ids:
add_filter("parent", parent_ids)
if types:
add_filter("type", types)
query = query.rstrip(" AND").rstrip("WHERE ")
if order:
query += "ORDER BY "
if order == "rand":
query += "RANDOM()"
if limit: # TODO 2-item tuple limit
query += " limit {}".format(limit)
with closing(self.db.cursor()) as cursor:
return list(map(self._populate_meta, cursor.execute(query, qargs).fetchall()))
def addnode(self, parent_id, fspath, name, size=-1):
fullpath = os.path.join(fspath, name)
is_dir = os.path.isdir(fullpath)
return self._addnode(parent_id, name, is_dir, size=size)
def _addnode(self, parent_id, name, is_dir=True, size=-1):
with closing(self.db.cursor()) as cursor:
cursor.execute("INSERT INTO nodes (parent, isdir, name, size) VALUES (?, ?, ?, ?);",
(parent_id, 1 if is_dir else 0, name, size))
return self.getnode(cursor.lastrowid)
def delnode(self, node_id):
deleted = 1
for child in self.getnodes(node_id):
deleted += self.delnode(child["id"])
with closing(self.db.cursor()) as cursor:
cursor.execute("DELETE FROM nodes WHERE id=?;", (node_id, ))
return deleted
def update_metadata(self, node_id, mergedict=None, **kwargs):
mergedict = mergedict if mergedict else {}
mergedict.update(kwargs)
with closing(self.db.cursor()) as cursor:
for table_key in keys_in_table:
if table_key in mergedict:
cursor.execute("UPDATE nodes SET {}=? WHERE id=?;".format(table_key),
(mergedict[table_key], node_id))
other_meta = {k: v for k, v in mergedict.items() if k not in keys_in_table}
if other_meta:
metadata = self.get_metadata(node_id)
metadata.update(other_meta)
cursor.execute("UPDATE nodes SET metadata=? WHERE id=?;", (json.dumps(metadata), node_id, ))
def get_metadata(self, node_id):
node = self.getnode(node_id)
meta = node["metadata"]
meta.update({item: node[item] for item in keys_in_table})
return meta
def decode_metadata(self, metadata):
if metadata:
return json.loads(metadata)
return {}
def hashit(self, unicode_string):
return sha512(unicode_string.encode('UTF-8')).hexdigest()
def validate_password(self, realm, username, password):
with closing(self.db.cursor()) as cursor:
users = cursor.execute("SELECT * FROM users WHERE username=? AND password=?;",
(username, self.hashit(password))).fetchall()
return bool(users)
def add_user(self, username, password, is_admin=False):
with closing(self.db.cursor()) as cursor:
cursor.execute("INSERT INTO users (username, password, admin) VALUES (?, ?, ?)",
(username, self.hashit(password), is_admin))
def update_user(self, username, password, is_admin=False):
with closing(self.db.cursor()) as cursor:
cursor.execute("UPDATE users SET password=?, admin=? WHERE username=?;",
(self.hashit(password), is_admin, username))
def get_user(self, user):
with closing(self.db.cursor()) as cursor:
try:
column = "id" if type(user) is int else "username"
return cursor.execute("SELECT * FROM users WHERE {}=?;".format(column), (user, )).fetchall()[0]
except IndexError:
raise NotFoundError("User doesn't exist")
def set_starred(self, user_id, node_id, starred=True):
with closing(self.db.cursor()) as cursor:
if starred:
query = "INSERT INTO stars (userid, nodeid) VALUES (?, ?);"
else:
query = "DELETE FROM stars WHERE userid=? and nodeid=?;"
try:
cursor.execute(query, (user_id, node_id))
except sqlite3.IntegrityError:
pass pass
def get_starred_items(self, for_user_id=None): def add_root(self, path, name="Library"):
"""
Add a new library root. Returns the root ID or raises on collision
:param path: normalized absolute path to add to the library
:type path: str:
:return: int
:raises: sqlite3.IntegrityError
"""
assert path.startswith("/")
with closing(self.db.cursor()) as cursor: with closing(self.db.cursor()) as cursor:
q = """SELECT n.* FROM nodes as n INNER JOIN stars as s ON s.nodeid = n.id""" try:
qargs = [] cursor.execute("INSERT INTO libraries ('name', 'path') VALUES (?, ?)", (name, path, ))
if for_user_id: cursor.execute("COMMIT")
q += """ AND userid=?""" return cursor.lastrowid
qargs += [int(for_user_id)] except sqlite3.IntegrityError:
return list(map(self._populate_meta, raise DuplicateRootException("Root '{}' already exists".format(path))
cursor.execute(q, qargs).fetchall()))
@readcursor
def get_libraries(self, cursor):
libs = []
cursor.execute("SELECT * FROM libraries")
for row in cursor:
libs.append(row)
return libs

View File

@ -28,10 +28,6 @@ class NoDataException(Exception):
pass pass
class DuplicateRootException(Exception):
pass
class PysonicLibrary(object): class PysonicLibrary(object):
def __init__(self, database): def __init__(self, database):
self.db = database self.db = database
@ -39,83 +35,19 @@ class PysonicLibrary(object):
logging.info("library ready") logging.info("library ready")
def update(self): def update(self):
"""
Start the library media scanner and
"""
self.scanner.init_scan() self.scanner.init_scan()
def add_dir(self, dir_path): def add_root_dir(self, path):
dir_path = os.path.abspath(os.path.normpath(dir_path))
libraries = [i['metadata']['fspath'] for i in self.db.getnodes(-1)]
if dir_path in libraries:
raise DuplicateRootException("Dir already in library")
else:
new_root = self.db._addnode(-1, 'New Library', is_dir=True)
self.db.update_metadata(new_root['id'], fspath=dir_path)
#@memoize
def get_libraries(self):
""" """
Libraries are top-level nodes The music library consists of a number of root dirs. This adds a new root
""" """
return self.db.getnodes(-1) path = os.path.abspath(os.path.normpath(path))
self.db.add_root(path)
#@memoize
def get_artists(self):
# Assume artists are second level dirs
return self.db.getnodes(*[item["id"] for item in self.get_libraries()])
def get_dir(self, dirid):
return self.db.getnode(dirid)
def get_dir_children(self, dirid):
return self.db.getnodes(dirid)
#@memoize
def get_albums(self):
return self.db.getnodes(*[item["id"] for item in self.get_artists()])
#@memoize
def get_filepath(self, nodeid):
parents = [self.db.getnode(nodeid)]
while parents[-1]['parent'] != -1:
parents.append(self.db.getnode(parents[-1]['parent']))
root = parents.pop()
parents.reverse()
return os.path.join(root['metadata']['fspath'], *[i['name'] for i in parents])
def get_file_metadata(self, nodeid):
return self.db.get_metadata(nodeid)
def get_artist_info(self, item_id):
# artist = self.db.getnode(item_id)
return {"biography": "placeholder biography",
"musicBrainzId": "playerholder",
"lastFmUrl": "https://www.last.fm/music/Placeholder",
"smallImageUrl": "",
"mediumImageUrl": "",
"largeImageUrl": "",
"similarArtists": []}
def set_starred(self, username, node_id, starred):
self.db.set_starred(self.db.get_user(username)["id"], node_id, starred)
def get_stars(self, user, user_id):
self.db.get_stars()
def get_user(self, user):
return self.db.get_user(user)
def get_starred(self, username):
return self.db.get_starred_items(self.db.get_user(username)["id"])
def get_songs(self, limit=50, shuffle=True):
return self.db.getnodes(types=MUSIC_TYPES, limit=limit, order="rand")
def get_song(self, id=None):
if id:
return self.db.getnode(id)
else:
return self.db.getnodes(types=MUSIC_TYPES, limit=1, order="rand")
def report_transcode(self, item_id, bitrate, num_bytes):
assert type(bitrate) is int and bitrate > 0 and bitrate <= 320
logging.info("Got transcode report of {} for item {} @ {}".format(num_bytes, item_id, bitrate))
self.db.update_metadata(item_id, {"transcoded_{}_size".format(bitrate):int(num_bytes)})

View File

@ -1,10 +1,11 @@
import os import os
import re import re
import logging import logging
from contextlib import closing
import mimetypes import mimetypes
from time import time from time import time
from threading import Thread from threading import Thread
from pysonic.types import KNOWN_MIMES, MUSIC_TYPES, MPX_TYPES, FLAC_TYPES, WAV_TYPES from pysonic.types import KNOWN_MIMES, MUSIC_TYPES, MPX_TYPES, FLAC_TYPES, WAV_TYPES, MUSIC_EXTENSIONS, IMAGE_EXTENSIONS, IMAGE_TYPES
from mutagen.id3 import ID3 from mutagen.id3 import ID3
from mutagen import MutagenError from mutagen import MutagenError
from mutagen.id3._util import ID3NoHeaderError from mutagen.id3._util import ID3NoHeaderError
@ -25,151 +26,236 @@ class PysonicFilesystemScanner(object):
self.scanner.start() self.scanner.start()
def rescan(self): def rescan(self):
# Perform directory scan """
logging.warning("Beginning library rescan") Perform a full scan of the media library's files
"""
start = time() start = time()
for parent in self.library.get_libraries(): logging.warning("Beginning library rescan")
meta = parent["metadata"] for parent in self.library.db.get_libraries():
logging.info("Scanning {}".format(meta["fspath"])) logging.info("Scanning {}".format(parent["path"]))
self.scan_root(parent["id"], parent["path"])
logging.warning("Rescan complete in %ss", round(time() - start, 3))
def recurse_dir(path, parent): def scan_root(self, pid, root):
logging.info("Scanning {}".format(path)) """
# create or update the database of nodes by comparing sets of names Scan a single root the library
fs_entries = set(os.listdir(path)) :param pid: parent ID
db_entires = self.library.db.getnodes(parent["id"]) :param root: absolute path to scan
db_entires_names = set([i['name'] for i in db_entires]) """
to_delete = db_entires_names - fs_entries logging.warning("Beginning file scan for library %s", pid)
to_create = fs_entries - db_entires_names root_depth = len(self.split_path(root))
for path, dirs, files in os.walk(root):
child = self.split_path(path)[root_depth:]
self.scan_dir(pid, root, child, dirs, files)
# If any size have changed, mark the file to be rescanned logging.warning("Beginning metadata scan for library %s", pid)
for entry in db_entires: self.scan_metadata(pid, root, freshonly=True)
finfo = os.stat(os.path.join(path, entry["name"]))
if finfo.st_size != entry["size"]:
logging.info("{} has changed in size, marking for meta rescan".format(entry["id"]))
self.library.db.update_metadata(entry['id'], id3_done=False, size=finfo.st_size)
# Create any nodes not found in the db logging.warning("Finished scan for library %s", pid)
for create in to_create:
new_finfo = os.stat(os.path.join(path, create))
new_node = self.library.db.addnode(parent["id"], path, create, size=new_finfo.st_size)
logging.info("Added {}".format(os.path.join(path, create)))
db_entires.append(new_node)
# Delete any db nodes not found on disk def scan_dir(self, pid, root, path, dirs, files):
for delete in to_delete: """
logging.info("Prune ", delete, "in parent", path) Scan a single directory in the library.
node = [i for i in db_entires if i["name"] == delete] :param pid: parent id
if node: :param root: library root path
deleted = self.library.db.delnode(node[0]["id"]) :param path: scan location path, as a list of subdirs within the root
logging.info("Pruned {}, deleting total of {}".format(node, deleted)) :param dirs: dirs in the current path
:param files: files in the current path
"""
# If there are no files then just bail
if not files:
return
# If it is the library root just bail
if len(path) == 0:
return
for entry in db_entires: # Guess an artist from the dir
if entry["name"] in to_delete: artist = path[0]
# Guess an album from the dir, if possible
album = None
if len(path) > 1:
album = path[-1]
with closing(self.library.db.db.cursor()) as cursor:
# Create artist entry
cursor.execute("SELECT * FROM artists WHERE dir = ?", (artist, ))
row = cursor.fetchone()
if row:
artist_id = row['id']
else:
cursor.execute("INSERT INTO artists (libraryid, dir, name) VALUES (?, ?, ?)",
(pid, artist, artist))
artist_id = cursor.lastrowid
# Create album entry
album_id = None
libpath = os.path.join(*path)
if album:
cursor.execute("SELECT * FROM albums WHERE artistid = ? AND dir = ?", (artist_id, libpath, ))
row = cursor.fetchone()
if row:
album_id = row['id']
else:
cursor.execute("INSERT INTO albums (artistid, dir, name) VALUES (?, ?, ?)",
(artist_id, libpath, path[-1]))
album_id = cursor.lastrowid
new_files = False
for file in files:
if not any([file.endswith(".{}".format(i)) for i in MUSIC_EXTENSIONS]):
continue
fpath = os.path.join(libpath, file)
cursor.execute("SELECT id FROM songs WHERE file=?", (fpath, ))
if not cursor.fetchall():
# We leave most fields blank now and return later
cursor.execute("INSERT INTO songs (albumid, file, size, title) "
"VALUES (?, ?, ?, ?)",
(album_id,
fpath,
os.stat(os.path.join(root, fpath)).st_size,
file, ))
new_files = True
# Create cover entry TODO we can probably skip this if there were no new audio files?
if album_id:
for file in files:
if not any([file.endswith(".{}".format(i)) for i in IMAGE_EXTENSIONS]):
continue continue
if int(entry['isdir']): # 1 means dir fpath = os.path.join(libpath, file)
recurse_dir(os.path.join(path, entry["name"]), entry) cursor.execute("SELECT id FROM covers WHERE path=?", (fpath, ))
if not cursor.fetchall():
# We leave most fields blank now and return later
cursor.execute("INSERT INTO covers (path) VALUES (?);", (fpath, ))
cursor.execute("UPDATE albums SET coverid=? WHERE id=?", (cursor.lastrowid, album_id))
break
# Populate all files for this top-level root if new_files: # Commit after each dir IF audio files were found. no audio == dump the artist
recurse_dir(meta["fspath"], parent) cursor.execute("COMMIT")
#
#
#
# Add simple metadata
for artist_dir in self.library.db.getnodes(parent["id"]):
artist = artist_dir["name"]
for album_dir in self.library.db.getnodes(artist_dir["id"]):
album = album_dir["name"]
album_meta = album_dir["metadata"]
for track_file in self.library.db.getnodes(album_dir["id"]):
title = track_file["name"]
if not track_file["title"]:
self.library.db.update_metadata(track_file["id"], artist=artist, album=album, title=title)
logging.info("Adding simple metadata for {}/{}/{} #{}".format(artist, album,
title, track_file["id"]))
if not album_dir["album"]:
self.library.db.update_metadata(album_dir["id"], artist=artist, album=album)
logging.info("Adding simple metadata for {}/{} #{}".format(artist, album, album_dir["id"]))
if not artist_dir["artist"]:
self.library.db.update_metadata(artist_dir["id"], artist=artist)
logging.info("Adding simple metadata for {} #{}".format(artist, artist_dir["id"]))
if title in ["cover.jpg", "cover.png"] and 'cover' not in album_meta:
# // add cover art
self.library.db.update_metadata(album_dir["id"], cover=track_file["id"])
logging.info("added cover for {}".format(album_dir['id']))
if track_file["type"] is None: def split_path(self, path):
fpath = self.library.get_filepath(track_file['id']) """
ftype, extra = mimetypes.guess_type(fpath) Given a path like /foo/bar, return ['foo', 'bar']
"""
parts = []
head = path
while True:
head, tail = os.path.split(head)
if tail:
parts.append(tail)
else:
break
parts.reverse()
return parts
if ftype in KNOWN_MIMES: def scan_metadata(self, pid, root, freshonly=False):
self.library.db.update_metadata(track_file["id"], type=ftype) """
logging.info("added type {} for {}".format(ftype, track_file['id'])) Iterate through files in the library and update metadata
else: :param freshonly: only update metadata on files that have never been scanned before
logging.warning("Ignoring unreadable file at {}, unknown ftype ({}, {})" """
.format(fpath, ftype, extra)) q = "SELECT * FROM songs "
# if freshonly:
# q += "WHERE lastscan = -1 "
# q += "ORDER BY albumid"
# Add advanced id3 / media info metadata
for artist_dir in self.library.db.getnodes(parent["id"]):
artist = artist_dir["name"]
for album_dir in self.library.db.getnodes(artist_dir["id"]):
album = album_dir["name"]
album_meta = album_dir["metadata"]
for track_file in self.library.db.getnodes(album_dir["id"]):
track_meta = track_file['metadata']
title = track_file["name"]
fpath = self.library.get_filepath(track_file["id"])
if track_meta.get('id3_done', False) or track_file.get("type", None) not in MUSIC_TYPES:
continue
tags = {'id3_done': True}
try:
audio = None
if track_file.get("type", None) in MPX_TYPES:
audio = MP3(fpath)
if audio.info.sketchy:
logging.warning("media reported as sketchy: %s", fpath)
elif track_file.get("type", None) in FLAC_TYPES:
audio = FLAC(fpath)
else:
audio = ID3(fpath)
# print(audio.pprint())
try:
tags["media_length"] = int(audio.info.length)
except (ValueError, AttributeError):
pass
try:
bitrate = int(audio.info.bitrate)
tags["media_bitrate"] = bitrate
tags["media_kbitrate"] = int(bitrate / 1024)
except (ValueError, AttributeError):
pass
try:
tags["track"] = int(RE_NUMBERS.findall(''.join(audio['TRCK'].text))[0])
except (KeyError, IndexError):
pass
try:
tags["id3_artist"] = ''.join(audio['TPE1'].text)
except KeyError:
pass
try:
tags["id3_album"] = ''.join(audio['TALB'].text)
except KeyError:
pass
try:
tags["id3_title"] = ''.join(audio['TIT2'].text)
except KeyError:
pass
try:
tags["id3_year"] = audio['TDRC'].text[0].year
except (KeyError, IndexError):
pass
logging.info("got all media info from %s", fpath)
except ID3NoHeaderError:
pass
except MutagenError as m:
logging.error("failed to read audio information: %s", m)
continue
self.library.db.update_metadata(track_file["id"], **tags)
logging.warning("Library scan complete in {}s".format(round(time() - start, 2))) with closing(self.library.db.db.cursor()) as reader, \
closing(self.library.db.db.cursor()) as writer:
processed = 0 # commit batching counter
for row in reader.execute(q):
# Find meta, bail if the file was unreadable
# TODO file metadata scanning could be done in parallel
meta = self.scan_file_metadata(os.path.join(root, row['file']))
if not meta:
continue
# Meta may have additional keys that arent in the songs table, omit them
song_attrs = ["title", "lastscan", "format", "length", "bitrate", "track", "year"]
song_meta = {k: v for k, v in meta.items() if k in song_attrs}
# Update the song row
q = "UPDATE songs SET "
params = []
for key, value in song_meta.items():
q += "{}=?, ".format(key)
params.append(value)
q += "lastscan=? WHERE id=?"
params += [int(time()), row["id"]]
writer.execute(q, params)
# If the metadata has an artist or album name, update the relevant items
if "album" in meta:
writer.execute("UPDATE albums SET name=? WHERE id=?", (meta["album"], row["albumid"]))
if "artist" in meta:
album = writer.execute("SELECT artistid FROM albums WHERE id=?", (row['albumid'], )).fetchone()
writer.execute("UPDATE artists SET name=? WHERE id=?", (meta["artist"], album["artistid"]))
# Commit every 50 items
processed += 1
if processed > 50:
writer.execute("COMMIT")
processed = 0
if processed != 0:
writer.execute("COMMIT")
def scan_file_metadata(self, fpath):
"""
Scan the file for metadata.
:param fpath: path to the file to scan
"""
ftype, extra = mimetypes.guess_type(fpath)
if ftype in MUSIC_TYPES:
return self.scan_mutagen_metadata(fpath, ftype)
def scan_mutagen_metadata(self, fpath, ftype):
meta = {"format": ftype}
try:
# Open file with mutagen
if ftype in MPX_TYPES:
audio = MP3(fpath)
if audio.info.sketchy:
logging.warning("media reported as sketchy: %s", fpath)
elif ftype in FLAC_TYPES:
audio = FLAC(fpath)
else:
audio = ID3(fpath)
except ID3NoHeaderError:
return
except MutagenError as m:
logging.error("failed to read audio information: %s", m)
return
try:
meta["length"] = int(audio.info.length)
except (ValueError, AttributeError):
pass
try:
bitrate = int(audio.info.bitrate)
meta["bitrate"] = bitrate
# meta["kbitrate"] = int(bitrate / 1024)
except (ValueError, AttributeError):
pass
try:
meta["track"] = int(RE_NUMBERS.findall(''.join(audio['TRCK'].text))[0])
except (KeyError, IndexError):
pass
try:
meta["artist"] = ''.join(audio['TPE1'].text)
except KeyError:
pass
try:
meta["album"] = ''.join(audio['TALB'].text)
except KeyError:
pass
try:
meta["title"] = ''.join(audio['TIT2'].text)
except KeyError:
pass
try:
meta["year"] = audio['TDRC'].text[0].year
except (KeyError, IndexError):
pass
logging.info("got all media info from %s", fpath)
return meta

View File

@ -1,7 +1,16 @@
KNOWN_MIMES = ["audio/mpeg", "audio/flac", "audio/x-wav", "image/jpeg", "image/png"] KNOWN_MIMES = ["audio/mpeg", "audio/flac", "audio/x-wav", "image/jpeg", "image/png"]
MUSIC_TYPES = ["audio/mpeg", "audio/flac", "audio/x-wav"] MUSIC_TYPES = ["audio/mpeg", "audio/flac", "audio/x-wav"]
MPX_TYPES = ["audio/mpeg"] MPX_TYPES = ["audio/mpeg"]
FLAC_TYPES = ["audio/flac"] FLAC_TYPES = ["audio/flac"]
WAV_TYPES = ["audio/x-wav"] WAV_TYPES = ["audio/x-wav"]
IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif"] IMAGE_TYPES = ["image/jpeg", "image/png", "image/gif"]
IMAGE_EXTENSIONS = ["jpg", "jpeg", "png", "gif"]
MUSIC_EXTENSIONS = ["mp3", "flac", "wav"]