From a30c4467bcae79acaab61042aedc5545744d66d8 Mon Sep 17 00:00:00 2001 From: Brian Lee Date: Sat, 11 Jan 2025 19:13:23 -0800 Subject: [PATCH] Refactor strategies and shared utility functions into separate modules --- src/helpers/SimpleBeatSaverAPI.py | 218 ++++ src/saberlist/make.py | 934 +----------------- src/saberlist/playlist_strategies/accuracy.py | 232 +++++ .../playlist_strategies/beatsaver.py | 308 ++++++ .../playlist_strategies/oldscores.py | 214 ++++ .../playlist_strategies/performance.py | 188 ++++ src/saberlist/utils.py | 133 +++ 7 files changed, 1343 insertions(+), 884 deletions(-) create mode 100644 src/helpers/SimpleBeatSaverAPI.py create mode 100644 src/saberlist/playlist_strategies/accuracy.py create mode 100644 src/saberlist/playlist_strategies/beatsaver.py create mode 100644 src/saberlist/playlist_strategies/oldscores.py create mode 100644 src/saberlist/playlist_strategies/performance.py create mode 100644 src/saberlist/utils.py diff --git a/src/helpers/SimpleBeatSaverAPI.py b/src/helpers/SimpleBeatSaverAPI.py new file mode 100644 index 0000000..d3cd04b --- /dev/null +++ b/src/helpers/SimpleBeatSaverAPI.py @@ -0,0 +1,218 @@ +from datetime import datetime, timedelta +import base64 +import json +import os +import random +import requests +import time +from time import sleep + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG +) + +class SimpleBeatSaverAPI: + BASE_URL = "https://api.beatsaver.com" + + def __init__(self, cache_expiry_days=1): + self.session = requests.Session() + self.cache_expiry_days = cache_expiry_days + self.CACHE_DIR = self._determine_cache_dir() + if not os.path.exists(self.CACHE_DIR): + os.makedirs(self.CACHE_DIR) + + def _determine_cache_dir(self): + home_cache = os.path.expanduser("~/.cache") + saberlist_cache = os.path.join(home_cache, "saberlist") + beatsaver_cache = os.path.join(saberlist_cache, "beatsaver") + + if os.path.exists(home_cache): + if not os.path.exists(saberlist_cache): + try: + os.makedirs(saberlist_cache) + logging.info(f"Created cache directory: {saberlist_cache}") + except OSError as e: + logging.warning(f"Failed to create {saberlist_cache}: {e}") + return os.path.join(os.getcwd(), ".cache") + return beatsaver_cache + else: + logging.info("~/.cache doesn't exist, using local .cache directory") + return os.path.join(os.getcwd(), ".cache") + + def _get_cache_filename(self, player_id): + return os.path.join(self.CACHE_DIR, f"player_{player_id}_scores.json") + + def _is_cache_valid(self, cache_file): + if not os.path.exists(cache_file): + return False + file_modified_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) + return datetime.now() - file_modified_time < timedelta(days=self.cache_expiry_days) + + def clear_cache(self, player_id=None): + if player_id: + cache_file = self._get_cache_filename(player_id) + if os.path.exists(cache_file): + os.remove(cache_file) + logging.debug(f"Cleared cache for player {player_id}") + else: + for file in os.listdir(self.CACHE_DIR): + os.remove(os.path.join(self.CACHE_DIR, file)) + logging.debug("Cleared all cache") + + def get_cache_dir(self): + return self.CACHE_DIR + + + def get_curated_songs(self, use_cache=True) -> list[dict]: + """ + Retrieve curated songs from BeatSaver. + + Note: BeatSaver's sort order behavior is not documented, so we cannot reliably + cache results per page or determine if new songs have been added. Therefore, + we must fetch all pages each time to ensure we have the complete, up-to-date list. + + :param use_cache: Whether to use cached data if available (default: True) + :return: List of dictionaries containing song hash, characteristic, and difficulty + """ + cache_file = os.path.join(self.CACHE_DIR, f"curated_songs.json") + + # Never expire the curated songs cache because it results in a many calls to the API + if use_cache and os.path.exists(cache_file): + logging.debug(f"Using cached data for curated songs") + with open(cache_file, 'r') as f: + return json.load(f) + + processed_songs = [] + page = 0 + + while True: + url = f"{self.BASE_URL}/search/text/{page}" + params = { + "sortOrder": "Curated", + "curated": "true" + } + + try: + response = self.session.get(url, params=params) + response.raise_for_status() + data = response.json() + + # Process the response to extract relevant information + for song in data.get('docs', []): + for version in song.get('versions', []): + song_info = { + 'hash': version['hash'], + 'key': song['id'], + 'songName': song['metadata']['songName'], + } + processed_songs.append(song_info) + + # Check if we've reached the last page + if page >= data['info']['pages'] - 1: + break + + page += 1 + sleep(1) + + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching curated songs: {e}") + return [] + + # Cache the results + with open(cache_file, 'w') as f: + json.dump(processed_songs, f) + + return processed_songs + + def get_followed_mappers(self, user_id: int = 243016, use_cache=True) -> list[dict]: + """ + Retrieve list of mappers followed by a specific user. + + :param user_id: BeatSaver user ID + :param use_cache: Whether to use cached data if available (default: True) + :return: List of dictionaries containing mapper information + """ + cache_file = os.path.join(self.CACHE_DIR, f"followed_mappers_{user_id}.json") + + if use_cache and self._is_cache_valid(cache_file): + logging.debug(f"Using cached data for followed mappers of user {user_id}") + with open(cache_file, 'r') as f: + return json.load(f) + + url = f"{self.BASE_URL}/users/followedBy/{user_id}/0" + + try: + response = self.session.get(url) + response.raise_for_status() + mappers = response.json() + + # Cache the results + with open(cache_file, 'w') as f: + json.dump(mappers, f) + + return mappers + + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching followed mappers: {e}") + return [] + + def get_mapper_maps(self, mapper_id: int, use_cache=True) -> list[dict]: + """ + Retrieve all maps created by a specific mapper. + + :param mapper_id: BeatSaver mapper ID + :param use_cache: Whether to use cached data if available (default: True) + :return: List of dictionaries containing map information + """ + cache_file = os.path.join(self.CACHE_DIR, f"mapper_{mapper_id}_maps.json") + + if use_cache and self._is_cache_valid(cache_file): + logging.debug(f"Using cached data for mapper {mapper_id}") + with open(cache_file, 'r') as f: + return json.load(f) + + processed_songs = [] + page = 0 + + while True: + url = f"{self.BASE_URL}/search/text/{page}" + params = { + 'collaborator': str(mapper_id), + 'automapper': 'true', + 'sortOrder': 'Latest' + } + + try: + response = self.session.get(url, params=params) + response.raise_for_status() + data = response.json() + + # Process the response to extract relevant information + for song in data.get('docs', []): + for version in song.get('versions', []): + song_info = { + 'hash': version['hash'], + 'key': song['id'], + 'songName': song['metadata']['songName'], + } + processed_songs.append(song_info) + + # Page numbering starts at 0 + if page >= data['info']['pages'] - 1: + break + + page += 1 + sleep(1) + + except requests.exceptions.RequestException as e: + logging.error(f"Error fetching mapper maps: {e}") + return [] + + # Cache the results + with open(cache_file, 'w') as f: + json.dump(processed_songs, f) + + return processed_songs diff --git a/src/saberlist/make.py b/src/saberlist/make.py index 9e37f69..1b51fbe 100644 --- a/src/saberlist/make.py +++ b/src/saberlist/make.py @@ -1,19 +1,10 @@ -from collections import defaultdict -from datetime import datetime, timedelta, timezone -from helpers.BeatSaverAPI import BeatSaverAPI -from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI -from statistics import mean, median -from typing import Dict, Any, List import argparse -import json import logging import os import sys -import math from dotenv import load_dotenv load_dotenv() LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() -HISTORY_FILE = os.environ.get('HISTORY_FILE', "playlist_history.json") CACHE_EXPIRY_DAYS = int(os.environ.get('CACHE_EXPIRY_DAYS', 7)) import logging @@ -26,780 +17,76 @@ logging.basicConfig( from helpers.PlaylistBuilder import PlaylistBuilder from helpers.ScoreSaberAPI import ScoreSaberAPI from helpers.BeatLeaderAPI import BeatLeaderAPI +from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI +from helpers.SimpleBeatSaverAPI import SimpleBeatSaverAPI -import calendar +from saberlist.utils import reset_history +from saberlist.playlist_strategies.oldscores import playlist_strategy_beatleader_oldscores, playlist_strategy_scoresaber_oldscores +from saberlist.playlist_strategies.accuracy import playlist_strategy_beatleader_lowest_acc, playlist_strategy_beatleader_accuracy_gaps +from saberlist.playlist_strategies.performance import playlist_strategy_beatleader_lowest_pp, playlist_strategy_scoresaber_lowest_pp +from saberlist.playlist_strategies.beatsaver import playlist_strategy_beatsaver_acc, playlist_strategy_beatsaver_curated, playlist_strategy_beatsaver_mappers -def load_history() -> Dict[str, Any]: +def saberlist() -> None: """ - Load the playlist history from a JSON file. - - :return: A dictionary containing the history. + Generate a playlist of songs using a specified strategy. + Avoids reusing the same song+difficulty in a playlist based on history. """ - if os.path.exists(HISTORY_FILE): - with open(HISTORY_FILE, 'r') as f: - history = json.load(f) - history.setdefault('playlist_counts', {}) - return history - return {'scoresaver': {}, 'playlist_counts': {}} + strategy = get_strategy() -def save_history(history: Dict[str, Any]) -> None: - """ - Save the playlist history to a JSON file. - - :param history: The history dictionary to save. - """ - with open(HISTORY_FILE, 'w') as f: - json.dump(history, f, indent=2) - -def prompt_for_player_id(default_id: str = '76561199407393962') -> str: - """ - Prompt the user to enter a ScoreSaber or BeatLeader player ID. - Uses a default ID if the user presses Enter without input. - - :param default_id: The default player ID to use. - :return: The player ID entered by the user or the default. - """ - prompt = f"Enter player ID (press Enter for default '{default_id}'): " - player_id = input(prompt).strip() or default_id - return player_id - -def format_time_ago(time_difference: timedelta) -> str: - """ - Format a timedelta object into a human-readable string. - - :param time_difference: The time difference to format. - :return: A string representing the time difference (e.g., '5d', '2w'). - """ - days = time_difference.days - - if days < 7: - return f"{days} day(s)" - elif days < 30: - weeks = days // 7 - return f"{weeks} week(s)" - elif days < 365: - months = days // 30 - return f"{months} month(s)" + if strategy == 'scoresaber_oldscores': + playlist_data, playlist_title = playlist_strategy_scoresaber_oldscores(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder() + elif strategy == 'beatleader_oldscores': + playlist_data, playlist_title = playlist_strategy_beatleader_oldscores(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder() + elif strategy == 'beatsaver_acc': + playlist_data, playlist_title = playlist_strategy_beatsaver_acc() + playlist_builder = PlaylistBuilder(covers_dir='./covers/beatsavers') + elif strategy == 'beatleader_lowest_pp': + playlist_data, playlist_title = playlist_strategy_beatleader_lowest_pp(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder(covers_dir='./covers/beatleader') + elif strategy == 'scoresaber_lowest_pp': + playlist_data, playlist_title = playlist_strategy_scoresaber_lowest_pp(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresaber') + elif strategy == 'beatleader_lowest_acc': + playlist_data, playlist_title = playlist_strategy_beatleader_lowest_acc(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder(covers_dir='./covers/kaiju') + elif strategy == 'beatleader_accuracy_gaps': + playlist_data, playlist_title = playlist_strategy_beatleader_accuracy_gaps(SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) + playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas') + elif strategy == 'beatsaver_curated': + playlist_data, playlist_title = playlist_strategy_beatsaver_curated(SimpleBeatSaverAPI()) + playlist_builder = PlaylistBuilder(covers_dir='./covers/curated') + elif strategy == 'beatsaver_mappers': + playlist_data, playlist_title = playlist_strategy_beatsaver_mappers(SimpleBeatSaverAPI()) + playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas') else: - years = days // 365 - return f"{years} year(s)" - -def normalize_difficulty_name(difficulty_name): - difficulty_names = { - # ScoreSaber - '_ExpertPlus_SoloStandard': 'expertplus', - '_Expert_SoloStandard': 'expert', - '_Hard_SoloStandard': 'hard', - '_Normal_SoloStandard': 'normal', - '_Easy_SoloStandard': 'easy', - # BeatLeader - 1: 'easy', - 3: 'normal', - 5: 'hard', - 7: 'expert', - 9: 'expertplus', - } - - # Return the mapped value or the original name if there is no mapping - return difficulty_names.get(difficulty_name, difficulty_name) - -"""deprecated in favor of using undocumented api call -def infer_beatleader_leaderboard_id(song_id: str, difficulty: str) -> str: - difficulty_map = { - 'Easy': 1, - 'Normal': 3, - 'Hard': 5, - 'Expert': 7, - 'ExpertPlus': 9, - } - return f"{song_id}{difficulty_map[difficulty]}1" -""" - - -def playlist_strategy_beatleader_accuracy_gaps( - api: SimpleBeatLeaderAPI, - song_count: int = 40, - bin_size: float = 0.25, - bin_sort: bool = False -) -> List[Dict[str, Any]]: - """ - Build a playlist of songs where the player's accuracy is furthest below the median accuracy - for their star rating range. Songs are grouped into bins by star rating to ensure fair comparison. - - :param api: SimpleBeatLeaderAPI instance for making API calls - :param song_count: Number of songs to include in the playlist - :param bin_size: Size of star rating bins for grouping similar difficulty songs - :param bin_sort: Whether to sort the bins by star rating - :return: A tuple containing (list of song dictionaries, playlist title string) - """ - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('beatleader_accuracy_gaps', {}) - history.setdefault('playlist_counts', {}) - - # Get the current count and increment it - count_key = 'beatleader_accuracy_gaps' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - # Fetch accuracy graph data - all_scores = api.get_player_accgraph(player_id) - if not all_scores: - logging.warning(f"No accgraph data found for player ID {player_id} on BeatLeader.") - return [], "" - logging.debug(f"Found {len(all_scores)} accgraph entries for player ID {player_id} on BeatLeader.") - - # Collect all star ratings - star_ratings = [entry['stars'] for entry in all_scores if entry.get('stars') is not None] - if not star_ratings: - logging.warning("No star ratings found in accgraph data.") - return [], "" - min_stars = min(star_ratings) - max_stars = max(star_ratings) - star_range = max_stars - min_stars - - # Remove the bin size calculation logic - num_bins = math.ceil(star_range / bin_size) - logging.info(f"Using bin size: {bin_size}, resulting in {num_bins} bins.") - - # Group accuracies by bins - bin_to_accuracies = defaultdict(list) - for entry in all_scores: - stars = entry.get('stars') - acc = entry.get('acc') - if stars is not None and acc is not None: - bin_index = int((stars - min_stars) / bin_size) - bin_to_accuracies[bin_index].append(acc) - - # Calculate median accuracy for each bin - bin_to_median = {} - for bin_index, accs in bin_to_accuracies.items(): - bin_to_median[bin_index] = median(accs) - bin_start = min_stars + bin_index * bin_size - bin_end = bin_start + bin_size - logging.debug(f"Median accuracy for bin {bin_index} (stars {bin_start:.2f} to {bin_end:.2f}): {bin_to_median[bin_index]:.4f}") - - # Compute difference from median for each score - for entry in all_scores: - stars = entry.get('stars') - acc = entry.get('acc') - if stars is not None and acc is not None: - bin_index = int((stars - min_stars) / bin_size) - median_acc = bin_to_median.get(bin_index) - if median_acc is not None: - entry['diff_from_median'] = acc - median_acc - else: - entry['diff_from_median'] = float('inf') # Place entries with missing data at the end - else: - entry['diff_from_median'] = float('inf') # Place entries with missing data at the end - - # Sort scores by difference from median (ascending: most below median first) - all_scores.sort(key=lambda x: x.get('diff_from_median', float('inf'))) - - playlist_data = [] - for score_entry in all_scores: - if len(playlist_data) >= song_count: - break - - acc = score_entry.get('acc', 0) - stars = score_entry.get('stars') - song_hash = score_entry.get('hash') - - if not song_hash or stars is None: - logging.debug(f"Skipping entry due to missing hash or stars: {score_entry}") - continue - - # Use stars as a proxy for difficulty; adjust if you have actual difficulty levels - difficulty = score_entry.get('diff', '') - difficulty_characteristic = score_entry.get('mode', 'Standard') - - if song_hash in history['beatleader_accuracy_gaps'] and difficulty in history['beatleader_accuracy_gaps'][song_hash]: - logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") - continue - - song_dict = { - 'hash': song_hash, - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': difficulty_characteristic - } - ] - } - - playlist_data.append(song_dict) - logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, " - f"Accuracy={acc*100:.2f}%, Diff from Median={score_entry['diff_from_median']*100:.2f}%") - - # Update history - history['beatleader_accuracy_gaps'].setdefault(song_hash, []).append(difficulty) + logging.error(f"Unknown strategy '{strategy}'") + return if not playlist_data: - logging.info("No new songs found to add to the playlist based on history for BeatLeader accuracy gaps.") - else: - for song in playlist_data: - song_hash = song['hash'] - difficulty = song['difficulties'][0]['name'] - logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") - logging.info(f"Total songs added to playlist from BeatLeader accuracy gaps: {len(playlist_data)}") + logging.info("No new scores found to add to the playlist.") + return - save_history(history) - playlist_title = f"accgraph-{new_count:02d}" - - return playlist_data, playlist_title - -def playlist_strategy_scoresaber_oldscores( - api: ScoreSaberAPI, - song_count: int = 20 -) -> List[Dict[str, Any]]: - """Build and format a list of songs based on old scores from ScoreSaber, avoiding reusing the same song+difficulty.""" - - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('scoresaber_oldscores', {}) - history.setdefault('playlist_counts', {}) - - # Get the current count for ScoreSaber old scores and increment it - count_key = 'scoresaber_oldscores' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - scores_data = api.get_player_scores(player_id, use_cache=True) - all_scores = scores_data.get('playerScores', []) - if not all_scores: - logging.warning(f"No scores found for player ID {player_id}.") - return [], "" - logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.") - - # Sort scores by timeSet in ascending order (oldest first) - all_scores.sort(key=lambda x: x['score'].get('timeSet', '')) - - playlist_data = [] - current_time = datetime.now(timezone.utc) - - for score in all_scores: - leaderboard = score.get('leaderboard', {}) - song_id = leaderboard.get('songHash') - difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '') - - if not song_id or not difficulty_raw: - logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}") - continue # Skip if essential data is missing - - # Calculate time ago - time_set_str = score['score'].get('timeSet') - if not time_set_str: - logging.debug(f"Skipping score due to missing timeSet: {score}") - continue # Skip if time_set is missing - try: - time_set = datetime.fromisoformat(time_set_str.replace('Z', '+00:00')) - except ValueError as e: - logging.error(f"Invalid time format for score ID {score['score'].get('id')}: {e}") - continue - time_difference = current_time - time_set - time_ago = format_time_ago(time_difference) - - # Normalize the difficulty name - difficulty = normalize_difficulty_name(difficulty_raw) - game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard') - if 'Standard' in game_mode: - game_mode = 'Standard' - - # Check history to avoid reusing song+difficulty - if song_id in history['scoresaber_oldscores'] and difficulty in history['scoresaber_oldscores'][song_id]: - logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.") - continue # Skip if already used - - # Format the song data as expected by PlaylistBuilder - song_dict = { - 'hash': song_id, - 'songName': leaderboard.get('songName', 'Unknown'), - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': game_mode - } - ] - } - - # Add the song to the playlist - playlist_data.append(song_dict) - logging.debug(f"Selected song for playlist: {song_dict['songName']} ({difficulty})") - - # Log the song addition - mapper = "Unknown" # Mapper information can be added if available - logging.info(f"Song added: {song_dict['songName']} ({difficulty}), mapped by {mapper}. Last played {time_ago} ago.") - - # Check if the desired number of songs has been reached - if len(playlist_data) >= song_count: - logging.debug(f"Reached the desired song count: {song_count}.") - break - - # Log if no songs were added - if not playlist_data: - logging.info("No new songs found to add to the playlist based on history.") - else: - logging.info(f"Total songs added to playlist: {len(playlist_data)}") - - # Update history to avoid reusing the same song+difficulty - for song in playlist_data: - song_id = song['hash'] - difficulty_name = song['difficulties'][0]['name'] - history['scoresaber_oldscores'].setdefault(song_id, []).append(difficulty_name) - save_history(history) - - return playlist_data, f"scoresaber_oldscores-{new_count:02d}" - -def playlist_strategy_beatleader_oldscores( - api: BeatLeaderAPI, - song_count: int = 20 -) -> List[Dict[str, Any]]: - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('beatleader_oldscores', {}) - history.setdefault('playlist_counts', {}) - - # Get the current count for BeatLeader old scores and increment it - count_key = 'beatleader_oldscores' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - scores_data = api.get_player_scores(player_id) - all_scores = scores_data.get('playerScores', []) - if not all_scores: - logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") - return [], "" - logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") - - # Sort scores by epochTime in ascending order (oldest first) - all_scores.sort(key=lambda x: x.get('score', {}).get('epochTime', 0)) - - playlist_data = [] - current_time = datetime.now(timezone.utc) - - for score_entry in all_scores: - if len(playlist_data) >= song_count: - break # Stop if we've reached the desired number of songs - - score = score_entry.get('score', {}) - leaderboard = score_entry.get('leaderboard', {}) - - song_hash = leaderboard.get('songHash') - difficulty_raw = int(leaderboard.get('difficulty', '')) - game_mode = leaderboard.get('modeName', 'Standard') - epoch_time = score.get('epochTime') - - if not song_hash or not difficulty_raw or not epoch_time: - logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") - continue - - difficulty = normalize_difficulty_name(difficulty_raw) - - # avoid reusing song+difficulty - if song_hash in history['beatleader_oldscores'] and difficulty in history['beatleader_oldscores'][song_hash]: - logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") - continue # Skip if already used - - # Calculate time ago - try: - time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc) - except (ValueError, OSError) as e: - logging.error(f"Invalid epochTime for score ID {score.get('id')}: {e}") - continue - time_difference = current_time - time_set - time_ago = format_time_ago(time_difference) - - # Format the song data for PlaylistBuilder - song_dict = { - 'hash': song_hash, - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': game_mode - } - ] - } - - # Add the song to the playlist - playlist_data.append(song_dict) - logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}. Last played {time_ago} ago.") - - # Update history - history['beatleader_oldscores'].setdefault(song_hash, []).append(difficulty) - - # Log the final playlist - if not playlist_data: - logging.info("No new songs found to add to the playlist based on history for BeatLeader.") - else: - for song in playlist_data: - song_hash = song['hash'] - difficulty = song['difficulties'][0]['name'] - logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}.") - logging.info(f"Total songs added to playlist from BeatLeader: {len(playlist_data)}") - - save_history(history) - - return playlist_data, f"beatleader_oldscores-{new_count:02d}" - -def playlist_strategy_beatleader_lowest_pp( - api: BeatLeaderAPI, - song_count: int = 20 -) -> List[Dict[str, Any]]: - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('beatleader_lowest_pp', {}) - history.setdefault('playlist_counts', {}) - - # Get the current count for BeatLeader lowest PP and increment it - count_key = 'beatleader_lowest_pp' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - scores_data = api.get_player_scores(player_id) - all_scores = scores_data.get('playerScores', []) - if not all_scores: - logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") - return [], "" - logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") - - # Filter out scores with zero PP and sort by PP in ascending order - ranked_scores = [s for s in all_scores if s.get('score', {}).get('pp', 0) > 0] - ranked_scores.sort(key=lambda x: x.get('score', {}).get('pp', float('inf'))) - - playlist_data = [] - for score_entry in ranked_scores: - if len(playlist_data) >= song_count: - break # Stop if we've reached the desired number of songs - - score = score_entry.get('score', {}) - leaderboard = score_entry.get('leaderboard', {}) - - song_hash = leaderboard.get('songHash') - difficulty_raw = int(leaderboard.get('difficulty', '')) - game_mode = leaderboard.get('modeName', 'Standard') - pp = score.get('pp', 0) - - if not song_hash or not difficulty_raw: - logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") - continue - - difficulty = normalize_difficulty_name(difficulty_raw) - - # avoid reusing song+difficulty - if song_hash in history['beatleader_lowest_pp'] and difficulty in history['beatleader_lowest_pp'][song_hash]: - logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") - continue # Skip if already used - - # Format the song data for PlaylistBuilder - song_dict = { - 'hash': song_hash, - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': game_mode - } - ] - } - - # Add the song to the playlist - playlist_data.append(song_dict) - logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, PP={pp:.2f}") - - # Update history - history['beatleader_lowest_pp'].setdefault(song_hash, []).append(difficulty) - - # Log the final playlist - if not playlist_data: - logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest PP.") - else: - for song in playlist_data: - song_hash = song['hash'] - difficulty = song['difficulties'][0]['name'] - logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") - logging.info(f"Total songs added to playlist from BeatLeader lowest PP: {len(playlist_data)}") - - save_history(history) - - return playlist_data, f"beatleader_lowest_pp-{new_count:02d}" - -def playlist_strategy_scoresaber_lowest_pp( - api: ScoreSaberAPI, - song_count: int = 20 -) -> List[Dict[str, Any]]: - """Build and format a list of songs based on lowest PP scores from ScoreSaber, avoiding reusing the same song+difficulty.""" - - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('scoresaber_lowest_pp', {}) - history.setdefault('playlist_counts', {}) - - # Get the current count for ScoreSaber lowest PP and increment it - count_key = 'scoresaber_lowest_pp' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - scores_data = api.get_player_scores(player_id, use_cache=True) - all_scores = scores_data.get('playerScores', []) - if not all_scores: - logging.warning(f"No scores found for player ID {player_id}.") - return [], "" - logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.") - - # Filter out scores with zero PP and sort by PP in ascending order - ranked_scores = [s for s in all_scores if s['score'].get('pp', 0) > 0] - ranked_scores.sort(key=lambda x: x['score'].get('pp', float('inf'))) - - playlist_data = [] - - for score in ranked_scores: - leaderboard = score.get('leaderboard', {}) - song_id = leaderboard.get('songHash') - difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '') - - if not song_id or not difficulty_raw: - logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}") - continue # Skip if essential data is missing - - # Normalize the difficulty name - difficulty = normalize_difficulty_name(difficulty_raw) - game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard') - if 'Standard' in game_mode: - game_mode = 'Standard' - - # Check history to avoid reusing song+difficulty - if song_id in history['scoresaber_lowest_pp'] and difficulty in history['scoresaber_lowest_pp'][song_id]: - logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.") - continue # Skip if already used - - # Format the song data as expected by PlaylistBuilder - song_dict = { - 'hash': song_id, - 'songName': leaderboard.get('songName', 'Unknown'), - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': game_mode - } - ] - } - - # Add the song to the playlist - playlist_data.append(song_dict) - pp_score = score['score'].get('pp', 0) - logging.info(f"Song added: {song_dict['songName']} ({difficulty}), PP: {pp_score:.2f}") - - # Check if the desired number of songs has been reached - if len(playlist_data) >= song_count: - logging.debug(f"Reached the desired song count: {song_count}.") - break - - # Log if no songs were added - if not playlist_data: - logging.info("No new songs found to add to the playlist based on history.") - else: - logging.info(f"Total songs added to playlist: {len(playlist_data)}") - - # Update history to avoid reusing the same song+difficulty - for song in playlist_data: - song_id = song['hash'] - difficulty_name = song['difficulties'][0]['name'] - history['scoresaber_lowest_pp'].setdefault(song_id, []).append(difficulty_name) - save_history(history) - - return playlist_data, f"scoresaber_lowest_pp-{new_count:02d}" - -def map_leaders_by_month(month: int = 9, year: int = 2024, game_modes: List[str] = ['Standard']) -> List[Dict]: - """ - Gathers a month's worth of maps using the BeatSaver latest maps endpoint, - prioritizes map difficulties where players have already set good scores, - and calculates the average accuracy for each map+difficulty. - - Args: - month: The month to gather maps for. - year: The year to gather maps for. - game_modes: The game modes to include. - - Returns: - A list of dictionaries, each containing: - - hash: Hash of the map - - difficulties: List of difficulties with their characteristics - - map_name: Name of the map - - average_accuracy: Average accuracy of the leaderboard - """ - beatleader_api = SimpleBeatLeaderAPI(cache_expiry_days=30) - beatsaver_api = BeatSaverAPI(cache_expiry_days=30) - - logging.debug(f"Fetching maps for {month}/{year}") - map_data = beatsaver_api.get_maps(year=year, month=month, page_size=100) - - collected_data = [] - - for i, map_entry in enumerate(map_data): - - # Ensure there are versions available - if not map_entry.versions: - logging.warning(f"No versions found for map: {map_entry.name}") - continue - - latest_version = max(map_entry.versions, key=lambda version: version.created_at) - song_hash = latest_version.hash_ - - for diff in latest_version.diffs: - if diff.characteristic not in game_modes: - continue - - logging.info(f"Getting leaderboard for {i+1}/{len(map_data)}: {map_entry.name}") - leaderboard_data = beatleader_api.get_leaderboard(song_hash, diff.difficulty) - - if not leaderboard_data: - logging.debug(f"No leaderboard data for {map_entry.name} [{diff.difficulty}], skipping") - continue - - # Calculate average accuracy - accuracies = [entry.get('accuracy', 0) for entry in leaderboard_data if 'accuracy' in entry] - if not accuracies: - logging.debug(f"No accuracy data for {map_entry.name} [{diff.difficulty}]") - continue - - avg_accuracy = mean(accuracies) - - collected_data.append({ - 'hash': song_hash, - 'difficulties': [ - { - 'name': diff.difficulty, - 'characteristic': diff.characteristic - } - ], - 'map_name': map_entry.name, - 'average_accuracy': avg_accuracy - }) - - logging.info(f"Collected leaderboards for {len(collected_data)} map+difficulty combinations, orderable by average accuracy of top ten plays for {month}/{year}.") - return collected_data - -def playlist_strategy_scoresaver_acc( - song_count: int = 40 -) -> List[Dict[str, Any]]: - """ - Build and format a list of songs based on the highest average accuracy from recent maps. - Prompts the user for the month and year, defaulting to last month. - Excludes any map that's in the history, regardless of difficulty. - - :param song_count: The number of songs to include in the playlist. Default is 40. - :return: A list of dictionaries containing song information for the playlist. - """ - history = load_history() - history.setdefault('scoresaver', {}) - history.setdefault('playlist_counts', {}) - - # Get last month's date - today = datetime.now() - last_month = today.replace(day=1) - timedelta(days=1) - default_month = last_month.month - default_year = last_month.year - - # Prompt for month and year - while True: - month_input = input(f"Enter month (1-12, default {default_month}): ").strip() or str(default_month) - year_input = input(f"Enter year (default {default_year}): ").strip() or str(default_year) - - try: - month = int(month_input) - year = int(year_input) - if 1 <= month <= 12 and 2000 <= year <= datetime.now().year: - break - else: - print("Invalid month or year. Please try again.") - except ValueError: - print("Invalid input. Please enter numbers only.") - - # Get the current count for highest accuracy and increment it - count_key = f"scoresaver-{year}-{month:02d}" - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - leaderboard_data = map_leaders_by_month(month=month, year=year) - - if not leaderboard_data: - logging.error(f"No map+difficulty data available for {calendar.month_name[month]} {year}.") - return [], "" - - # Sort the data by average_accuracy in descending order - sorted_data = sorted( - leaderboard_data, - key=lambda x: x['average_accuracy'], - reverse=True + playlist_builder.create_playlist( + playlist_data, + playlist_title=playlist_title, + playlist_author="SaberList Tool" ) - playlist_data = [] - for entry in sorted_data: - if len(playlist_data) >= song_count: - break - - song_hash = entry['hash'] - - # Check history to avoid reusing any map, regardless of difficulty - if song_hash in history['scoresaver']: - logging.debug(f"Skipping song {song_hash} as it's in history.") - continue - - playlist_data.append({ - 'hash': song_hash, - 'difficulties': entry['difficulties'], - 'songName': entry['map_name'] - }) - - # Log the song addition - difficulty = entry['difficulties'][0]['name'] - logging.info(f"Song added: {entry['map_name']} ({difficulty}) - Average Accuracy: {entry['average_accuracy'] * 100:.2f}%") - - # Update history (now we're just adding the song hash, not the difficulty) - history['scoresaver'][song_hash] = True - - # Log if no songs were added - if not playlist_data: - logging.info(f"No new songs found to add to the playlist for {calendar.month_name[month]} {year} based on history.") - else: - logging.info(f"Total songs added to playlist: {len(playlist_data)}") - - save_history(history) - - return playlist_data, f"scoresaver-{year}-{month:02d}-{new_count:02d}" - -def reset_history(strategy: str) -> None: - """ - Reset the history for a given playlist strategy. - - :param strategy: The strategy to reset history for. - """ - history = load_history() - if strategy in history: - del history[strategy] - if 'playlist_counts' in history and strategy in history['playlist_counts']: - history['playlist_counts'][strategy] = 0 - save_history(history) - logging.info(f"History and playlist count for '{strategy}' have been reset.") - else: - logging.info(f"No history found for '{strategy}'. Nothing to reset.") - def get_strategy(): parser = argparse.ArgumentParser(description="Generate Beat Saber playlists") parser.add_argument("-s", "--strategy", choices=[ "scoresaber_oldscores", "beatleader_oldscores", - "scoresaver_acc", + "beatsaver_acc", # "beatleader_lowest_pp", # "scoresaber_lowest_pp", # "beatleader_lowest_acc", - "beatleader_accuracy_gaps" + "beatleader_accuracy_gaps", + "beatsaver_curated", + "beatsaver_mappers" ], help="Specify the playlist generation strategy") parser.add_argument("-r", "--reset", @@ -822,124 +109,3 @@ def get_strategy(): parser.error("--strategy is required unless --reset is used") return args.strategy - -def playlist_strategy_beatleader_lowest_acc( - api: BeatLeaderAPI, - song_count: int = 20 -) -> List[Dict[str, Any]]: - player_id = prompt_for_player_id() - history = load_history() - history.setdefault('beatleader_lowest_acc', {}) - history.setdefault('playlist_counts', {}) - """Selects songs with the lowest accuracy, avoiding reusing the same song+difficulty.""" - - # Get the current count and increment it - count_key = 'beatleader_lowest_acc' - current_count = history['playlist_counts'].get(count_key, 0) - new_count = current_count + 1 - history['playlist_counts'][count_key] = new_count - - scores_data = api.get_player_scores(player_id) - all_scores = scores_data.get('playerScores', []) - if not all_scores: - logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") - return [], "" - logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") - - # Sort by accuracy in ascending order (lowest first) - all_scores.sort(key=lambda x: x.get('score', {}).get('accuracy', float('inf'))) - - playlist_data = [] - for score_entry in all_scores: - if len(playlist_data) >= song_count: - break - - score = score_entry.get('score', {}) - leaderboard = score_entry.get('leaderboard', {}) - - song_hash = leaderboard.get('songHash') - difficulty_raw = int(leaderboard.get('difficulty', '')) - game_mode = leaderboard.get('modeName', 'Standard') - accuracy = score.get('accuracy', 0) - - if not song_hash or not difficulty_raw: - logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") - continue - - difficulty = normalize_difficulty_name(difficulty_raw) - - # avoid reusing song+difficulty - if song_hash in history['beatleader_lowest_acc'] and difficulty in history['beatleader_lowest_acc'][song_hash]: - logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") - continue - - song_dict = { - 'hash': song_hash, - 'difficulties': [ - { - 'name': difficulty, - 'characteristic': game_mode - } - ] - } - - playlist_data.append(song_dict) - logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, Accuracy={accuracy*100:.2f}%") - - # Update history - history['beatleader_lowest_acc'].setdefault(song_hash, []).append(difficulty) - - if not playlist_data: - logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest accuracy.") - else: - for song in playlist_data: - song_hash = song['hash'] - difficulty = song['difficulties'][0]['name'] - logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") - logging.info(f"Total songs added to playlist from BeatLeader lowest accuracy: {len(playlist_data)}") - - save_history(history) - - return playlist_data, f"beatleader_lowest_acc-{new_count:02d}" - -def saberlist() -> None: - """ - Generate a playlist of songs using a specified strategy. - Avoids reusing the same song+difficulty in a playlist based on history. - """ - strategy = get_strategy() - - if strategy == 'scoresaber_oldscores': - playlist_data, playlist_title = playlist_strategy_scoresaber_oldscores(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder() - elif strategy == 'beatleader_oldscores': - playlist_data, playlist_title = playlist_strategy_beatleader_oldscores(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder() - elif strategy == 'scoresaver_acc': - playlist_data, playlist_title = playlist_strategy_scoresaver_acc() - playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresavers') - elif strategy == 'beatleader_lowest_pp': - playlist_data, playlist_title = playlist_strategy_beatleader_lowest_pp(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder(covers_dir='./covers/beatleader') - elif strategy == 'scoresaber_lowest_pp': - playlist_data, playlist_title = playlist_strategy_scoresaber_lowest_pp(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresaber') - elif strategy == 'beatleader_lowest_acc': - playlist_data, playlist_title = playlist_strategy_beatleader_lowest_acc(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder(covers_dir='./covers/kaiju') - elif strategy == 'beatleader_accuracy_gaps': - playlist_data, playlist_title = playlist_strategy_beatleader_accuracy_gaps(SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)) - playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas') - else: - logging.error(f"Unknown strategy '{strategy}'") - return - - if not playlist_data: - logging.info("No new scores found to add to the playlist.") - return - - playlist_builder.create_playlist( - playlist_data, - playlist_title=playlist_title, - playlist_author="SaberList Tool" - ) diff --git a/src/saberlist/playlist_strategies/accuracy.py b/src/saberlist/playlist_strategies/accuracy.py new file mode 100644 index 0000000..c0191e0 --- /dev/null +++ b/src/saberlist/playlist_strategies/accuracy.py @@ -0,0 +1,232 @@ +from collections import defaultdict +from statistics import median +from typing import Dict, Any, List +import logging +import os +import math +from dotenv import load_dotenv +load_dotenv() +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=LOG_LEVEL +) + +from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI + +from saberlist.utils import prompt_for_player_id, load_history, save_history, normalize_difficulty_name + +def playlist_strategy_beatleader_accuracy_gaps( + api: SimpleBeatLeaderAPI, + song_count: int = 40, + bin_size: float = 0.25, + bin_sort: bool = False +) -> List[Dict[str, Any]]: + """ + Build a playlist of songs where the player's accuracy is furthest below the median accuracy + for their star rating range. Songs are grouped into bins by star rating to ensure fair comparison. + + :param api: SimpleBeatLeaderAPI instance for making API calls + :param song_count: Number of songs to include in the playlist + :param bin_size: Size of star rating bins for grouping similar difficulty songs + :param bin_sort: Whether to sort the bins by star rating + :return: A tuple containing (list of song dictionaries, playlist title string) + """ + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('beatleader_accuracy_gaps', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count and increment it + count_key = 'beatleader_accuracy_gaps' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + # Fetch accuracy graph data + all_scores = api.get_player_accgraph(player_id) + if not all_scores: + logging.warning(f"No accgraph data found for player ID {player_id} on BeatLeader.") + return [], "" + logging.debug(f"Found {len(all_scores)} accgraph entries for player ID {player_id} on BeatLeader.") + + # Collect all star ratings + star_ratings = [entry['stars'] for entry in all_scores if entry.get('stars') is not None] + if not star_ratings: + logging.warning("No star ratings found in accgraph data.") + return [], "" + min_stars = min(star_ratings) + max_stars = max(star_ratings) + star_range = max_stars - min_stars + + # Remove the bin size calculation logic + num_bins = math.ceil(star_range / bin_size) + logging.info(f"Using bin size: {bin_size}, resulting in {num_bins} bins.") + + # Group accuracies by bins + bin_to_accuracies = defaultdict(list) + for entry in all_scores: + stars = entry.get('stars') + acc = entry.get('acc') + if stars is not None and acc is not None: + bin_index = int((stars - min_stars) / bin_size) + bin_to_accuracies[bin_index].append(acc) + + # Calculate median accuracy for each bin + bin_to_median = {} + for bin_index, accs in bin_to_accuracies.items(): + bin_to_median[bin_index] = median(accs) + bin_start = min_stars + bin_index * bin_size + bin_end = bin_start + bin_size + logging.debug(f"Median accuracy for bin {bin_index} (stars {bin_start:.2f} to {bin_end:.2f}): {bin_to_median[bin_index]:.4f}") + + # Compute difference from median for each score + for entry in all_scores: + stars = entry.get('stars') + acc = entry.get('acc') + if stars is not None and acc is not None: + bin_index = int((stars - min_stars) / bin_size) + median_acc = bin_to_median.get(bin_index) + if median_acc is not None: + entry['diff_from_median'] = acc - median_acc + else: + entry['diff_from_median'] = float('inf') # Place entries with missing data at the end + else: + entry['diff_from_median'] = float('inf') # Place entries with missing data at the end + + # Sort scores by difference from median (ascending: most below median first) + all_scores.sort(key=lambda x: x.get('diff_from_median', float('inf'))) + + playlist_data = [] + for score_entry in all_scores: + if len(playlist_data) >= song_count: + break + + acc = score_entry.get('acc', 0) + stars = score_entry.get('stars') + song_hash = score_entry.get('hash') + + if not song_hash or stars is None: + logging.debug(f"Skipping entry due to missing hash or stars: {score_entry}") + continue + + # Use stars as a proxy for difficulty; adjust if you have actual difficulty levels + difficulty = score_entry.get('diff', '') + difficulty_characteristic = score_entry.get('mode', 'Standard') + + if song_hash in history['beatleader_accuracy_gaps'] and difficulty in history['beatleader_accuracy_gaps'][song_hash]: + logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") + continue + + song_dict = { + 'hash': song_hash, + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': difficulty_characteristic + } + ] + } + + playlist_data.append(song_dict) + logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, " + f"Accuracy={acc*100:.2f}%, Diff from Median={score_entry['diff_from_median']*100:.2f}%") + + # Update history + history['beatleader_accuracy_gaps'].setdefault(song_hash, []).append(difficulty) + + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history for BeatLeader accuracy gaps.") + else: + for song in playlist_data: + song_hash = song['hash'] + difficulty = song['difficulties'][0]['name'] + logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") + logging.info(f"Total songs added to playlist from BeatLeader accuracy gaps: {len(playlist_data)}") + + save_history(history) + playlist_title = f"accgraph-{new_count:02d}" + + return playlist_data, playlist_title + +def playlist_strategy_beatleader_lowest_acc( + api: BeatLeaderAPI, + song_count: int = 20 +) -> List[Dict[str, Any]]: + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('beatleader_lowest_acc', {}) + history.setdefault('playlist_counts', {}) + """Selects songs with the lowest accuracy, avoiding reusing the same song+difficulty.""" + + # Get the current count and increment it + count_key = 'beatleader_lowest_acc' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + scores_data = api.get_player_scores(player_id) + all_scores = scores_data.get('playerScores', []) + if not all_scores: + logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") + return [], "" + logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") + + # Sort by accuracy in ascending order (lowest first) + all_scores.sort(key=lambda x: x.get('score', {}).get('accuracy', float('inf'))) + + playlist_data = [] + for score_entry in all_scores: + if len(playlist_data) >= song_count: + break + + score = score_entry.get('score', {}) + leaderboard = score_entry.get('leaderboard', {}) + + song_hash = leaderboard.get('songHash') + difficulty_raw = int(leaderboard.get('difficulty', '')) + game_mode = leaderboard.get('modeName', 'Standard') + accuracy = score.get('accuracy', 0) + + if not song_hash or not difficulty_raw: + logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") + continue + + difficulty = normalize_difficulty_name(difficulty_raw) + + # avoid reusing song+difficulty + if song_hash in history['beatleader_lowest_acc'] and difficulty in history['beatleader_lowest_acc'][song_hash]: + logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") + continue + + song_dict = { + 'hash': song_hash, + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': game_mode + } + ] + } + + playlist_data.append(song_dict) + logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, Accuracy={accuracy*100:.2f}%") + + # Update history + history['beatleader_lowest_acc'].setdefault(song_hash, []).append(difficulty) + + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest accuracy.") + else: + for song in playlist_data: + song_hash = song['hash'] + difficulty = song['difficulties'][0]['name'] + logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") + logging.info(f"Total songs added to playlist from BeatLeader lowest accuracy: {len(playlist_data)}") + + save_history(history) + + return playlist_data, f"beatleader_lowest_acc-{new_count:02d}" diff --git a/src/saberlist/playlist_strategies/beatsaver.py b/src/saberlist/playlist_strategies/beatsaver.py new file mode 100644 index 0000000..54152f0 --- /dev/null +++ b/src/saberlist/playlist_strategies/beatsaver.py @@ -0,0 +1,308 @@ +from statistics import mean +from typing import Dict, Any, List +import logging +import os +from datetime import datetime, timedelta +import calendar + +from dotenv import load_dotenv +load_dotenv() +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=LOG_LEVEL +) + +from helpers.SimpleBeatSaverAPI import SimpleBeatSaverAPI + +from saberlist.utils import load_history, save_history, prompt_for_mapper_ids + +def playlist_strategy_beatsaver_acc( + song_count: int = 40 +) -> List[Dict[str, Any]]: + """ + Build and format a list of songs based on the highest average accuracy from recent maps. + Prompts the user for the month and year, defaulting to last month. + Excludes any map that's in the history, regardless of difficulty. + + :param song_count: The number of songs to include in the playlist. Default is 40. + :return: A list of dictionaries containing song information for the playlist. + """ + history = load_history() + history.setdefault('beatsaver', {}) + history.setdefault('playlist_counts', {}) + + # Get last month's date + today = datetime.now() + last_month = today.replace(day=1) - timedelta(days=1) + default_month = last_month.month + default_year = last_month.year + + # Prompt for month and year + while True: + month_input = input(f"Enter month (1-12, default {default_month}): ").strip() or str(default_month) + year_input = input(f"Enter year (default {default_year}): ").strip() or str(default_year) + + try: + month = int(month_input) + year = int(year_input) + if 1 <= month <= 12 and 2000 <= year <= datetime.now().year: + break + else: + print("Invalid month or year. Please try again.") + except ValueError: + print("Invalid input. Please enter numbers only.") + + # Get the current count for highest accuracy and increment it + count_key = f"beatsaver-{year}-{month:02d}" + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + leaderboard_data = map_leaders_by_month(month=month, year=year) + + if not leaderboard_data: + logging.error(f"No map+difficulty data available for {calendar.month_name[month]} {year}.") + return [], "" + + # Sort the data by average_accuracy in descending order + sorted_data = sorted( + leaderboard_data, + key=lambda x: x['average_accuracy'], + reverse=True + ) + + playlist_data = [] + for entry in sorted_data: + if len(playlist_data) >= song_count: + break + + song_hash = entry['hash'] + + # Check history to avoid reusing any map, regardless of difficulty + if song_hash in history['beatsaver']: + logging.debug(f"Skipping song {song_hash} as it's in history.") + continue + + playlist_data.append({ + 'hash': song_hash, + 'difficulties': entry['difficulties'], + 'songName': entry['map_name'] + }) + + # Log the song addition + difficulty = entry['difficulties'][0]['name'] + logging.info(f"Song added: {entry['map_name']} ({difficulty}) - Average Accuracy: {entry['average_accuracy'] * 100:.2f}%") + + # Update history (now we're just adding the song hash, not the difficulty) + history['beatsaver'][song_hash] = True + + # Log if no songs were added + if not playlist_data: + logging.info(f"No new songs found to add to the playlist for {calendar.month_name[month]} {year} based on history.") + else: + logging.info(f"Total songs added to playlist: {len(playlist_data)}") + + save_history(history) + + return playlist_data, f"beatsaver-{year}-{month:02d}-{new_count:02d}" + +def playlist_strategy_beatsaver_curated( + api: SimpleBeatSaverAPI, + song_count: int = 50 +) -> List[Dict[str, Any]]: + """ + Build a playlist from BeatSaver's curated songs list. + + :param api: SimpleBeatSaverAPI instance for making API calls + :param song_count: Number of songs to include in the playlist + :return: Tuple of (playlist data, playlist title) + """ + history = load_history() + history.setdefault('beatsaver_curated', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count and increment it + count_key = 'beatsaver_curated' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + curated_songs = api.get_curated_songs() + if not curated_songs: + raise RuntimeError("Unable to fetch curated songs from BeatSaver.") + logging.info(f"Found {len(curated_songs)} curated songs on BeatSaver.") + + playlist_data = [] + for song in curated_songs: + if len(playlist_data) >= song_count: + break + + song_hash = song.get('hash') + + if song_hash in history['beatsaver_curated']: + logging.debug(f"Skipping song {song_hash} as it's in history") + continue + + playlist_data.append(song) + logging.info(f"Song added: {song['songName']}") + + # Update history + history['beatsaver_curated'][song_hash] = True + + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history") + else: + logging.info(f"Total songs added to playlist: {len(playlist_data)}") + + save_history(history) + playlist_title = f"curated-{new_count:02d}" + + return playlist_data, playlist_title + +def map_leaders_by_month(month: int = 9, year: int = 2024, game_modes: List[str] = ['Standard']) -> List[Dict]: + """ + Gathers a month's worth of maps using the BeatSaver latest maps endpoint, + prioritizes map difficulties where players have already set good scores, + and calculates the average accuracy for each map+difficulty. + + Args: + month: The month to gather maps for. + year: The year to gather maps for. + game_modes: The game modes to include. + + Returns: + A list of dictionaries, each containing: + - hash: Hash of the map + - difficulties: List of difficulties with their characteristics + - map_name: Name of the map + - average_accuracy: Average accuracy of the leaderboard + """ + beatleader_api = SimpleBeatLeaderAPI(cache_expiry_days=30) + beatsaver_api = BeatSaverAPI(cache_expiry_days=30) + + logging.debug(f"Fetching maps for {month}/{year}") + map_data = beatsaver_api.get_maps(year=year, month=month, page_size=100) + + collected_data = [] + + for i, map_entry in enumerate(map_data): + + # Ensure there are versions available + if not map_entry.versions: + logging.warning(f"No versions found for map: {map_entry.name}") + continue + + latest_version = max(map_entry.versions, key=lambda version: version.created_at) + song_hash = latest_version.hash_ + + for diff in latest_version.diffs: + if diff.characteristic not in game_modes: + continue + + logging.info(f"Getting leaderboard for {i+1}/{len(map_data)}: {map_entry.name}") + leaderboard_data = beatleader_api.get_leaderboard(song_hash, diff.difficulty) + + if not leaderboard_data: + logging.debug(f"No leaderboard data for {map_entry.name} [{diff.difficulty}], skipping") + continue + + # Calculate average accuracy + accuracies = [entry.get('accuracy', 0) for entry in leaderboard_data if 'accuracy' in entry] + if not accuracies: + logging.debug(f"No accuracy data for {map_entry.name} [{diff.difficulty}]") + continue + + avg_accuracy = mean(accuracies) + + collected_data.append({ + 'hash': song_hash, + 'difficulties': [ + { + 'name': diff.difficulty, + 'characteristic': diff.characteristic + } + ], + 'map_name': map_entry.name, + 'average_accuracy': avg_accuracy + }) + + logging.info(f"Collected leaderboards for {len(collected_data)} map+difficulty combinations, orderable by average accuracy of top ten plays for {month}/{year}.") + return collected_data + +def playlist_strategy_beatsaver_mappers( + api: SimpleBeatSaverAPI, + song_count: int = 50 +) -> List[Dict[str, Any]]: + """ + Build a playlist from maps created by specified mappers on BeatSaver, + interleaving maps from different mappers for variety. + + :param api: SimpleBeatSaverAPI instance for making API calls + :param song_count: Number of songs to include in the playlist + :return: Tuple of (playlist data, playlist title) + """ + history = load_history() + history.setdefault('beatsaver_mappers', {}) + history.setdefault('playlist_counts', {}) + mapper_ids = prompt_for_mapper_ids() + + # Get the current count and increment it + count_key = 'beatsaver_mappers' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + # Collect maps by mapper + maps_by_mapper = {} + for mapper_id in mapper_ids: + logging.info(f"Fetching maps for mapper ID: {mapper_id}") + mapper_maps = api.get_mapper_maps(mapper_id=mapper_id, use_cache=False) + + if not mapper_maps: + logging.warning(f"No maps found for mapper ID: {mapper_id}") + continue + + logging.info(f"Found {len(mapper_maps)} maps from mapper ID: {mapper_id}") + + # Filter out maps that are in history + eligible_maps = [ + song for song in mapper_maps + if song.get('hash') not in history['beatsaver_mappers'] + ] + + if eligible_maps: + maps_by_mapper[mapper_id] = eligible_maps + + # Interleave maps from different mappers + playlist_data = [] + while maps_by_mapper and len(playlist_data) < song_count: + for mapper_id in list(maps_by_mapper.keys()): + if not maps_by_mapper[mapper_id]: + del maps_by_mapper[mapper_id] + continue + + song = maps_by_mapper[mapper_id].pop(0) + song_hash = song.get('hash') + + playlist_data.append(song) + logging.info(f"Song added: {song['songName']} by mapper {mapper_id}") + + # Update history + history['beatsaver_mappers'][song_hash] = True + + if len(playlist_data) >= song_count: + break + + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history") + else: + logging.info(f"Total songs added to playlist: {len(playlist_data)}") + + save_history(history) + playlist_title = f"mappers-{new_count:02d}" + + return playlist_data, playlist_title diff --git a/src/saberlist/playlist_strategies/oldscores.py b/src/saberlist/playlist_strategies/oldscores.py new file mode 100644 index 0000000..9cf3672 --- /dev/null +++ b/src/saberlist/playlist_strategies/oldscores.py @@ -0,0 +1,214 @@ +from datetime import datetime, timezone +from typing import Dict, Any, List +import logging +import os +from dotenv import load_dotenv +load_dotenv() +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=LOG_LEVEL +) + +from helpers.ScoreSaberAPI import ScoreSaberAPI +from helpers.BeatLeaderAPI import BeatLeaderAPI + +from saberlist.utils import prompt_for_player_id, load_history, save_history, format_time_ago, normalize_difficulty_name + +def playlist_strategy_beatleader_oldscores( + api: BeatLeaderAPI, + song_count: int = 20 +) -> List[Dict[str, Any]]: + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('beatleader_oldscores', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count for BeatLeader old scores and increment it + count_key = 'beatleader_oldscores' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + scores_data = api.get_player_scores(player_id) + all_scores = scores_data.get('playerScores', []) + if not all_scores: + logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") + return [], "" + logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") + + # Sort scores by epochTime in ascending order (oldest first) + all_scores.sort(key=lambda x: x.get('score', {}).get('epochTime', 0)) + + playlist_data = [] + current_time = datetime.now(timezone.utc) + + for score_entry in all_scores: + if len(playlist_data) >= song_count: + break # Stop if we've reached the desired number of songs + + score = score_entry.get('score', {}) + leaderboard = score_entry.get('leaderboard', {}) + + song_hash = leaderboard.get('songHash') + difficulty_raw = int(leaderboard.get('difficulty', '')) + game_mode = leaderboard.get('modeName', 'Standard') + epoch_time = score.get('epochTime') + + if not song_hash or not difficulty_raw or not epoch_time: + logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") + continue + + difficulty = normalize_difficulty_name(difficulty_raw) + + # avoid reusing song+difficulty + if song_hash in history['beatleader_oldscores'] and difficulty in history['beatleader_oldscores'][song_hash]: + logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") + continue # Skip if already used + + # Calculate time ago + try: + time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc) + except (ValueError, OSError) as e: + logging.error(f"Invalid epochTime for score ID {score.get('id')}: {e}") + continue + time_difference = current_time - time_set + time_ago = format_time_ago(time_difference) + + # Format the song data for PlaylistBuilder + song_dict = { + 'hash': song_hash, + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': game_mode + } + ] + } + + # Add the song to the playlist + playlist_data.append(song_dict) + logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}. Last played {time_ago} ago.") + + # Update history + history['beatleader_oldscores'].setdefault(song_hash, []).append(difficulty) + + # Log the final playlist + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history for BeatLeader.") + else: + for song in playlist_data: + song_hash = song['hash'] + difficulty = song['difficulties'][0]['name'] + logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}.") + logging.info(f"Total songs added to playlist from BeatLeader: {len(playlist_data)}") + + save_history(history) + + return playlist_data, f"beatleader_oldscores-{new_count:02d}" + +def playlist_strategy_scoresaber_oldscores( + api: ScoreSaberAPI, + song_count: int = 20 +) -> List[Dict[str, Any]]: + """Build and format a list of songs based on old scores from ScoreSaber, avoiding reusing the same song+difficulty.""" + + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('scoresaber_oldscores', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count for ScoreSaber old scores and increment it + count_key = 'scoresaber_oldscores' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + scores_data = api.get_player_scores(player_id, use_cache=True) + all_scores = scores_data.get('playerScores', []) + if not all_scores: + logging.warning(f"No scores found for player ID {player_id}.") + return [], "" + logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.") + + # Sort scores by timeSet in ascending order (oldest first) + all_scores.sort(key=lambda x: x['score'].get('timeSet', '')) + + playlist_data = [] + current_time = datetime.now(timezone.utc) + + for score in all_scores: + leaderboard = score.get('leaderboard', {}) + song_id = leaderboard.get('songHash') + difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '') + + if not song_id or not difficulty_raw: + logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}") + continue # Skip if essential data is missing + + # Calculate time ago + time_set_str = score['score'].get('timeSet') + if not time_set_str: + logging.debug(f"Skipping score due to missing timeSet: {score}") + continue # Skip if time_set is missing + try: + time_set = datetime.fromisoformat(time_set_str.replace('Z', '+00:00')) + except ValueError as e: + logging.error(f"Invalid time format for score ID {score['score'].get('id')}: {e}") + continue + time_difference = current_time - time_set + time_ago = format_time_ago(time_difference) + + # Normalize the difficulty name + difficulty = normalize_difficulty_name(difficulty_raw) + game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard') + if 'Standard' in game_mode: + game_mode = 'Standard' + + # Check history to avoid reusing song+difficulty + if song_id in history['scoresaber_oldscores'] and difficulty in history['scoresaber_oldscores'][song_id]: + logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.") + continue # Skip if already used + + # Format the song data as expected by PlaylistBuilder + song_dict = { + 'hash': song_id, + 'songName': leaderboard.get('songName', 'Unknown'), + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': game_mode + } + ] + } + + # Add the song to the playlist + playlist_data.append(song_dict) + logging.debug(f"Selected song for playlist: {song_dict['songName']} ({difficulty})") + + # Log the song addition + mapper = "Unknown" # Mapper information can be added if available + logging.info(f"Song added: {song_dict['songName']} ({difficulty}), mapped by {mapper}. Last played {time_ago} ago.") + + # Check if the desired number of songs has been reached + if len(playlist_data) >= song_count: + logging.debug(f"Reached the desired song count: {song_count}.") + break + + # Log if no songs were added + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history.") + else: + logging.info(f"Total songs added to playlist: {len(playlist_data)}") + + # Update history to avoid reusing the same song+difficulty + for song in playlist_data: + song_id = song['hash'] + difficulty_name = song['difficulties'][0]['name'] + history['scoresaber_oldscores'].setdefault(song_id, []).append(difficulty_name) + save_history(history) + + return playlist_data, f"scoresaber_oldscores-{new_count:02d}" \ No newline at end of file diff --git a/src/saberlist/playlist_strategies/performance.py b/src/saberlist/playlist_strategies/performance.py new file mode 100644 index 0000000..0a19675 --- /dev/null +++ b/src/saberlist/playlist_strategies/performance.py @@ -0,0 +1,188 @@ +from typing import Dict, Any, List +import argparse +import logging +import os +from dotenv import load_dotenv +load_dotenv() +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=LOG_LEVEL +) + +from helpers.ScoreSaberAPI import ScoreSaberAPI +from helpers.BeatLeaderAPI import BeatLeaderAPI + +from saberlist.utils import prompt_for_player_id, load_history, save_history, normalize_difficulty_name + +def playlist_strategy_beatleader_lowest_pp( + api: BeatLeaderAPI, + song_count: int = 20 +) -> List[Dict[str, Any]]: + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('beatleader_lowest_pp', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count for BeatLeader lowest PP and increment it + count_key = 'beatleader_lowest_pp' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + scores_data = api.get_player_scores(player_id) + all_scores = scores_data.get('playerScores', []) + if not all_scores: + logging.warning(f"No scores found for player ID {player_id} on BeatLeader.") + return [], "" + logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.") + + # Filter out scores with zero PP and sort by PP in ascending order + ranked_scores = [s for s in all_scores if s.get('score', {}).get('pp', 0) > 0] + ranked_scores.sort(key=lambda x: x.get('score', {}).get('pp', float('inf'))) + + playlist_data = [] + for score_entry in ranked_scores: + if len(playlist_data) >= song_count: + break # Stop if we've reached the desired number of songs + + score = score_entry.get('score', {}) + leaderboard = score_entry.get('leaderboard', {}) + + song_hash = leaderboard.get('songHash') + difficulty_raw = int(leaderboard.get('difficulty', '')) + game_mode = leaderboard.get('modeName', 'Standard') + pp = score.get('pp', 0) + + if not song_hash or not difficulty_raw: + logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}") + continue + + difficulty = normalize_difficulty_name(difficulty_raw) + + # avoid reusing song+difficulty + if song_hash in history['beatleader_lowest_pp'] and difficulty in history['beatleader_lowest_pp'][song_hash]: + logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.") + continue # Skip if already used + + # Format the song data for PlaylistBuilder + song_dict = { + 'hash': song_hash, + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': game_mode + } + ] + } + + # Add the song to the playlist + playlist_data.append(song_dict) + logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, PP={pp:.2f}") + + # Update history + history['beatleader_lowest_pp'].setdefault(song_hash, []).append(difficulty) + + # Log the final playlist + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest PP.") + else: + for song in playlist_data: + song_hash = song['hash'] + difficulty = song['difficulties'][0]['name'] + logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}") + logging.info(f"Total songs added to playlist from BeatLeader lowest PP: {len(playlist_data)}") + + save_history(history) + + return playlist_data, f"beatleader_lowest_pp-{new_count:02d}" + +def playlist_strategy_scoresaber_lowest_pp( + api: ScoreSaberAPI, + song_count: int = 20 +) -> List[Dict[str, Any]]: + """Build and format a list of songs based on lowest PP scores from ScoreSaber, avoiding reusing the same song+difficulty.""" + + player_id = prompt_for_player_id() + history = load_history() + history.setdefault('scoresaber_lowest_pp', {}) + history.setdefault('playlist_counts', {}) + + # Get the current count for ScoreSaber lowest PP and increment it + count_key = 'scoresaber_lowest_pp' + current_count = history['playlist_counts'].get(count_key, 0) + new_count = current_count + 1 + history['playlist_counts'][count_key] = new_count + + scores_data = api.get_player_scores(player_id, use_cache=True) + all_scores = scores_data.get('playerScores', []) + if not all_scores: + logging.warning(f"No scores found for player ID {player_id}.") + return [], "" + logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.") + + # Filter out scores with zero PP and sort by PP in ascending order + ranked_scores = [s for s in all_scores if s['score'].get('pp', 0) > 0] + ranked_scores.sort(key=lambda x: x['score'].get('pp', float('inf'))) + + playlist_data = [] + + for score in ranked_scores: + leaderboard = score.get('leaderboard', {}) + song_id = leaderboard.get('songHash') + difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '') + + if not song_id or not difficulty_raw: + logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}") + continue # Skip if essential data is missing + + # Normalize the difficulty name + difficulty = normalize_difficulty_name(difficulty_raw) + game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard') + if 'Standard' in game_mode: + game_mode = 'Standard' + + # Check history to avoid reusing song+difficulty + if song_id in history['scoresaber_lowest_pp'] and difficulty in history['scoresaber_lowest_pp'][song_id]: + logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.") + continue # Skip if already used + + # Format the song data as expected by PlaylistBuilder + song_dict = { + 'hash': song_id, + 'songName': leaderboard.get('songName', 'Unknown'), + 'difficulties': [ + { + 'name': difficulty, + 'characteristic': game_mode + } + ] + } + + # Add the song to the playlist + playlist_data.append(song_dict) + pp_score = score['score'].get('pp', 0) + logging.info(f"Song added: {song_dict['songName']} ({difficulty}), PP: {pp_score:.2f}") + + # Check if the desired number of songs has been reached + if len(playlist_data) >= song_count: + logging.debug(f"Reached the desired song count: {song_count}.") + break + + # Log if no songs were added + if not playlist_data: + logging.info("No new songs found to add to the playlist based on history.") + else: + logging.info(f"Total songs added to playlist: {len(playlist_data)}") + + # Update history to avoid reusing the same song+difficulty + for song in playlist_data: + song_id = song['hash'] + difficulty_name = song['difficulties'][0]['name'] + history['scoresaber_lowest_pp'].setdefault(song_id, []).append(difficulty_name) + save_history(history) + + return playlist_data, f"scoresaber_lowest_pp-{new_count:02d}" diff --git a/src/saberlist/utils.py b/src/saberlist/utils.py new file mode 100644 index 0000000..8bd3dc9 --- /dev/null +++ b/src/saberlist/utils.py @@ -0,0 +1,133 @@ +from datetime import timedelta +from typing import Dict, Any, List +import json +import logging +import os +from dotenv import load_dotenv +load_dotenv() +LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() +HISTORY_FILE = os.environ.get('HISTORY_FILE', "playlist_history.json") + +import logging +logging.basicConfig( + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=LOG_LEVEL +) + +import calendar + +def load_history() -> Dict[str, Any]: + """ + Load the playlist history from a JSON file. + + :return: A dictionary containing the history. + """ + if os.path.exists(HISTORY_FILE): + with open(HISTORY_FILE, 'r') as f: + history = json.load(f) + history.setdefault('playlist_counts', {}) + return history + return {'scoresaver': {}, 'playlist_counts': {}} + +def save_history(history: Dict[str, Any]) -> None: + """ + Save the playlist history to a JSON file. + + :param history: The history dictionary to save. + """ + with open(HISTORY_FILE, 'w') as f: + json.dump(history, f, indent=2) + +def prompt_for_player_id(default_id: str = '76561199407393962') -> str: + """ + Prompt the user to enter a ScoreSaber or BeatLeader player ID. + Uses a default ID if the user presses Enter without input. + + :param default_id: The default player ID to use. + :return: The player ID entered by the user or the default. + """ + prompt = f"Enter player ID (press Enter for default '{default_id}'): " + player_id = input(prompt).strip() or default_id + return player_id + +def prompt_for_mapper_ids() -> List[int]: + default_mapper_ids = [ + 4285547, # Avexus + 4330286, # VoltageO + 4294118, # Spinvvy + 4284542, # PogU (ForsenCDPogU) + 4285738, # Lekrkoekj + 113133 # Cush + ] + prompt = f"Enter mapper IDs (Default: {default_mapper_ids}): " + mapper_ids = input(prompt).strip() or ",".join(map(str, default_mapper_ids)) + return [int(id) for id in mapper_ids.split(',')] + +def format_time_ago(time_difference: timedelta) -> str: + """ + Format a timedelta object into a human-readable string. + + :param time_difference: The time difference to format. + :return: A string representing the time difference (e.g., '5d', '2w'). + """ + days = time_difference.days + + if days < 7: + return f"{days} day(s)" + elif days < 30: + weeks = days // 7 + return f"{weeks} week(s)" + elif days < 365: + months = days // 30 + return f"{months} month(s)" + else: + years = days // 365 + return f"{years} year(s)" + +def normalize_difficulty_name(difficulty_name): + difficulty_names = { + # ScoreSaber + '_ExpertPlus_SoloStandard': 'expertplus', + '_Expert_SoloStandard': 'expert', + '_Hard_SoloStandard': 'hard', + '_Normal_SoloStandard': 'normal', + '_Easy_SoloStandard': 'easy', + # BeatLeader + 1: 'easy', + 3: 'normal', + 5: 'hard', + 7: 'expert', + 9: 'expertplus', + } + + # Return the mapped value or the original name if there is no mapping + return difficulty_names.get(difficulty_name, difficulty_name) + +"""deprecated in favor of using undocumented api call +def infer_beatleader_leaderboard_id(song_id: str, difficulty: str) -> str: + difficulty_map = { + 'Easy': 1, + 'Normal': 3, + 'Hard': 5, + 'Expert': 7, + 'ExpertPlus': 9, + } + return f"{song_id}{difficulty_map[difficulty]}1" +""" + +def reset_history(strategy: str) -> None: + """ + Reset the history for a given playlist strategy. + + :param strategy: The strategy to reset history for. + """ + history = load_history() + if strategy in history: + del history[strategy] + if 'playlist_counts' in history and strategy in history['playlist_counts']: + history['playlist_counts'][strategy] = 0 + save_history(history) + logging.info(f"History and playlist count for '{strategy}' have been reset.") + else: + logging.info(f"No history found for '{strategy}'. Nothing to reset.")