pysonic/pysonic/database.py

306 lines
10 KiB
Python

import os
import json
import sqlite3
import logging
from hashlib import sha512
from contextlib import closing
logging = logging.getLogger("database")
keys_in_table = ["title", "album", "artist", "type", "size"]
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
class NotFoundError(Exception):
pass
class DuplicateRootException(Exception):
pass
def readcursor(func):
"""
Provides a cursor to the wrapped method as the first arg
"""
def wrapped(*args, **kwargs):
self = args[0]
with closing(self.db.cursor()) as cursor:
return func(*[self, cursor], *args[1:], **kwargs)
return wrapped
class PysonicDatabase(object):
def __init__(self, path):
self.sqlite_opts = dict(check_same_thread=False)
self.path = path
self.db = None
self.open()
self.migrate()
def open(self):
self.db = sqlite3.connect(self.path, **self.sqlite_opts)
self.db.row_factory = dict_factory
def migrate(self):
# Create db
queries = ["""CREATE TABLE 'libraries' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'name' TEXT,
'path' TEXT UNIQUE);""",
"""CREATE TABLE 'dirs' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'library' INTEGER,
'parent' INTEGER,
'name' TEXT,
UNIQUE(parent, name)
)""",
"""CREATE TABLE 'artists' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'libraryid' INTEGER,
'dir' INTEGER UNIQUE,
'name' TEXT)""",
"""CREATE TABLE 'albums' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'artistid' INTEGER,
'coverid' INTEGER,
'dir' INTEGER,
'name' TEXT,
UNIQUE (artistid, dir));""",
"""CREATE TABLE 'songs' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'albumid' BOOLEAN,
'file' TEXT UNIQUE, -- path from the library root
'size' INTEGER NOT NULL DEFAULT -1,
'title' TEXT NOT NULL,
'lastscan' INTEGER NOT NULL DEFAULT -1,
'format' TEXT,
'length' INTEGER,
'bitrate' INTEGER,
'track' INTEGER,
'year' INTEGER
)""",
"""CREATE TABLE 'covers' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT,
'type' TEXT,
'size' TEXT,
'path' TEXT UNIQUE);""",
"""CREATE TABLE 'users' (
'id' INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
'username' TEXT UNIQUE NOT NULL,
'password' TEXT NOT NULL,
'admin' BOOLEAN DEFAULT 0,
'email' TEXT)""",
"""CREATE TABLE 'stars' (
'userid' INTEGER,
'songid' INTEGER,
primary key ('userid', 'songid'))""",
"""CREATE TABLE 'meta' (
'key' TEXT PRIMARY KEY NOT NULL,
'value' TEXT);""",
"""INSERT INTO meta VALUES ('db_version', '1');"""]
with closing(self.db.cursor()) as cursor:
cursor.execute("SELECT * FROM sqlite_master WHERE type='table' AND name='meta'")
# Initialize DB
if len(cursor.fetchall()) == 0:
logging.warning("Initializing database")
for query in queries:
print(query)
cursor.execute(query)
cursor.execute("COMMIT")
else:
# Migrate if old db exists
# cursor.execute("""UPDATE meta SET value=? WHERE key="db_version";""", (str(version), ))
# logging.warning("db schema is version {}".format(version))
pass
def add_root(self, path, name="Library"):
"""
Add a new library root. Returns the root ID or raises on collision
:param path: normalized absolute path to add to the library
:type path: str:
:return: int
:raises: sqlite3.IntegrityError
"""
assert path.startswith("/")
with closing(self.db.cursor()) as cursor:
try:
cursor.execute("INSERT INTO libraries ('name', 'path') VALUES (?, ?)", (name, path, ))
cursor.execute("COMMIT")
return cursor.lastrowid
except sqlite3.IntegrityError:
raise DuplicateRootException("Root '{}' already exists".format(path))
@readcursor
def get_libraries(self, cursor):
libs = []
cursor.execute("SELECT * FROM libraries")
for row in cursor:
libs.append(row)
return libs
@readcursor
def get_artists(self, cursor, id=None, dirid=None, sortby=None, order=None):
assert order in ["asc", "desc", None]
artists = []
q = "SELECT * FROM artists"
params = []
conditions = []
if id:
conditions.append("id = ?")
params.append(id)
if dirid:
conditions.append("dir = ?")
params.append(dirid)
if conditions:
q += " WHERE " + " AND ".join(conditions)
if sortby:
q += " ORDER BY {} {}".format(sortby, order.upper() if order else "ASC")
print(q)
print(params)
cursor.execute(q, params)
for row in cursor:
artists.append(row)
return artists
@readcursor
def get_albums(self, cursor, id=None, artist=None, sortby=None, order=None):
assert order in ["asc", "desc", None]
albums = []
q = "SELECT * FROM albums"
params = []
conditions = []
if id:
conditions.append("id = ?")
params.append(id)
if artist:
conditions.append("artistid = ?")
params.append(artist)
if conditions:
q += " WHERE " + " AND ".join(conditions)
if sortby:
q += " ORDER BY {} {}".format(sortby, order.upper() if order else "ASC")
cursor.execute(q, params)
for row in cursor:
albums.append(row)
return albums
@readcursor
def get_song(self, cursor, songid):
for item in cursor.execute("SELECT * FROM songs WHERE id=?", (songid, )):
return item
return None
# @readcursor
# def get_artist_by_dir(self, cursor, dirid):
# for row in cursor.execute("""
# SELECT artists.*
# FROM dirs
# INNER JOIN artists
# ON artists.dir = dirs.id
# WHERE dirs.id=?""", (dirid, )):
# return [row]
# return []
@readcursor
def get_cover(self, cursor, coverid):
cover = None
for cover in cursor.execute("SELECT * FROM covers WHERE id = ?", (coverid, )):
return cover
@readcursor
def get_musicdir(self, cursor, dirid):
"""
The world is a harsh place.
Again, this bullshit exists only to serve subsonic clients. Given a directory ID it returns a dict containing:
- the directory itself
- its parent
- its child dirs
- its child media
that's a lie, it's a tuple and it's full of BS. read the code
"""
# find directory
dirinfo = None
for dirinfo in cursor.execute("SELECT * FROM dirs WHERE id = ?", (dirid, )):
pass
assert dirinfo
ret = None
# see if it matches the artists or albums table
artist = None
for artist in cursor.execute("SELECT * FROM artists WHERE dir = ?", (dirid, )):
pass
# if artist:
# get child albums
if artist:
ret = ("artist", dirinfo, artist)
children = []
for album in cursor.execute("SELECT * FROM albums WHERE artistid = ?", (artist["id"], )):
children.append(("album", album))
ret[2]['children'] = children
return ret
# else if album:
# get child tracks
album = None
for album in cursor.execute("SELECT * FROM albums WHERE dir = ?", (dirid, )):
pass
if album:
ret = ("album", dirinfo, album)
artist_info = cursor.execute("SELECT * FROM artists WHERE id = ?", (album["artistid"], )).fetchall()[0]
children = []
for song in cursor.execute("SELECT * FROM songs WHERE albumid = ?", (album["id"], )):
song["_artist"] = artist_info
children.append(("song", song))
ret[2]['children'] = children
return ret
@readcursor
def add_user(self, cursor, username, password, is_admin=False):
cursor.execute("INSERT INTO users (username, password, admin) VALUES (?, ?, ?)",
(username, self.hashit(password), is_admin))
cursor.execute("COMMIT")
@readcursor
def update_user(self, cursor, username, password, is_admin=False):
cursor.execute("UPDATE users SET password=?, admin=? WHERE username=?;",
(self.hashit(password), is_admin, username))
cursor.execute("COMMIT")
@readcursor
def get_user(self, cursor, user):
try:
column = "id" if type(user) is int else "username"
return cursor.execute("SELECT * FROM users WHERE {}=?;".format(column), (user, )).fetchall()[0]
except IndexError:
raise NotFoundError("User doesn't exist")
def hashit(self, unicode_string):
return sha512(unicode_string.encode('UTF-8')).hexdigest()