import os
import json
import sqlite3
import logging
from hashlib import sha512
from contextlib import closing
from functools import wraps

# NOTE: this rebinds the name ``logging`` from the stdlib module to a Logger
# instance for the rest of the module. The name is kept as-is so that any
# existing reference to it keeps working.
logging = logging.getLogger("database")

keys_in_table = ["title", "album", "artist", "type", "size"]


def dict_factory(cursor, row):
    """
    sqlite3 row factory that returns each row as a dict keyed by column name

    :param cursor: cursor that produced the row; its ``description`` supplies the column names
    :param row: sequence of column values for one result row
    :return: dict mapping column name to value
    """
    return {column[0]: value for column, value in zip(cursor.description, row)}


class NotFoundError(Exception):
    """Raised when a requested record does not exist"""


class DuplicateRootException(Exception):
    """Raised when adding a library root whose path is already registered"""


def readcursor(func):
    """
    Provides a cursor to the wrapped method as the first arg

    The cursor is opened on ``self.db`` and closed when the wrapped method returns or raises.
    """
    @wraps(func)  # keep the wrapped method's __name__ / __doc__ for introspection
    def wrapped(self, *args, **kwargs):
        with closing(self.db.cursor()) as cursor:
            return func(self, cursor, *args, **kwargs)
    return wrapped


class PysonicDatabase(object):
    """Thin wrapper around the sqlite3 database holding libraries, media metadata and users"""

    def __init__(self, path):
        """
        Open the database at ``path`` and create the schema if it is missing

        :param path: path handed to sqlite3.connect
        :type path: str
        """
        self.sqlite_opts = dict(check_same_thread=False)
        self.path = path
        self.db = None

        self.open()
        self.migrate()

    def open(self):
        """Connect to the database file and make rows come back as dicts"""
        self.db = sqlite3.connect(self.path, **self.sqlite_opts)
        self.db.row_factory = dict_factory

    def migrate(self):
        """
        Create the schema when the database has no 'meta' table yet

        An existing database is currently left untouched (no migrations are implemented).
        """
        # Create db
        queries = [
            """CREATE TABLE 'libraries' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                'name' TEXT,
                'path' TEXT UNIQUE);""",
            """CREATE TABLE 'artists' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                'libraryid' INTEGER,
                'dir' TEXT UNIQUE,
                'name' TEXT)""",
            """CREATE TABLE 'albums' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                'artistid' INTEGER,
                'coverid' INTEGER,
                'dir' TEXT,
                'name' TEXT,
                UNIQUE (artistid, dir));""",
            # The SQL comment below must stay on its own line: the newline is
            # what terminates it.
            """CREATE TABLE 'songs' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                'albumid' BOOLEAN,
                'file' TEXT UNIQUE, -- path from the library root
                'size' INTEGER NOT NULL DEFAULT -1,
                'title' TEXT NOT NULL,
                'lastscan' INTEGER NOT NULL DEFAULT -1,
                'format' TEXT,
                'length' INTEGER,
                'bitrate' INTEGER,
                'track' INTEGER,
                'year' INTEGER
                )""",
            """CREATE TABLE 'covers' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT,
                'type' TEXT,
                'size' TEXT,
                'path' TEXT UNIQUE);""",
            """CREATE TABLE 'users' (
                'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
                'username' TEXT UNIQUE NOT NULL,
                'password' TEXT NOT NULL,
                'admin' BOOLEAN DEFAULT 0,
                'email' TEXT)""",
            """CREATE TABLE 'stars' (
                'userid' INTEGER,
                'songid' INTEGER,
                primary key ('userid', 'songid'))""",
            """CREATE TABLE 'meta' (
                'key' TEXT PRIMARY KEY NOT NULL,
                'value' TEXT);""",
            """INSERT INTO meta VALUES ('db_version', '1');""",
        ]

        with closing(self.db.cursor()) as cursor:
            cursor.execute("SELECT * FROM sqlite_master WHERE type='table' AND name='meta'")

            # Initialize DB
            if not cursor.fetchall():
                logging.warning("Initializing database")
                for query in queries:
                    logging.debug("Executing schema statement: %s", query)
                    cursor.execute(query)
                # commit() is a no-op when no transaction is open, unlike a raw COMMIT statement
                self.db.commit()
            else:
                # Migrate if old db exists
                # TODO: no migrations exist yet; bump meta.db_version here once they do
                pass

    def add_root(self, path, name="Library"):
        """
        Add a new library root. Returns the root ID or raises on collision

        :param path: normalized absolute path to add to the library
        :type path: str
        :param name: display name stored for the library
        :type name: str
        :return: int
        :raises: DuplicateRootException
        """
        # NOTE: kept as an assert so callers see the same exception type as before;
        # be aware it is stripped when Python runs with -O.
        assert path.startswith("/")
        with closing(self.db.cursor()) as cursor:
            try:
                cursor.execute("INSERT INTO libraries ('name', 'path') VALUES (?, ?)", (name, path, ))
            except sqlite3.IntegrityError as err:
                raise DuplicateRootException("Root '{}' already exists".format(path)) from err
            # Read the new id before committing so it cannot be affected by a later statement
            root_id = cursor.lastrowid
            self.db.commit()
            return root_id

    @readcursor
    def get_libraries(self, cursor):
        """
        List every row of the libraries table

        :return: list of dicts, one per library
        """
        cursor.execute("SELECT * FROM libraries")
        return cursor.fetchall()

    @readcursor
    def add_user(self, cursor, username, password, is_admin=False):
        """
        Insert a new user, storing the hashed password

        :param username: unique login name
        :param password: plaintext password; hashed with hashit() before storage
        :param is_admin: value for the user's admin flag
        """
        cursor.execute("INSERT INTO users (username, password, admin) VALUES (?, ?, ?)",
                       (username, self.hashit(password), is_admin))
        self.db.commit()

    @readcursor
    def update_user(self, cursor, username, password, is_admin=False):
        """
        Replace the password and admin flag of the user named ``username``

        :param username: login name of the user to update
        :param password: new plaintext password; hashed with hashit() before storage
        :param is_admin: new value for the user's admin flag
        """
        cursor.execute("UPDATE users SET password=?, admin=? WHERE username=?;",
                       (self.hashit(password), is_admin, username))
        self.db.commit()

    @readcursor
    def get_user(self, cursor, user):
        """
        Fetch one user row by numeric id or by username

        :param user: user id (int) or username (any other type)
        :return: dict of the user's columns
        :raises: NotFoundError
        """
        # The column name comes from these two literals only, so formatting it into the SQL is safe
        column = "id" if isinstance(user, int) else "username"
        row = cursor.execute("SELECT * FROM users WHERE {}=?;".format(column), (user, )).fetchone()
        if row is None:
            raise NotFoundError("User doesn't exist")
        return row

    def hashit(self, unicode_string):
        """
        Return the hex SHA-512 digest of ``unicode_string`` encoded as UTF-8

        :param unicode_string: text to hash
        :type unicode_string: str
        :return: str
        """
        # NOTE(review): this is a single unsalted SHA-512 pass. Changing the scheme would
        # stop existing stored hashes from matching, so it is left as-is here.
        return sha512(unicode_string.encode('UTF-8')).hexdigest()