basic importing functionality

This commit is contained in:
Fabian Posch 2025-08-05 00:46:48 +02:00
parent 51676044e3
commit 63a9b37312
4 changed files with 638 additions and 0 deletions

52
engine_sync/__main__.py Normal file
View file

@ -0,0 +1,52 @@
from pathlib import Path
import sqlite3
from engine_sync.engine_interface import EngineInterface
from engine_sync.track import Track
from engine_sync.playlist import Playlist
def main():
engine_prefix_path = Path(
"../../AppData/Local/AIR Music Technology/EnginePrime/Favorites/Music_Vault/"
)
junction_point = Path("/mnt/media/music/")
print("Hello from engine-sync!")
db_path = Path("/mnt/c/Users/Fabian/Music/Engine Library/Database2/m.db")
con = sqlite3.connect(db_path)
itf = EngineInterface(engine_prefix_path, junction_point)
filepath = Path(
"/mnt/media/music/astr0/nanobii/Rainbow Road/Disk 001/nanobii - Rainbow Road.flac"
)
track = Track(filepath)
track.load_from_metadata()
track.fill_empty()
print(track)
cur = con.cursor()
parent = itf.get_playlist_id(Path("container"), cur)
playlist = Playlist("First try", parent=parent.engine_id)
itf.check_track_in_database(track, cur)
# itf.add_track(track, cur)
# playlist = itf.get_playlist_id(Path("1/1.3"), cur)
# print("playlist", playlist)
uuid = itf.get_database_uuid(cur)
itf.add_playlist(playlist, cur)
itf.add_track_to_playlist(track, playlist, uuid, cur)
con.commit()
# print("Track ID is", track.engine_id)
print("Playlist ID is", playlist.engine_id)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,359 @@
import sqlite3
from pathlib import Path
from hashlib import sha1
from PIL import Image, ImageOps
from io import BytesIO
from datetime import datetime
from dateutil.parser import parse as dateparse
from engine_sync.track import Track
from engine_sync.playlist import Playlist
class EngineInterface:
def __init__(self, engine_prefix_path: Path, junction_point: Path) -> None:
self.engine_prefix_path = engine_prefix_path
self.junction_point = junction_point
def add_track(self, track: Track, cur: sqlite3.Cursor) -> None:
"""
Add a track to the Engine DJ library.
Track ID will be added to the track object
"""
album_art_id = None
if track.cover is not None:
# add the track cover to the database
album_art_id = self.add_album_cover(track.cover, cur)
# get the filename
filename = track.file.name
# generate the path Engine wants to see
fixed_path = self.engine_prefix_path / track.file.relative_to(
self.junction_point
)
# merge the artist string
artist_str = ", ".join(track.artists)
# get the current UNIX timestamp date
current_date = int(datetime.now().timestamp())
# translate data type for explicit lyrics
explicit_int = 1 if track.explicit else 0
# add the track to the database
res = cur.execute(
"""
INSERT INTO Track (
length,
bpm,
year,
path,
filename,
bitrate,
albumArtId,
fileBytes,
title,
artist,
album,
genre,
label,
fileType,
dateCreated,
dateAdded,
explicitLyrics,
lastEditTime,
playOrder,
rating,
albumArt,
isPlayed,
isAnalyzed,
isAvailable,
isMetadataOfPackedTrackChanged,
isPerfomanceDataOfPackedTrackChanged,
isMetadataImported,
pdbImportKey,
streamingFlags
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 0, 'image://planck/0', 0, 0, 1, 0, 0, 1, 0, 0)
RETURNING id
""",
(
track.length,
track.bpm,
track.release_date.year,
str(fixed_path),
filename,
track.bitrate,
album_art_id,
track.file_size,
track.title,
artist_str,
track.album,
track.genre,
track.label,
track.file.suffix[1:],
track.creation_date,
current_date,
explicit_int,
current_date,
),
)
track.engine_id = res.fetchone()[0]
def add_album_cover(self, cover: bytes, cur: sqlite3.Cursor) -> int:
"""
Add an album cover to the track database, skip if already in there.
Return the ID of the column in the database.
"""
# if this is a duplicate, skip it
duplicate, hash, id = self.check_cover_in_database(cover, cur)
if duplicate:
return id
cover_im = Image.open(BytesIO(cover))
# resize the image to 256x256 (Engine requirement)
# keep the aspect ratio
cover_im = ImageOps.contain(cover_im, (256, 256))
to_db_bytes = b""
# convert to PNG
with BytesIO() as f:
cover_im.save(f, format="PNG")
f.seek(0)
to_db_bytes = f.getvalue()
# save to the database and readback ID
res = cur.execute(
"INSERT INTO AlbumArt (hash, albumArt) VALUES (?, ?) RETURNING id",
(
hash,
to_db_bytes,
),
)
# return the ID
return res.fetchone()[0]
def check_cover_in_database(
self, cover: bytes, cur: sqlite3.Cursor
) -> tuple[bool, str, int]:
"""
Check if the given image exists in the album art table.
While the exact way Engine calculates the image hash is unknown,
we can at least de-duplicate with tracks that have been imported
through this tool.
The hash is a simple sha1 sum of the original cover image.
"""
image_hash = sha1(cover).hexdigest()
res = cur.execute("SELECT id FROM AlbumArt WHERE hash=?", (image_hash,))
found = False
id = -1
data = res.fetchone()
if data is not None:
found = True
id = data[0]
return found, image_hash, id
def check_track_in_database(self, track: Track, cur: sqlite3.Cursor) -> bool:
"""
Check if a track is the Engine database / collection.
Track object only needs a file path.
If true is returned, the track object will have the Engine database ID filled.
"""
# generate the path Engine wants to see
fixed_path = self.engine_prefix_path / track.file.relative_to(
self.junction_point
)
res = cur.execute(
"SELECT (id) FROM Track WHERE path=?",
(str(fixed_path),),
)
db_track = res.fetchone()
# check if the track was found
if db_track is None:
return False
track.engine_id = db_track[0]
return True
def get_database_uuid(self, cur: sqlite3.Cursor) -> str:
"""
Get the uuid of the Engine database
"""
res = cur.execute("SELECT uuid FROM Information WHERE id=1").fetchone()
if res is None:
return None
return res[0]
def get_playlist(self, playlist_id: int, cur: sqlite3.Cursor) -> Playlist:
"""
Get a playlist object from an Engine database ID.
"""
res = cur.execute(
"SELECT title, lastEditTime, parentListId FROM Playlist WHERE id=?",
(playlist_id,),
)
pl_db_data = res.fetchone()
if pl_db_data is None:
# something has gone wrong
return None
pl_db_name = pl_db_data[0]
pl_db_last_edit = pl_db_data[1]
pl_db_parent = pl_db_data[2]
playlist = Playlist(pl_db_name, engine_id=playlist_id, parent=pl_db_parent)
playlist.last_edited = dateparse(pl_db_last_edit)
return playlist
def get_playlist_id(self, path: Path, cur: sqlite3.Cursor) -> Playlist:
"""
Get the Engine database ID of a given playlist in the tree.
"""
if len(path.parts) < 1:
print("no parts")
return None
db_path = ";".join(reversed(path.parts))
db_path += ";"
print("looking for path", db_path)
# check if the playlist exists
res = cur.execute(
"SELECT id FROM PlaylistPath WHERE path=?",
(db_path,),
)
pl_db_id = res.fetchone()
if pl_db_id is None:
print("no path")
return None
# clean up the data
pl_db_id = pl_db_id[0]
return self.get_playlist(pl_db_id, cur)
def add_playlist(self, playlist: Playlist, cur: sqlite3.Cursor) -> None:
"""
Add a new playlist to the Engine database
Adds the playlist ID to the playlist object.
"""
# insert the new playlist
res = cur.execute(
"INSERT INTO Playlist (title, parentListId, isPersisted, nextListId, lastEditTime, isExplicitlyExported) VALUES (?, ?, 1, 0, ?, 1) RETURNING id",
(
playlist.name,
playlist.parent,
playlist.last_edited.isoformat(sep=" ", timespec="seconds"),
),
)
playlist.engine_id = res.fetchone()[0]
def add_track_to_playlist(
self,
track: Track,
playlist: Playlist,
database_uuid: str,
cur: sqlite3.Cursor,
true_member: bool = True,
) -> None:
"""
Add a track to a given playlist.
"""
"""
membershipReference:
If the track is not in this playlist but propagates up from one of the child playlists,
the membership reference field represents how many direkt child playlists contain this track.
If the track is direct part of this playlist, the membership reference field is 0.
"""
# check if the track is already in the playlist
res = cur.execute(
"SELECT id, membershipReference FROM PlaylistEntity WHERE listId=? AND trackId=?",
(playlist.engine_id, track.engine_id),
)
entity = res.fetchone()
if entity is None:
# track is not yet in playlist, add it
# if this is caused by propagation, set the membership reference to 1, otherwise 0
mem_ref = 0 if true_member else 1
# insert the track into the playlist
cur.execute(
"INSERT INTO PlaylistEntity (listId, trackId, databaseUuid, nextEntityId, membershipReference) VALUES (?, ?, ?, 0, ?)",
(playlist.engine_id, track.engine_id, database_uuid, mem_ref),
)
# if this is the root level, we're done
if playlist.parent == 0:
return
# if the playlist has a parent, propagate up
parent_pl = self.get_playlist(playlist.parent, cur)
self.add_track_to_playlist(
track, parent_pl, database_uuid, cur, true_member=False
)
return
# track is already in playlist, check if we need to update
entity_id = entity[0]
mem_ref = entity[1]
# if the membership reference is 0, we're done
if mem_ref == 0:
return
mem_ref = 0 if true_member else mem_ref + 1
# we only need to update this level
cur.execute(
"UPDATE PlaylistEntity SET membershipReference=? WHERE id=?",
(mem_ref, entity_id),
)
def check_track_in_playlist(
self, track: Track, playlist: Playlist, cur: sqlite3.Cursor
) -> bool:
"""
Check if a track is in a given playlist.
"""

23
engine_sync/playlist.py Normal file
View file

@ -0,0 +1,23 @@
from datetime import datetime
class Playlist:
def __init__(self, name: str, engine_id: int = None, parent: int = 0):
self.name = name
self.parent = parent
self.engine_id = engine_id
self.last_edited = datetime.now()
def __str__(self):
st = "'" + self.name + "'"
if self.parent != 0:
st += ", parent " + str(self.parent)
if self.engine_id != 0:
st += ", Engine ID " + str(self.engine_id)
st += ", Last edited " + self.last_edited.isoformat(sep=" ", timespec="seconds")
return st

204
engine_sync/track.py Normal file
View file

@ -0,0 +1,204 @@
import math
from pathlib import Path
from mutagen.mp3 import MP3
from mutagen.flac import FLAC
from datetime import datetime
from dateutil.parser import parse as dateparse
class Track:
def __init__(
self,
file: Path,
bitrate: int = None,
file_size: int = None,
creation_date: int = None,
modification_date: int = None,
length: int = None,
title: str = None,
artists: list[str] = None,
album: str = None,
genre: str = None,
bpm: int = None,
release_date: datetime = None,
label: str = None,
cover: bytes = None,
filetype: str = None,
explicit: bool = False,
engine_id: int = None,
):
self.file = file
self.bitrate = bitrate
self.file_size = file_size
self.creation_date = creation_date
self.modification_date = modification_date
self.length = length
self.title = title
self.artists = artists
self.album = album
self.genre = genre
self.bpm = bpm
self.release_date = release_date
self.label = label
self.cover = cover
self.filetype = filetype
self.explicit = explicit
self.engine_id = engine_id
def fill_empty(self):
if self.title is None:
self.title = ""
if self.artists is None:
self.artists = [""]
if self.album is None:
self.album = ""
if self.genre is None:
self.genre = ""
if self.release_date is None:
self.release_date = datetime.now()
if self.label is None:
self.label = ""
if self.explicit is None:
self.explicit = False
def load_from_metadata(self):
"""
Try to fill up non-defined fields from file metadata.
"""
if self.file.suffix == ".flac":
self.load_from_metadata_flac()
def load_from_metadata_flac(self):
"""
Try to fill up non-defined fields from file metadata.
Loads the fields from a FLAC file.
"""
tags = FLAC(self.file)
if self.bitrate is None:
self.bitrate = int(tags.info.bitrate / 1000)
if self.file_size is None:
self.file_size = self.file.stat().st_size
if self.creation_date is None:
self.creation_date = int(self.file.stat().st_ctime)
if self.modification_date is None:
self.modification_date = int(self.file.stat().st_mtime)
if self.length is None:
self.length = int(math.ceil(tags.info.length))
if self.title is None:
try:
self.title = tags["title"][0]
except KeyError:
pass
if self.artists is None:
try:
self.artists = tags["artist"]
except KeyError:
pass
if self.album is None:
try:
self.album = tags["album"][0]
except KeyError:
pass
if self.genre is None:
try:
self.genre = tags["genre"][0]
except KeyError:
pass
if self.bpm is None:
try:
self.bpm = int(tags["bpm"][0])
except KeyError:
pass
if self.release_date is None:
try:
self.release_date = dateparse(tags["date"][0])
except KeyError:
pass
if self.label is None:
try:
self.label = tags["publisher"][0]
except KeyError:
pass
if self.cover is None:
pictures = tags.pictures
if pictures is not None and len(pictures) > 0:
self.cover = tags.pictures[0].data
if self.explicit is None:
try:
self.explicit = bool(tags["itunesadvisory"])
except KeyError:
self.explicit = False
def __str__(self) -> str:
st = (
"'"
+ self.title
+ "' by "
+ ", ".join(self.artists)
+ " in album '"
+ self.album
+ "', released "
+ str(self.release_date.date())
)
if self.label is not None:
st += " by " + self.label + ". "
else:
st += ". "
st += "Song is "
if self.bpm is not None:
st += "at " + str(self.bpm) + " BPM, "
st += "Explicit " + str(self.explicit)
if self.genre is not None:
st += ", Genre " + self.genre
st += (
", Runtime "
+ str(self.length)
+ ", Bitrate "
+ str(self.bitrate)
+ ", File size "
+ str(self.file_size)
+ ", Created "
+ str(self.creation_date)
+ ", Last modified "
+ str(self.modification_date)
)
if self.engine_id is not None:
st += ", Engine ID " + str(self.engine_id)
if self.cover is None:
st += ", No cover."
else:
st += ", Cover attached."
return st