From 63a9b3731245c0e2dffb1db92faff94594437949 Mon Sep 17 00:00:00 2001 From: Fabian Posch Date: Tue, 5 Aug 2025 00:46:48 +0200 Subject: [PATCH] basic importing functionality --- engine_sync/__main__.py | 52 +++++ engine_sync/engine_interface.py | 359 ++++++++++++++++++++++++++++++++ engine_sync/playlist.py | 23 ++ engine_sync/track.py | 204 ++++++++++++++++++ 4 files changed, 638 insertions(+) create mode 100644 engine_sync/__main__.py create mode 100644 engine_sync/engine_interface.py create mode 100644 engine_sync/playlist.py create mode 100644 engine_sync/track.py diff --git a/engine_sync/__main__.py b/engine_sync/__main__.py new file mode 100644 index 0000000..1381dfe --- /dev/null +++ b/engine_sync/__main__.py @@ -0,0 +1,52 @@ +from pathlib import Path +import sqlite3 + +from engine_sync.engine_interface import EngineInterface +from engine_sync.track import Track +from engine_sync.playlist import Playlist + + +def main(): + + engine_prefix_path = Path( + "../../AppData/Local/AIR Music Technology/EnginePrime/Favorites/Music_Vault/" + ) + + junction_point = Path("/mnt/media/music/") + + print("Hello from engine-sync!") + + db_path = Path("/mnt/c/Users/Fabian/Music/Engine Library/Database2/m.db") + con = sqlite3.connect(db_path) + + itf = EngineInterface(engine_prefix_path, junction_point) + + filepath = Path( + "/mnt/media/music/astr0/nanobii/Rainbow Road/Disk 001/nanobii - Rainbow Road.flac" + ) + + track = Track(filepath) + track.load_from_metadata() + track.fill_empty() + + print(track) + + cur = con.cursor() + + parent = itf.get_playlist_id(Path("container"), cur) + playlist = Playlist("First try", parent=parent.engine_id) + itf.check_track_in_database(track, cur) + # itf.add_track(track, cur) + # playlist = itf.get_playlist_id(Path("1/1.3"), cur) + # print("playlist", playlist) + uuid = itf.get_database_uuid(cur) + itf.add_playlist(playlist, cur) + itf.add_track_to_playlist(track, playlist, uuid, cur) + + con.commit() + # print("Track ID is", track.engine_id) + print("Playlist ID is", playlist.engine_id) + + +if __name__ == "__main__": + main() diff --git a/engine_sync/engine_interface.py b/engine_sync/engine_interface.py new file mode 100644 index 0000000..50b306a --- /dev/null +++ b/engine_sync/engine_interface.py @@ -0,0 +1,359 @@ +import sqlite3 +from pathlib import Path +from hashlib import sha1 +from PIL import Image, ImageOps +from io import BytesIO +from datetime import datetime +from dateutil.parser import parse as dateparse + +from engine_sync.track import Track +from engine_sync.playlist import Playlist + + +class EngineInterface: + def __init__(self, engine_prefix_path: Path, junction_point: Path) -> None: + self.engine_prefix_path = engine_prefix_path + self.junction_point = junction_point + + def add_track(self, track: Track, cur: sqlite3.Cursor) -> None: + """ + Add a track to the Engine DJ library. + + Track ID will be added to the track object + """ + + album_art_id = None + + if track.cover is not None: + # add the track cover to the database + album_art_id = self.add_album_cover(track.cover, cur) + + # get the filename + filename = track.file.name + + # generate the path Engine wants to see + fixed_path = self.engine_prefix_path / track.file.relative_to( + self.junction_point + ) + + # merge the artist string + artist_str = ", ".join(track.artists) + + # get the current UNIX timestamp date + current_date = int(datetime.now().timestamp()) + + # translate data type for explicit lyrics + explicit_int = 1 if track.explicit else 0 + + # add the track to the database + res = cur.execute( + """ + INSERT INTO Track ( + length, + bpm, + year, + path, + filename, + bitrate, + albumArtId, + fileBytes, + title, + artist, + album, + genre, + label, + fileType, + dateCreated, + dateAdded, + explicitLyrics, + lastEditTime, + playOrder, + rating, + albumArt, + isPlayed, + isAnalyzed, + isAvailable, + isMetadataOfPackedTrackChanged, + isPerfomanceDataOfPackedTrackChanged, + isMetadataImported, + pdbImportKey, + streamingFlags + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0, 'image://planck/0', 0, 0, 1, 0, 0, 1, 0, 0) + RETURNING id +""", + ( + track.length, + track.bpm, + track.release_date.year, + str(fixed_path), + filename, + track.bitrate, + album_art_id, + track.file_size, + track.title, + artist_str, + track.album, + track.genre, + track.label, + track.file.suffix[1:], + track.creation_date, + current_date, + explicit_int, + current_date, + ), + ) + + track.engine_id = res.fetchone()[0] + + def add_album_cover(self, cover: bytes, cur: sqlite3.Cursor) -> int: + """ + Add an album cover to the track database, skip if already in there. + Return the ID of the column in the database. + """ + + # if this is a duplicate, skip it + duplicate, hash, id = self.check_cover_in_database(cover, cur) + + if duplicate: + return id + + cover_im = Image.open(BytesIO(cover)) + + # resize the image to 256x256 (Engine requirement) + # keep the aspect ratio + cover_im = ImageOps.contain(cover_im, (256, 256)) + + to_db_bytes = b"" + + # convert to PNG + with BytesIO() as f: + cover_im.save(f, format="PNG") + f.seek(0) + to_db_bytes = f.getvalue() + + # save to the database and readback ID + res = cur.execute( + "INSERT INTO AlbumArt (hash, albumArt) VALUES (?, ?) RETURNING id", + ( + hash, + to_db_bytes, + ), + ) + + # return the ID + return res.fetchone()[0] + + def check_cover_in_database( + self, cover: bytes, cur: sqlite3.Cursor + ) -> tuple[bool, str, int]: + """ + Check if the given image exists in the album art table. + + While the exact way Engine calculates the image hash is unknown, + we can at least de-duplicate with tracks that have been imported + through this tool. + + The hash is a simple sha1 sum of the original cover image. + """ + + image_hash = sha1(cover).hexdigest() + + res = cur.execute("SELECT id FROM AlbumArt WHERE hash=?", (image_hash,)) + + found = False + id = -1 + + data = res.fetchone() + + if data is not None: + found = True + id = data[0] + + return found, image_hash, id + + def check_track_in_database(self, track: Track, cur: sqlite3.Cursor) -> bool: + """ + Check if a track is the Engine database / collection. + Track object only needs a file path. + If true is returned, the track object will have the Engine database ID filled. + """ + + # generate the path Engine wants to see + fixed_path = self.engine_prefix_path / track.file.relative_to( + self.junction_point + ) + + res = cur.execute( + "SELECT (id) FROM Track WHERE path=?", + (str(fixed_path),), + ) + + db_track = res.fetchone() + + # check if the track was found + if db_track is None: + return False + + track.engine_id = db_track[0] + + return True + + def get_database_uuid(self, cur: sqlite3.Cursor) -> str: + """ + Get the uuid of the Engine database + """ + res = cur.execute("SELECT uuid FROM Information WHERE id=1").fetchone() + + if res is None: + return None + + return res[0] + + def get_playlist(self, playlist_id: int, cur: sqlite3.Cursor) -> Playlist: + """ + Get a playlist object from an Engine database ID. + """ + + res = cur.execute( + "SELECT title, lastEditTime, parentListId FROM Playlist WHERE id=?", + (playlist_id,), + ) + + pl_db_data = res.fetchone() + + if pl_db_data is None: + # something has gone wrong + return None + + pl_db_name = pl_db_data[0] + pl_db_last_edit = pl_db_data[1] + pl_db_parent = pl_db_data[2] + + playlist = Playlist(pl_db_name, engine_id=playlist_id, parent=pl_db_parent) + playlist.last_edited = dateparse(pl_db_last_edit) + + return playlist + + def get_playlist_id(self, path: Path, cur: sqlite3.Cursor) -> Playlist: + """ + Get the Engine database ID of a given playlist in the tree. + """ + if len(path.parts) < 1: + print("no parts") + return None + + db_path = ";".join(reversed(path.parts)) + db_path += ";" + print("looking for path", db_path) + + # check if the playlist exists + res = cur.execute( + "SELECT id FROM PlaylistPath WHERE path=?", + (db_path,), + ) + + pl_db_id = res.fetchone() + + if pl_db_id is None: + print("no path") + return None + + # clean up the data + pl_db_id = pl_db_id[0] + + return self.get_playlist(pl_db_id, cur) + + def add_playlist(self, playlist: Playlist, cur: sqlite3.Cursor) -> None: + """ + Add a new playlist to the Engine database + + Adds the playlist ID to the playlist object. + """ + + # insert the new playlist + res = cur.execute( + "INSERT INTO Playlist (title, parentListId, isPersisted, nextListId, lastEditTime, isExplicitlyExported) VALUES (?, ?, 1, 0, ?, 1) RETURNING id", + ( + playlist.name, + playlist.parent, + playlist.last_edited.isoformat(sep=" ", timespec="seconds"), + ), + ) + + playlist.engine_id = res.fetchone()[0] + + def add_track_to_playlist( + self, + track: Track, + playlist: Playlist, + database_uuid: str, + cur: sqlite3.Cursor, + true_member: bool = True, + ) -> None: + """ + Add a track to a given playlist. + """ + """ + membershipReference: + + If the track is not in this playlist but propagates up from one of the child playlists, + the membership reference field represents how many direkt child playlists contain this track. + + If the track is direct part of this playlist, the membership reference field is 0. + """ + + # check if the track is already in the playlist + res = cur.execute( + "SELECT id, membershipReference FROM PlaylistEntity WHERE listId=? AND trackId=?", + (playlist.engine_id, track.engine_id), + ) + + entity = res.fetchone() + + if entity is None: + # track is not yet in playlist, add it + + # if this is caused by propagation, set the membership reference to 1, otherwise 0 + mem_ref = 0 if true_member else 1 + + # insert the track into the playlist + cur.execute( + "INSERT INTO PlaylistEntity (listId, trackId, databaseUuid, nextEntityId, membershipReference) VALUES (?, ?, ?, 0, ?)", + (playlist.engine_id, track.engine_id, database_uuid, mem_ref), + ) + + # if this is the root level, we're done + if playlist.parent == 0: + return + + # if the playlist has a parent, propagate up + parent_pl = self.get_playlist(playlist.parent, cur) + self.add_track_to_playlist( + track, parent_pl, database_uuid, cur, true_member=False + ) + + return + + # track is already in playlist, check if we need to update + + entity_id = entity[0] + mem_ref = entity[1] + + # if the membership reference is 0, we're done + if mem_ref == 0: + return + + mem_ref = 0 if true_member else mem_ref + 1 + + # we only need to update this level + cur.execute( + "UPDATE PlaylistEntity SET membershipReference=? WHERE id=?", + (mem_ref, entity_id), + ) + + def check_track_in_playlist( + self, track: Track, playlist: Playlist, cur: sqlite3.Cursor + ) -> bool: + """ + Check if a track is in a given playlist. + """ diff --git a/engine_sync/playlist.py b/engine_sync/playlist.py new file mode 100644 index 0000000..407cb9b --- /dev/null +++ b/engine_sync/playlist.py @@ -0,0 +1,23 @@ +from datetime import datetime + + +class Playlist: + def __init__(self, name: str, engine_id: int = None, parent: int = 0): + self.name = name + self.parent = parent + self.engine_id = engine_id + self.last_edited = datetime.now() + + def __str__(self): + + st = "'" + self.name + "'" + + if self.parent != 0: + st += ", parent " + str(self.parent) + + if self.engine_id != 0: + st += ", Engine ID " + str(self.engine_id) + + st += ", Last edited " + self.last_edited.isoformat(sep=" ", timespec="seconds") + + return st diff --git a/engine_sync/track.py b/engine_sync/track.py new file mode 100644 index 0000000..4bdaa85 --- /dev/null +++ b/engine_sync/track.py @@ -0,0 +1,204 @@ +import math + +from pathlib import Path +from mutagen.mp3 import MP3 +from mutagen.flac import FLAC +from datetime import datetime +from dateutil.parser import parse as dateparse + + +class Track: + def __init__( + self, + file: Path, + bitrate: int = None, + file_size: int = None, + creation_date: int = None, + modification_date: int = None, + length: int = None, + title: str = None, + artists: list[str] = None, + album: str = None, + genre: str = None, + bpm: int = None, + release_date: datetime = None, + label: str = None, + cover: bytes = None, + filetype: str = None, + explicit: bool = False, + engine_id: int = None, + ): + self.file = file + self.bitrate = bitrate + self.file_size = file_size + self.creation_date = creation_date + self.modification_date = modification_date + self.length = length + self.title = title + self.artists = artists + self.album = album + self.genre = genre + self.bpm = bpm + self.release_date = release_date + self.label = label + self.cover = cover + self.filetype = filetype + self.explicit = explicit + self.engine_id = engine_id + + def fill_empty(self): + + if self.title is None: + self.title = "" + + if self.artists is None: + self.artists = [""] + + if self.album is None: + self.album = "" + + if self.genre is None: + self.genre = "" + + if self.release_date is None: + self.release_date = datetime.now() + + if self.label is None: + self.label = "" + + if self.explicit is None: + self.explicit = False + + def load_from_metadata(self): + """ + Try to fill up non-defined fields from file metadata. + """ + if self.file.suffix == ".flac": + self.load_from_metadata_flac() + + def load_from_metadata_flac(self): + """ + Try to fill up non-defined fields from file metadata. + Loads the fields from a FLAC file. + """ + + tags = FLAC(self.file) + + if self.bitrate is None: + self.bitrate = int(tags.info.bitrate / 1000) + + if self.file_size is None: + self.file_size = self.file.stat().st_size + + if self.creation_date is None: + self.creation_date = int(self.file.stat().st_ctime) + + if self.modification_date is None: + self.modification_date = int(self.file.stat().st_mtime) + + if self.length is None: + self.length = int(math.ceil(tags.info.length)) + + if self.title is None: + try: + self.title = tags["title"][0] + except KeyError: + pass + + if self.artists is None: + try: + self.artists = tags["artist"] + except KeyError: + pass + + if self.album is None: + try: + self.album = tags["album"][0] + except KeyError: + pass + + if self.genre is None: + try: + self.genre = tags["genre"][0] + except KeyError: + pass + + if self.bpm is None: + try: + self.bpm = int(tags["bpm"][0]) + except KeyError: + pass + + if self.release_date is None: + try: + self.release_date = dateparse(tags["date"][0]) + except KeyError: + pass + + if self.label is None: + try: + self.label = tags["publisher"][0] + except KeyError: + pass + + if self.cover is None: + pictures = tags.pictures + + if pictures is not None and len(pictures) > 0: + self.cover = tags.pictures[0].data + + if self.explicit is None: + try: + self.explicit = bool(tags["itunesadvisory"]) + except KeyError: + self.explicit = False + + def __str__(self) -> str: + st = ( + "'" + + self.title + + "' by " + + ", ".join(self.artists) + + " in album '" + + self.album + + "', released " + + str(self.release_date.date()) + ) + + if self.label is not None: + st += " by " + self.label + ". " + else: + st += ". " + + st += "Song is " + + if self.bpm is not None: + st += "at " + str(self.bpm) + " BPM, " + + st += "Explicit " + str(self.explicit) + + if self.genre is not None: + st += ", Genre " + self.genre + + st += ( + ", Runtime " + + str(self.length) + + ", Bitrate " + + str(self.bitrate) + + ", File size " + + str(self.file_size) + + ", Created " + + str(self.creation_date) + + ", Last modified " + + str(self.modification_date) + ) + + if self.engine_id is not None: + st += ", Engine ID " + str(self.engine_id) + + if self.cover is None: + st += ", No cover." + else: + st += ", Cover attached." + + return st