160 lines
5.1 KiB
Python
160 lines
5.1 KiB
Python
import os
|
|
import logging
|
|
from pysonic.scanner import PysonicFilesystemScanner
|
|
from pysonic.types import MUSIC_TYPES
|
|
|
|
|
|
LETTER_GROUPS = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t",
|
|
"u", "v", "w", "xyz", "0123456789"]
|
|
|
|
|
|
logging = logging.getLogger("library")
|
|
|
|
|
|
def memoize(function):
|
|
memo = {}
|
|
|
|
def wrapper(*args):
|
|
if args in memo:
|
|
return memo[args]
|
|
else:
|
|
rv = function(*args)
|
|
memo[args] = rv
|
|
return rv
|
|
return wrapper
|
|
|
|
|
|
class NoDataException(Exception):
|
|
pass
|
|
|
|
|
|
class PysonicLibrary(object):
|
|
def __init__(self, database):
|
|
self.db = database
|
|
|
|
self.get_libraries = self.db.get_libraries
|
|
self.get_artists = self.db.get_artists
|
|
self.get_albums = self.db.get_albums
|
|
# self.get_song = self.db.get_song
|
|
# self.get_cover = self.db.get_cover
|
|
|
|
self.scanner = PysonicFilesystemScanner(self)
|
|
logging.info("library ready")
|
|
|
|
def update(self):
|
|
"""
|
|
Start the library media scanner ands
|
|
"""
|
|
self.scanner.init_scan()
|
|
|
|
def add_root_dir(self, path):
|
|
"""
|
|
The music library consists of a number of root dirs. This adds a new root
|
|
"""
|
|
path = os.path.abspath(os.path.normpath(path))
|
|
self.db.add_root(path)
|
|
|
|
# def get_artists(self, *args, **kwargs):
|
|
# artists = self.db.get_artists(*args, **kwargs)
|
|
# for item in artists:
|
|
# item["parent"] = item["libraryid"]
|
|
# return artists
|
|
|
|
# def get_albums(self, *args, **kwargs):
|
|
# albums = self.db.get_albums(*args, **kwargs)
|
|
# for item in albums:
|
|
# item["parent"] = item["artistid"]
|
|
# return albums
|
|
|
|
def get_artist_info(self, item_id):
|
|
#TODO
|
|
return {"biography": "placeholder biography",
|
|
"musicBrainzId": "playerholder",
|
|
"lastFmUrl": "https://www.last.fm/music/Placeholder",
|
|
"smallImageUrl": "",
|
|
"mediumImageUrl": "",
|
|
"largeImageUrl": "",
|
|
"similarArtists": []}
|
|
|
|
def get_cover(self, cover_id):
|
|
cover = self.db.get_cover(cover_id)
|
|
library = self.db.get_libraries(cover["library"])[0]
|
|
cover['_fullpath'] = os.path.join(library["path"], cover["path"])
|
|
return cover
|
|
|
|
def get_song(self, song_id):
|
|
song = self.db.get_song(song_id)
|
|
library = self.db.get_libraries(song["library"])[0]
|
|
song['_fullpath'] = os.path.join(library["path"], song["file"])
|
|
return song
|
|
|
|
# #@memoize
|
|
# def get_libraries(self):
|
|
# """
|
|
# Libraries are top-level nodes
|
|
# """
|
|
# return self.db.getnodes(-1)
|
|
|
|
# #@memoize
|
|
# def get_artists(self):
|
|
# # Assume artists are second level dirs
|
|
# return self.db.getnodes(*[item["id"] for item in self.get_libraries()])
|
|
|
|
# def get_dir(self, dirid):
|
|
# return self.db.getnode(dirid)
|
|
|
|
# def get_dir_children(self, dirid):
|
|
# return self.db.getnodes(dirid)
|
|
|
|
# #@memoize
|
|
# def get_albums(self):
|
|
# return self.db.getnodes(*[item["id"] for item in self.get_artists()])
|
|
|
|
# #@memoize
|
|
# def get_filepath(self, nodeid):
|
|
# parents = [self.db.getnode(nodeid)]
|
|
# while parents[-1]['parent'] != -1:
|
|
# parents.append(self.db.getnode(parents[-1]['parent']))
|
|
# root = parents.pop()
|
|
# parents.reverse()
|
|
# return os.path.join(root['metadata']['fspath'], *[i['name'] for i in parents])
|
|
|
|
# def get_file_metadata(self, nodeid):
|
|
# return self.db.get_metadata(nodeid)
|
|
|
|
# def get_artist_info(self, item_id):
|
|
# # artist = self.db.getnode(item_id)
|
|
# return {"biography": "placeholder biography",
|
|
# "musicBrainzId": "playerholder",
|
|
# "lastFmUrl": "https://www.last.fm/music/Placeholder",
|
|
# "smallImageUrl": "",
|
|
# "mediumImageUrl": "",
|
|
# "largeImageUrl": "",
|
|
# "similarArtists": []}
|
|
|
|
# def set_starred(self, username, node_id, starred):
|
|
# self.db.set_starred(self.db.get_user(username)["id"], node_id, starred)
|
|
|
|
# def get_stars(self, user, user_id):
|
|
# self.db.get_stars()
|
|
|
|
# def get_user(self, user):
|
|
# return self.db.get_user(user)
|
|
|
|
# def get_starred(self, username):
|
|
# return self.db.get_starred_items(self.db.get_user(username)["id"])
|
|
|
|
# def get_songs(self, limit=50, shuffle=True):
|
|
# return self.db.getnodes(types=MUSIC_TYPES, limit=limit, order="rand")
|
|
|
|
# def get_song(self, id=None):
|
|
# if id:
|
|
# return self.db.getnode(id)
|
|
# else:
|
|
# return self.db.getnodes(types=MUSIC_TYPES, limit=1, order="rand")
|
|
|
|
# def report_transcode(self, item_id, bitrate, num_bytes):
|
|
# assert type(bitrate) is int and bitrate > 0 and bitrate <= 320
|
|
# logging.info("Got transcode report of {} for item {} @ {}".format(num_bytes, item_id, bitrate))
|
|
# self.db.update_metadata(item_id, {"transcoded_{}_size".format(bitrate):int(num_bytes)})
|