Spotify Listening History Database

It has been about a year since I created my first Spotify dashboard. Since then, I have improved my skillset tremendously through both work and self-study. So, with Spotify Wrapped just around the corner, I thought, “Why not create my own?” Of course, I wanted to use the new skills (and Raspberry Pi 5) I’d acquired, so I devised the following plan:

  1. Acquire and merge my lifetime Spotify listening history. This was simple and effective.
  2. Set up MariaDB on my Raspberry Pi 5. Naturally, I had to use my SQL skills somehow.
  3. Import my lifetime Spotify listening history to the DB. What would my Spotify project be without it? This took far too long the first time, so I hoped I could complete it flawlessly this time.
  4. Utilise the Spotify Web API to update the DB in real-time. …Okay maybe not real-time. But for my purposes every 15 minutes worked just fine.
  5. Utilise the Discogs API to fetch genres. This was necessary because the Spotify listening history doesn’t include genres, and the Spotify Web API only provides genres per artist, not for each of their tracks or albums.
  6. Success! Genuinely. This was so much smoother than the first time. And, in my opinion, it came out much, much better.

Merging Spotify History

For those of you who don’t know, you can request your Spotify listening history in the Privacy section of user settings on the browser version of Spotify. There are several data request options, but the one I needed for this project was the ‘Extended Streaming History.’ It can take up to 30 days to arrive in your inbox, so I was pleasantly surprised when, only 3 days later, I received the email. Unfortunately, the data arrives split across several .json files (how many depends on how much history you have). Previously, I used an online .json merger and opened the merged file in Excel. But this time was different. I had Python in my arsenal. So, I wrote a basic script that iterates through all the .json files in a given folder and merges them. The code I used is as follows:

Code
import pandas as pd
import os

def merge_json_files(folder_path, output_file):
    merged_data = pd.DataFrame()
    for file in os.listdir(folder_path):
        if file.endswith('.json'):
            file_path = os.path.join(folder_path, file)
            try:
                # Read and merge data
                data = pd.read_json(file_path, encoding="utf8")
                merged_data = pd.concat([merged_data, data], ignore_index=True)
            except ValueError as e:
                print(f"Error reading {file}: {e}")

    merged_data.to_csv(output_file, encoding='utf8', index=False)
    print(f"Merged data saved to {output_file}")

folder_path = r'C:\Users\dwind\Desktop\Spotify1'
output_file = os.path.join(folder_path, 'spotify_history.csv')
merge_json_files(folder_path, output_file)

It was simple, but it worked an absolute treat! So, I had my lifetime Spotify listening history in one place: the aptly named ‘spotify_history.csv’.

Setting Up MariaDB And Importing Data

This was, by far, the most straightforward part of the process, and, to be quite honest, there isn’t a whole lot to tell about it. I set up my Raspberry Pi 5, booting Raspberry Pi OS (formerly Raspbian) from a microSD card. I will talk about my Raspberry Pi 5 in a separate post, so, to keep a long story short, I did the bog-standard setup procedure and installed MariaDB. I altered the config to suit my needs and ensured the server starts every time my RPi5 boots (merely a convenience thing).

Once this was done, I created the database via the terminal and then switched to a database visualiser, in this case DBeaver, to create the table. I kept only the columns I wanted from my Spotify listening history: ms_played (time listened to per track), track_name, album_name, and artist_name, plus the ts timestamp; the essentials, basically. The table (music) has the following columns:

id INT PRIMARY KEY 
ts TIMESTAMP NOT NULL
ms_played INT NOT NULL
track_name VARCHAR(255) NOT NULL
artist_name VARCHAR(255) 
album_name VARCHAR(255)
release_date YEAR(4)
genres VARCHAR(255)
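
Expressed as SQL, the column list above might look something like the statement below. It is a sketch rather than the exact statement used for this project: the AUTO_INCREMENT on id is an assumption (the column only needs to be generated somehow), and create_music_table is a hypothetical helper that accepts any DB-API cursor, such as one from mysql.connector.

```python
# Sketch of the table definition implied by the column list.
# AUTO_INCREMENT is an assumption about how the id is generated.
CREATE_MUSIC_TABLE = """
CREATE TABLE IF NOT EXISTS music (
    id INT AUTO_INCREMENT PRIMARY KEY,
    ts TIMESTAMP NOT NULL,
    ms_played INT NOT NULL,
    track_name VARCHAR(255) NOT NULL,
    artist_name VARCHAR(255),
    album_name VARCHAR(255),
    release_date YEAR(4),
    genres VARCHAR(255)
)
"""


def create_music_table(cursor):
    """Run the DDL on any DB-API cursor (hypothetical helper)."""
    cursor.execute(CREATE_MUSIC_TABLE)
```

With a generated id, INSERT statements can leave the id column out entirely and let the server assign it.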

The id column had to be generated, and the release_date and genres columns had no data directly from Spotify (the APIs would come in with that later). The final thing to do was adjust the timestamps in spotify_history.csv to suit the table. The export uses the ISO 8601 format, e.g. 2022-11-04T03:11:44Z, whereas the TIMESTAMP column expects 2022-11-04 03:11:44. I simply adjusted it in Excel using the ‘Custom Format’ feature and imported the file.
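
The same preparation could also be done in pandas instead of Excel. The sketch below assumes the export uses the field names master_metadata_track_name, master_metadata_album_artist_name, and master_metadata_album_album_name alongside ts and ms_played (check your own files, as the names may differ). prepare_for_import and the sample rows are hypothetical.

```python
import pandas as pd

# Assumed mapping from export field names to the table's column names
COLUMN_MAP = {
    "ts": "ts",
    "ms_played": "ms_played",
    "master_metadata_track_name": "track_name",
    "master_metadata_album_artist_name": "artist_name",
    "master_metadata_album_album_name": "album_name",
}


def prepare_for_import(df):
    """Keep the wanted columns and reformat ts for a TIMESTAMP column."""
    out = df[list(COLUMN_MAP)].rename(columns=COLUMN_MAP)
    # Rows with no track name cannot go into a NOT NULL column, so drop them
    out = out.dropna(subset=["track_name"])
    # 2022-11-04T03:11:44Z -> 2022-11-04 03:11:44
    parsed = pd.to_datetime(out["ts"], format="%Y-%m-%dT%H:%M:%SZ")
    out["ts"] = parsed.dt.strftime("%Y-%m-%d %H:%M:%S")
    return out.reset_index(drop=True)


# Hypothetical rows in the assumed export shape
sample = pd.DataFrame({
    "ts": ["2022-11-04T03:11:44Z", "2022-11-04T03:15:02Z"],
    "ms_played": [201000, 5000],
    "master_metadata_track_name": ["Example Track", None],
    "master_metadata_album_artist_name": ["Example Artist", None],
    "master_metadata_album_album_name": ["Example Album", None],
    "platform": ["android", "android"],
})
print(prepare_for_import(sample))
```

The resulting frame can be written out with to_csv(index=False) and imported as before.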

And it worked.

First time.

Utilising Spotify’s Web API

While it’s possible to request your lifetime listening history from Spotify, this becomes incredibly impractical if you wish to work with updated data regularly. I was at a slight loss as to what I could do. I knew it was possible to keep track of my data in real-time because of apps such as stats.fm (fantastic app, by the way), but I wasn’t entirely sure how I could do it. Then it hit me… Spotify’s Web API!

Previously, I’d not had much experience working with APIs, but I knew this was what I wanted, so I began researching what I could do and how. I only really had one condition: it had to update my database regularly with my recent listening history. I knew this was possible by having a script run regularly on my Raspberry Pi 5 using cron. I also knew it was possible to update the database using Python. Finally, I knew it was possible to collate data from Spotify. It was just a matter of bringing it all together. After a lot of trial and error, I finally came up with the following script*:

Code

from flask import Flask, redirect, request, session, url_for
from spotipy import Spotify
from spotipy.oauth2 import SpotifyOAuth
import mysql.connector
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
import json
import os

app = Flask(__name__)

DB_CONFIG = {
    'host': '192.168.1.174',
    'user': '[my user]',
    'password': '[my password]',
    'database': 'spotify',
    'port': 3306
}

SPOTIPY_CLIENT_ID = '[my client id]'
SPOTIPY_CLIENT_SECRET = '[my api key]'
SPOTIPY_REDIRECT_URI = 'http://192.168.1.174:5000/callback'

TOKEN_FILE = 'token_info.json'

sp_oauth = SpotifyOAuth(client_id=SPOTIPY_CLIENT_ID,
                         client_secret=SPOTIPY_CLIENT_SECRET,
                         redirect_uri=SPOTIPY_REDIRECT_URI,
                         scope='user-library-read user-read-playback-state user-read-recently-played')

def save_token_info(token_info):
    with open(TOKEN_FILE, 'w') as f:
        json.dump(token_info, f)


def load_token_info():
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE, 'r') as f:
            return json.load(f)
    return None


def get_spotify_client():
    token_info = load_token_info()
    if not token_info:
        return None
    if sp_oauth.is_token_expired(token_info):
        token_info = sp_oauth.refresh_access_token(token_info['refresh_token'])
        save_token_info(token_info)
    return Spotify(auth=token_info['access_token'])


@app.route('/')
def index():
    token_info = load_token_info()
    if not token_info:
        return redirect(url_for('login'))

    sp = get_spotify_client()
    if not sp:
        return redirect(url_for('login'))

    results = sp.current_user_playing_track()

    # 'item' can be None (for example while an ad is playing), so check it too
    if results and results.get('item'):
        track_name = results['item']['name']
        artist_name = results['item']['artists'][0]['name']

        return f'Currently playing: {track_name} by {artist_name}.'
    return 'No track is currently playing.'

@app.route('/login')
def login():
    auth_url = sp_oauth.get_authorize_url()
    return redirect(auth_url)

@app.route('/callback')
def callback():
    token_info = sp_oauth.get_access_token(request.args['code'])
    save_token_info(token_info)
    return redirect(url_for('index'))

def convert_timestamp(iso_timestamp):
    try:
        dt = datetime.strptime(iso_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        dt = datetime.strptime(iso_timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return dt.strftime("%Y-%m-%d %H:%M:%S")

def fetch_recently_played(sp):
    results = sp.current_user_recently_played(limit=50)
    tracks = []
    for item in results['items']:
        track = item['track']
        played_at = convert_timestamp(item['played_at'])

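        tracks.append({
            'ts': played_at,
            # The recently played endpoint does not report how long a track
            # was actually played, so the full track duration is used instead
            'ms_played': track['duration_ms'],
            'track_name': track['name'],
            'artist_name': track['artists'][0]['name'],
            'album_name': track['album']['name'],
        })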
    return tracks

def insert_into_database(tracks):
    # Initialise both handles so the finally block is safe if connect() fails
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()

        for track in tracks:
            # Five columns, so five placeholders
            cursor.execute("""
                INSERT INTO music (
                ts, ms_played, track_name, artist_name,
                album_name
                )
                VALUES (%s, %s, %s, %s, %s)
            """, (
                track['ts'], track['ms_played'],
                track['track_name'], track['artist_name'],
                track['album_name']
            ))
        conn.commit()
    except mysql.connector.Error as err:
        print(f"Error: {err}")
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def fetch_and_insert():
    sp = get_spotify_client()
    if not sp:
        print("Spotify client could not be initialized.")
        return

    recently_played_tracks = fetch_recently_played(sp)
    insert_into_database(recently_played_tracks)
    
@app.route('/fetch_and_insert')
def fetch_and_insert_route():
    fetch_and_insert()
    return "Fetch and insert operation completed successfully."


if __name__ == "__main__":
    # Start the scheduler
    scheduler = BackgroundScheduler()
    scheduler.add_job(fetch_and_insert, 'interval', minutes=15)
    scheduler.start()

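    try:
        # use_reloader=False stops Flask's debug reloader from running this
        # block in a second process, which would start a second scheduler
        app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
    finally:
        # Stop the background job however the server exits
        scheduler.shutdown()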

* Please note that the credentials are not included for privacy reasons.
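
One thing to watch with a fixed 15-minute schedule: each run asks for the 50 most recent plays, so consecutive runs can return overlapping tracks, and without a unique constraint on ts the same play could be inserted more than once. Below is a minimal sketch of one way to guard against that. filter_new_plays is a hypothetical helper, not part of the script above; it assumes the timestamp of the newest stored row (for example from SELECT MAX(ts) FROM music) is passed in as a 'YYYY-MM-DD HH:MM:SS' string, the format convert_timestamp produces. The sample plays are made up.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def filter_new_plays(tracks, last_ts):
    """Return only the tracks played after last_ts, oldest first.

    tracks  -- list of dicts with a 'ts' key in TS_FORMAT
    last_ts -- newest timestamp already stored, or None for an empty table
    """
    if last_ts is None:
        new = list(tracks)
    else:
        cutoff = datetime.strptime(last_ts, TS_FORMAT)
        new = [t for t in tracks
               if datetime.strptime(t["ts"], TS_FORMAT) > cutoff]
    return sorted(new, key=lambda t: datetime.strptime(t["ts"], TS_FORMAT))


# Hypothetical data in the shape produced by fetch_recently_played
recent = [
    {"ts": "2024-11-20 10:15:00", "track_name": "Track C"},
    {"ts": "2024-11-20 10:05:00", "track_name": "Track B"},
    {"ts": "2024-11-20 09:55:00", "track_name": "Track A"},
]
for play in filter_new_plays(recent, "2024-11-20 10:00:00"):
    print(play["ts"], play["track_name"])
```

Alternatively, spotipy's current_user_recently_played accepts an after argument (a Unix timestamp in milliseconds), so the filtering could be pushed to Spotify's side instead.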

Utilising Discogs’ API

Having my Spotify history was one thing, but what is this information without track details? Sure, the Spotify Web API does include genres, but they’re on a per-artist basis as opposed to per-track or per-album. I needed a solution. Luckily, because of my interest in collecting vinyl records, I had learned about Discogs. To sum Discogs up in a single sentence: details of every track of every album released by every artist ever. Okay, maybe that’s a bit of a stretch, but you get the point; it was perfect.

I simply had to fetch the artist and album names from my database, look each album up on Discogs, and run an UPDATE statement matching on the respective artist and album names. Simple… right? Indeed it was! At this point I realised that including a column for the year each album was released would also be a great idea for additional insights into my listening habits. So, I did. This is the code I used:

Code
import mysql.connector
import requests
import time

DISCOGS_API_TOKEN = "[my api key]"

db_config = {
    "host": "192.168.1.174",
    "user": "[my user]",
    "password": "[my password]",
    "database": "spotify"
}

def connect_to_database():
    return mysql.connector.connect(**db_config)

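def fetch_empty_rows(cursor):
    # DISTINCT returns each artist/album pair once, so an album with many
    # plays triggers a single Discogs lookup rather than one per row
    query = """
        SELECT DISTINCT artist_name, album_name
        FROM music
        WHERE genres IS NULL OR genres = '' OR release_date IS NULL;
    """
    cursor.execute(query)
    return cursor.fetchall()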

def get_album_data_from_discogs(artist_name, album_name):
    base_url = "https://api.discogs.com/database/search"
    headers = {"User-Agent": "SpotifyHistoryGenreUpdater/1.0"}
    params = {
        "artist": artist_name,
        "release_title": album_name,
        "token": DISCOGS_API_TOKEN
    }

    # A timeout stops one stalled request from hanging the whole run
    response = requests.get(base_url, headers=headers, params=params, timeout=10)

    if response.status_code == 200:
        data = response.json()
        print(f"API response for {artist_name} - {album_name}: {data}")  # Debugging

        if "results" in data and data["results"]:
            result = data["results"][0]  
            genres = result.get("genre", [])
            styles = result.get("style", [])
            release_date = result.get("year") 
            return genres + styles, release_date
        else:
            print(f"No results found for {artist_name} - {album_name}")
    else:
        print(f"Error: Received status code {response.status_code} for {artist_name} - {album_name}")
    return None, None

def update_album_data(cursor, db, artist_name, album_name, genres, release_date):
    if not album_name or not album_name.strip():
        print("Error: Album name is missing or invalid!")
        return

    # Values passed as query parameters are escaped by the connector,
    # so the genre string needs no manual escaping
    genre_string = ", ".join(genres) if genres else "Unknown"

    # Match on artist as well as album so identically named albums by
    # different artists are not overwritten; <=> is the NULL-safe
    # comparison, needed because artist_name is nullable
    query = """
        UPDATE music
        SET genres = %s, release_date = %s
        WHERE artist_name <=> %s AND album_name = %s;
    """
    params = (genre_string, release_date, artist_name, album_name.strip())

    print(f"Query: {query}")
    print(f"Params: {params}")

    try:
        cursor.execute(query, params)
        db.commit()
        print(f"Updated data for album: {album_name}")
    except Exception as e:
        print(f"Error updating data: {e}")

def main():
    db = connect_to_database()
    cursor = db.cursor()

    rows = fetch_empty_rows(cursor)
    print(f"Found {len(rows)} rows with empty genres or release dates.")

    for row in rows:
        artist_name, album_name = row
        print(f"Processing: {artist_name} - {album_name}")

        genres, release_date = get_album_data_from_discogs(artist_name, album_name)
        if genres:
            print(f"Genres found: {'; '.join(genres)}")
        else:
            print("No genres found.")

        if release_date:
            print(f"Release date found: {release_date}")
        else:
            print("No release date found.")

        update_album_data(cursor, db, artist_name, album_name, genres, release_date)

        time.sleep(1)


    cursor.close()
    db.close()
    print("Data updated successfully.")

if __name__ == "__main__":
    main()
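
The fixed one-second pause keeps the request rate modest, but Discogs can still answer with HTTP 429 when its rate limit is exceeded, in which case the script above just logs the status code and moves on. A hedged sketch of a retry wrapper with exponential backoff follows. call_with_backoff is a hypothetical helper; it takes any zero-argument function returning an object with a status_code attribute (such as a requests response), and the retry count and delays are illustrative assumptions, not values recommended by Discogs.

```python
import time


def call_with_backoff(fetch, max_retries=4, base_delay=1.0, sleep=time.sleep):
    """Call fetch() and retry with exponential backoff while it returns 429.

    fetch -- zero-argument callable returning an object with .status_code
    sleep -- injectable for testing; defaults to time.sleep
    """
    response = fetch()
    for attempt in range(max_retries):
        if response.status_code != 429:
            break
        # Wait base_delay, then twice that, then four times, and so on
        sleep(base_delay * (2 ** attempt))
        response = fetch()
    return response
```

In get_album_data_from_discogs, the request line could then become response = call_with_backoff(lambda: requests.get(base_url, headers=headers, params=params)).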

Mission: Complete!

As you may have gathered, this was very much a ‘backend’-style project, but I really wanted to use the new skills I’d acquired over the past year to see what I could accomplish with my Spotify data. And, in my opinion, it turned out great! I now have a Spotify listening history that regularly updates with what I’ve been listening to. Mission: Complete!