359 lines
11 KiB
Python
359 lines
11 KiB
Python
import sqlite3
|
|
from pathlib import Path
|
|
from hashlib import sha1
|
|
from PIL import Image, ImageOps
|
|
from io import BytesIO
|
|
from datetime import datetime
|
|
from dateutil.parser import parse as dateparse
|
|
|
|
from engine_sync.track import Track
|
|
from engine_sync.playlist import Playlist
|
|
|
|
|
|
class EngineInterface:
|
|
def __init__(self, engine_prefix_path: Path, junction_point: Path) -> None:
|
|
self.engine_prefix_path = engine_prefix_path
|
|
self.junction_point = junction_point
|
|
|
|
def add_track(self, track: Track, cur: sqlite3.Cursor) -> None:
|
|
"""
|
|
Add a track to the Engine DJ library.
|
|
|
|
Track ID will be added to the track object
|
|
"""
|
|
|
|
album_art_id = None
|
|
|
|
if track.cover is not None:
|
|
# add the track cover to the database
|
|
album_art_id = self.add_album_cover(track.cover, cur)
|
|
|
|
# get the filename
|
|
filename = track.file.name
|
|
|
|
# generate the path Engine wants to see
|
|
fixed_path = self.engine_prefix_path / track.file.relative_to(
|
|
self.junction_point
|
|
)
|
|
|
|
# merge the artist string
|
|
artist_str = ", ".join(track.artists)
|
|
|
|
# get the current UNIX timestamp date
|
|
current_date = int(datetime.now().timestamp())
|
|
|
|
# translate data type for explicit lyrics
|
|
explicit_int = 1 if track.explicit else 0
|
|
|
|
# add the track to the database
|
|
res = cur.execute(
|
|
"""
|
|
INSERT INTO Track (
|
|
length,
|
|
bpm,
|
|
year,
|
|
path,
|
|
filename,
|
|
bitrate,
|
|
albumArtId,
|
|
fileBytes,
|
|
title,
|
|
artist,
|
|
album,
|
|
genre,
|
|
label,
|
|
fileType,
|
|
dateCreated,
|
|
dateAdded,
|
|
explicitLyrics,
|
|
lastEditTime,
|
|
playOrder,
|
|
rating,
|
|
albumArt,
|
|
isPlayed,
|
|
isAnalyzed,
|
|
isAvailable,
|
|
isMetadataOfPackedTrackChanged,
|
|
isPerfomanceDataOfPackedTrackChanged,
|
|
isMetadataImported,
|
|
pdbImportKey,
|
|
streamingFlags
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0, 'image://planck/0', 0, 0, 1, 0, 0, 1, 0, 0)
|
|
RETURNING id
|
|
""",
|
|
(
|
|
track.length,
|
|
track.bpm,
|
|
track.release_date.year,
|
|
str(fixed_path),
|
|
filename,
|
|
track.bitrate,
|
|
album_art_id,
|
|
track.file_size,
|
|
track.title,
|
|
artist_str,
|
|
track.album,
|
|
track.genre,
|
|
track.label,
|
|
track.file.suffix[1:],
|
|
track.creation_date,
|
|
current_date,
|
|
explicit_int,
|
|
current_date,
|
|
),
|
|
)
|
|
|
|
track.engine_id = res.fetchone()[0]
|
|
|
|
def add_album_cover(self, cover: bytes, cur: sqlite3.Cursor) -> int:
|
|
"""
|
|
Add an album cover to the track database, skip if already in there.
|
|
Return the ID of the column in the database.
|
|
"""
|
|
|
|
# if this is a duplicate, skip it
|
|
duplicate, hash, id = self.check_cover_in_database(cover, cur)
|
|
|
|
if duplicate:
|
|
return id
|
|
|
|
cover_im = Image.open(BytesIO(cover))
|
|
|
|
# resize the image to 256x256 (Engine requirement)
|
|
# keep the aspect ratio
|
|
cover_im = ImageOps.contain(cover_im, (256, 256))
|
|
|
|
to_db_bytes = b""
|
|
|
|
# convert to PNG
|
|
with BytesIO() as f:
|
|
cover_im.save(f, format="PNG")
|
|
f.seek(0)
|
|
to_db_bytes = f.getvalue()
|
|
|
|
# save to the database and readback ID
|
|
res = cur.execute(
|
|
"INSERT INTO AlbumArt (hash, albumArt) VALUES (?, ?) RETURNING id",
|
|
(
|
|
hash,
|
|
to_db_bytes,
|
|
),
|
|
)
|
|
|
|
# return the ID
|
|
return res.fetchone()[0]
|
|
|
|
def check_cover_in_database(
|
|
self, cover: bytes, cur: sqlite3.Cursor
|
|
) -> tuple[bool, str, int]:
|
|
"""
|
|
Check if the given image exists in the album art table.
|
|
|
|
While the exact way Engine calculates the image hash is unknown,
|
|
we can at least de-duplicate with tracks that have been imported
|
|
through this tool.
|
|
|
|
The hash is a simple sha1 sum of the original cover image.
|
|
"""
|
|
|
|
image_hash = sha1(cover).hexdigest()
|
|
|
|
res = cur.execute("SELECT id FROM AlbumArt WHERE hash=?", (image_hash,))
|
|
|
|
found = False
|
|
id = -1
|
|
|
|
data = res.fetchone()
|
|
|
|
if data is not None:
|
|
found = True
|
|
id = data[0]
|
|
|
|
return found, image_hash, id
|
|
|
|
def check_track_in_database(self, track: Track, cur: sqlite3.Cursor) -> bool:
|
|
"""
|
|
Check if a track is the Engine database / collection.
|
|
Track object only needs a file path.
|
|
If true is returned, the track object will have the Engine database ID filled.
|
|
"""
|
|
|
|
# generate the path Engine wants to see
|
|
fixed_path = self.engine_prefix_path / track.file.relative_to(
|
|
self.junction_point
|
|
)
|
|
|
|
res = cur.execute(
|
|
"SELECT (id) FROM Track WHERE path=?",
|
|
(str(fixed_path),),
|
|
)
|
|
|
|
db_track = res.fetchone()
|
|
|
|
# check if the track was found
|
|
if db_track is None:
|
|
return False
|
|
|
|
track.engine_id = db_track[0]
|
|
|
|
return True
|
|
|
|
def get_database_uuid(self, cur: sqlite3.Cursor) -> str:
|
|
"""
|
|
Get the uuid of the Engine database
|
|
"""
|
|
res = cur.execute("SELECT uuid FROM Information WHERE id=1").fetchone()
|
|
|
|
if res is None:
|
|
return None
|
|
|
|
return res[0]
|
|
|
|
def get_playlist(self, playlist_id: int, cur: sqlite3.Cursor) -> Playlist:
|
|
"""
|
|
Get a playlist object from an Engine database ID.
|
|
"""
|
|
|
|
res = cur.execute(
|
|
"SELECT title, lastEditTime, parentListId FROM Playlist WHERE id=?",
|
|
(playlist_id,),
|
|
)
|
|
|
|
pl_db_data = res.fetchone()
|
|
|
|
if pl_db_data is None:
|
|
# something has gone wrong
|
|
return None
|
|
|
|
pl_db_name = pl_db_data[0]
|
|
pl_db_last_edit = pl_db_data[1]
|
|
pl_db_parent = pl_db_data[2]
|
|
|
|
playlist = Playlist(pl_db_name, engine_id=playlist_id, parent=pl_db_parent)
|
|
playlist.last_edited = dateparse(pl_db_last_edit)
|
|
|
|
return playlist
|
|
|
|
def get_playlist_id(self, path: Path, cur: sqlite3.Cursor) -> Playlist:
|
|
"""
|
|
Get the Engine database ID of a given playlist in the tree.
|
|
"""
|
|
if len(path.parts) < 1:
|
|
print("no parts")
|
|
return None
|
|
|
|
db_path = ";".join(reversed(path.parts))
|
|
db_path += ";"
|
|
print("looking for path", db_path)
|
|
|
|
# check if the playlist exists
|
|
res = cur.execute(
|
|
"SELECT id FROM PlaylistPath WHERE path=?",
|
|
(db_path,),
|
|
)
|
|
|
|
pl_db_id = res.fetchone()
|
|
|
|
if pl_db_id is None:
|
|
print("no path")
|
|
return None
|
|
|
|
# clean up the data
|
|
pl_db_id = pl_db_id[0]
|
|
|
|
return self.get_playlist(pl_db_id, cur)
|
|
|
|
def add_playlist(self, playlist: Playlist, cur: sqlite3.Cursor) -> None:
|
|
"""
|
|
Add a new playlist to the Engine database
|
|
|
|
Adds the playlist ID to the playlist object.
|
|
"""
|
|
|
|
# insert the new playlist
|
|
res = cur.execute(
|
|
"INSERT INTO Playlist (title, parentListId, isPersisted, nextListId, lastEditTime, isExplicitlyExported) VALUES (?, ?, 1, 0, ?, 1) RETURNING id",
|
|
(
|
|
playlist.name,
|
|
playlist.parent,
|
|
playlist.last_edited.isoformat(sep=" ", timespec="seconds"),
|
|
),
|
|
)
|
|
|
|
playlist.engine_id = res.fetchone()[0]
|
|
|
|
def add_track_to_playlist(
|
|
self,
|
|
track: Track,
|
|
playlist: Playlist,
|
|
database_uuid: str,
|
|
cur: sqlite3.Cursor,
|
|
true_member: bool = True,
|
|
) -> None:
|
|
"""
|
|
Add a track to a given playlist.
|
|
"""
|
|
"""
|
|
membershipReference:
|
|
|
|
If the track is not in this playlist but propagates up from one of the child playlists,
|
|
the membership reference field represents how many direkt child playlists contain this track.
|
|
|
|
If the track is direct part of this playlist, the membership reference field is 0.
|
|
"""
|
|
|
|
# check if the track is already in the playlist
|
|
res = cur.execute(
|
|
"SELECT id, membershipReference FROM PlaylistEntity WHERE listId=? AND trackId=?",
|
|
(playlist.engine_id, track.engine_id),
|
|
)
|
|
|
|
entity = res.fetchone()
|
|
|
|
if entity is None:
|
|
# track is not yet in playlist, add it
|
|
|
|
# if this is caused by propagation, set the membership reference to 1, otherwise 0
|
|
mem_ref = 0 if true_member else 1
|
|
|
|
# insert the track into the playlist
|
|
cur.execute(
|
|
"INSERT INTO PlaylistEntity (listId, trackId, databaseUuid, nextEntityId, membershipReference) VALUES (?, ?, ?, 0, ?)",
|
|
(playlist.engine_id, track.engine_id, database_uuid, mem_ref),
|
|
)
|
|
|
|
# if this is the root level, we're done
|
|
if playlist.parent == 0:
|
|
return
|
|
|
|
# if the playlist has a parent, propagate up
|
|
parent_pl = self.get_playlist(playlist.parent, cur)
|
|
self.add_track_to_playlist(
|
|
track, parent_pl, database_uuid, cur, true_member=False
|
|
)
|
|
|
|
return
|
|
|
|
# track is already in playlist, check if we need to update
|
|
|
|
entity_id = entity[0]
|
|
mem_ref = entity[1]
|
|
|
|
# if the membership reference is 0, we're done
|
|
if mem_ref == 0:
|
|
return
|
|
|
|
mem_ref = 0 if true_member else mem_ref + 1
|
|
|
|
# we only need to update this level
|
|
cur.execute(
|
|
"UPDATE PlaylistEntity SET membershipReference=? WHERE id=?",
|
|
(mem_ref, entity_id),
|
|
)
|
|
|
|
def check_track_in_playlist(
|
|
self, track: Track, playlist: Playlist, cur: sqlite3.Cursor
|
|
) -> bool:
|
|
"""
|
|
Check if a track is in a given playlist.
|
|
"""
|