Refactor strategies and shared utility functions into separate modules

This commit is contained in:
Brian Lee 2025-01-11 19:13:23 -08:00
parent 7385f6b146
commit a30c4467bc
7 changed files with 1343 additions and 884 deletions

View File

@ -0,0 +1,218 @@
from datetime import datetime, timedelta
import base64
import json
import os
import random
import requests
import time
from time import sleep
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG
)
class SimpleBeatSaverAPI:
BASE_URL = "https://api.beatsaver.com"
def __init__(self, cache_expiry_days=1):
self.session = requests.Session()
self.cache_expiry_days = cache_expiry_days
self.CACHE_DIR = self._determine_cache_dir()
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
def _determine_cache_dir(self):
home_cache = os.path.expanduser("~/.cache")
saberlist_cache = os.path.join(home_cache, "saberlist")
beatsaver_cache = os.path.join(saberlist_cache, "beatsaver")
if os.path.exists(home_cache):
if not os.path.exists(saberlist_cache):
try:
os.makedirs(saberlist_cache)
logging.info(f"Created cache directory: {saberlist_cache}")
except OSError as e:
logging.warning(f"Failed to create {saberlist_cache}: {e}")
return os.path.join(os.getcwd(), ".cache")
return beatsaver_cache
else:
logging.info("~/.cache doesn't exist, using local .cache directory")
return os.path.join(os.getcwd(), ".cache")
def _get_cache_filename(self, player_id):
return os.path.join(self.CACHE_DIR, f"player_{player_id}_scores.json")
def _is_cache_valid(self, cache_file):
if not os.path.exists(cache_file):
return False
file_modified_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
return datetime.now() - file_modified_time < timedelta(days=self.cache_expiry_days)
def clear_cache(self, player_id=None):
if player_id:
cache_file = self._get_cache_filename(player_id)
if os.path.exists(cache_file):
os.remove(cache_file)
logging.debug(f"Cleared cache for player {player_id}")
else:
for file in os.listdir(self.CACHE_DIR):
os.remove(os.path.join(self.CACHE_DIR, file))
logging.debug("Cleared all cache")
def get_cache_dir(self):
return self.CACHE_DIR
def get_curated_songs(self, use_cache=True) -> list[dict]:
"""
Retrieve curated songs from BeatSaver.
Note: BeatSaver's sort order behavior is not documented, so we cannot reliably
cache results per page or determine if new songs have been added. Therefore,
we must fetch all pages each time to ensure we have the complete, up-to-date list.
:param use_cache: Whether to use cached data if available (default: True)
:return: List of dictionaries containing song hash, characteristic, and difficulty
"""
cache_file = os.path.join(self.CACHE_DIR, f"curated_songs.json")
# Never expire the curated songs cache because it results in a many calls to the API
if use_cache and os.path.exists(cache_file):
logging.debug(f"Using cached data for curated songs")
with open(cache_file, 'r') as f:
return json.load(f)
processed_songs = []
page = 0
while True:
url = f"{self.BASE_URL}/search/text/{page}"
params = {
"sortOrder": "Curated",
"curated": "true"
}
try:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
# Process the response to extract relevant information
for song in data.get('docs', []):
for version in song.get('versions', []):
song_info = {
'hash': version['hash'],
'key': song['id'],
'songName': song['metadata']['songName'],
}
processed_songs.append(song_info)
# Check if we've reached the last page
if page >= data['info']['pages'] - 1:
break
page += 1
sleep(1)
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching curated songs: {e}")
return []
# Cache the results
with open(cache_file, 'w') as f:
json.dump(processed_songs, f)
return processed_songs
def get_followed_mappers(self, user_id: int = 243016, use_cache=True) -> list[dict]:
"""
Retrieve list of mappers followed by a specific user.
:param user_id: BeatSaver user ID
:param use_cache: Whether to use cached data if available (default: True)
:return: List of dictionaries containing mapper information
"""
cache_file = os.path.join(self.CACHE_DIR, f"followed_mappers_{user_id}.json")
if use_cache and self._is_cache_valid(cache_file):
logging.debug(f"Using cached data for followed mappers of user {user_id}")
with open(cache_file, 'r') as f:
return json.load(f)
url = f"{self.BASE_URL}/users/followedBy/{user_id}/0"
try:
response = self.session.get(url)
response.raise_for_status()
mappers = response.json()
# Cache the results
with open(cache_file, 'w') as f:
json.dump(mappers, f)
return mappers
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching followed mappers: {e}")
return []
def get_mapper_maps(self, mapper_id: int, use_cache=True) -> list[dict]:
"""
Retrieve all maps created by a specific mapper.
:param mapper_id: BeatSaver mapper ID
:param use_cache: Whether to use cached data if available (default: True)
:return: List of dictionaries containing map information
"""
cache_file = os.path.join(self.CACHE_DIR, f"mapper_{mapper_id}_maps.json")
if use_cache and self._is_cache_valid(cache_file):
logging.debug(f"Using cached data for mapper {mapper_id}")
with open(cache_file, 'r') as f:
return json.load(f)
processed_songs = []
page = 0
while True:
url = f"{self.BASE_URL}/search/text/{page}"
params = {
'collaborator': str(mapper_id),
'automapper': 'true',
'sortOrder': 'Latest'
}
try:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
# Process the response to extract relevant information
for song in data.get('docs', []):
for version in song.get('versions', []):
song_info = {
'hash': version['hash'],
'key': song['id'],
'songName': song['metadata']['songName'],
}
processed_songs.append(song_info)
# Page numbering starts at 0
if page >= data['info']['pages'] - 1:
break
page += 1
sleep(1)
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching mapper maps: {e}")
return []
# Cache the results
with open(cache_file, 'w') as f:
json.dump(processed_songs, f)
return processed_songs

View File

@ -1,19 +1,10 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from helpers.BeatSaverAPI import BeatSaverAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from statistics import mean, median
from typing import Dict, Any, List
import argparse import argparse
import json
import logging import logging
import os import os
import sys import sys
import math
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper() LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
HISTORY_FILE = os.environ.get('HISTORY_FILE', "playlist_history.json")
CACHE_EXPIRY_DAYS = int(os.environ.get('CACHE_EXPIRY_DAYS', 7)) CACHE_EXPIRY_DAYS = int(os.environ.get('CACHE_EXPIRY_DAYS', 7))
import logging import logging
@ -26,780 +17,76 @@ logging.basicConfig(
from helpers.PlaylistBuilder import PlaylistBuilder from helpers.PlaylistBuilder import PlaylistBuilder
from helpers.ScoreSaberAPI import ScoreSaberAPI from helpers.ScoreSaberAPI import ScoreSaberAPI
from helpers.BeatLeaderAPI import BeatLeaderAPI from helpers.BeatLeaderAPI import BeatLeaderAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from helpers.SimpleBeatSaverAPI import SimpleBeatSaverAPI
import calendar from saberlist.utils import reset_history
from saberlist.playlist_strategies.oldscores import playlist_strategy_beatleader_oldscores, playlist_strategy_scoresaber_oldscores
from saberlist.playlist_strategies.accuracy import playlist_strategy_beatleader_lowest_acc, playlist_strategy_beatleader_accuracy_gaps
from saberlist.playlist_strategies.performance import playlist_strategy_beatleader_lowest_pp, playlist_strategy_scoresaber_lowest_pp
from saberlist.playlist_strategies.beatsaver import playlist_strategy_beatsaver_acc, playlist_strategy_beatsaver_curated, playlist_strategy_beatsaver_mappers
def load_history() -> Dict[str, Any]: def saberlist() -> None:
""" """
Load the playlist history from a JSON file. Generate a playlist of songs using a specified strategy.
Avoids reusing the same song+difficulty in a playlist based on history.
:return: A dictionary containing the history.
""" """
if os.path.exists(HISTORY_FILE): strategy = get_strategy()
with open(HISTORY_FILE, 'r') as f:
history = json.load(f)
history.setdefault('playlist_counts', {})
return history
return {'scoresaver': {}, 'playlist_counts': {}}
def save_history(history: Dict[str, Any]) -> None: if strategy == 'scoresaber_oldscores':
""" playlist_data, playlist_title = playlist_strategy_scoresaber_oldscores(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
Save the playlist history to a JSON file. playlist_builder = PlaylistBuilder()
elif strategy == 'beatleader_oldscores':
:param history: The history dictionary to save. playlist_data, playlist_title = playlist_strategy_beatleader_oldscores(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
""" playlist_builder = PlaylistBuilder()
with open(HISTORY_FILE, 'w') as f: elif strategy == 'beatsaver_acc':
json.dump(history, f, indent=2) playlist_data, playlist_title = playlist_strategy_beatsaver_acc()
playlist_builder = PlaylistBuilder(covers_dir='./covers/beatsavers')
def prompt_for_player_id(default_id: str = '76561199407393962') -> str: elif strategy == 'beatleader_lowest_pp':
""" playlist_data, playlist_title = playlist_strategy_beatleader_lowest_pp(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
Prompt the user to enter a ScoreSaber or BeatLeader player ID. playlist_builder = PlaylistBuilder(covers_dir='./covers/beatleader')
Uses a default ID if the user presses Enter without input. elif strategy == 'scoresaber_lowest_pp':
playlist_data, playlist_title = playlist_strategy_scoresaber_lowest_pp(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
:param default_id: The default player ID to use. playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresaber')
:return: The player ID entered by the user or the default. elif strategy == 'beatleader_lowest_acc':
""" playlist_data, playlist_title = playlist_strategy_beatleader_lowest_acc(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
prompt = f"Enter player ID (press Enter for default '{default_id}'): " playlist_builder = PlaylistBuilder(covers_dir='./covers/kaiju')
player_id = input(prompt).strip() or default_id elif strategy == 'beatleader_accuracy_gaps':
return player_id playlist_data, playlist_title = playlist_strategy_beatleader_accuracy_gaps(SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas')
def format_time_ago(time_difference: timedelta) -> str: elif strategy == 'beatsaver_curated':
""" playlist_data, playlist_title = playlist_strategy_beatsaver_curated(SimpleBeatSaverAPI())
Format a timedelta object into a human-readable string. playlist_builder = PlaylistBuilder(covers_dir='./covers/curated')
elif strategy == 'beatsaver_mappers':
:param time_difference: The time difference to format. playlist_data, playlist_title = playlist_strategy_beatsaver_mappers(SimpleBeatSaverAPI())
:return: A string representing the time difference (e.g., '5d', '2w'). playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas')
"""
days = time_difference.days
if days < 7:
return f"{days} day(s)"
elif days < 30:
weeks = days // 7
return f"{weeks} week(s)"
elif days < 365:
months = days // 30
return f"{months} month(s)"
else: else:
years = days // 365 logging.error(f"Unknown strategy '{strategy}'")
return f"{years} year(s)" return
def normalize_difficulty_name(difficulty_name):
difficulty_names = {
# ScoreSaber
'_ExpertPlus_SoloStandard': 'expertplus',
'_Expert_SoloStandard': 'expert',
'_Hard_SoloStandard': 'hard',
'_Normal_SoloStandard': 'normal',
'_Easy_SoloStandard': 'easy',
# BeatLeader
1: 'easy',
3: 'normal',
5: 'hard',
7: 'expert',
9: 'expertplus',
}
# Return the mapped value or the original name if there is no mapping
return difficulty_names.get(difficulty_name, difficulty_name)
"""deprecated in favor of using undocumented api call
def infer_beatleader_leaderboard_id(song_id: str, difficulty: str) -> str:
difficulty_map = {
'Easy': 1,
'Normal': 3,
'Hard': 5,
'Expert': 7,
'ExpertPlus': 9,
}
return f"{song_id}{difficulty_map[difficulty]}1"
"""
def playlist_strategy_beatleader_accuracy_gaps(
api: SimpleBeatLeaderAPI,
song_count: int = 40,
bin_size: float = 0.25,
bin_sort: bool = False
) -> List[Dict[str, Any]]:
"""
Build a playlist of songs where the player's accuracy is furthest below the median accuracy
for their star rating range. Songs are grouped into bins by star rating to ensure fair comparison.
:param api: SimpleBeatLeaderAPI instance for making API calls
:param song_count: Number of songs to include in the playlist
:param bin_size: Size of star rating bins for grouping similar difficulty songs
:param bin_sort: Whether to sort the bins by star rating
:return: A tuple containing (list of song dictionaries, playlist title string)
"""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_accuracy_gaps', {})
history.setdefault('playlist_counts', {})
# Get the current count and increment it
count_key = 'beatleader_accuracy_gaps'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Fetch accuracy graph data
all_scores = api.get_player_accgraph(player_id)
if not all_scores:
logging.warning(f"No accgraph data found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} accgraph entries for player ID {player_id} on BeatLeader.")
# Collect all star ratings
star_ratings = [entry['stars'] for entry in all_scores if entry.get('stars') is not None]
if not star_ratings:
logging.warning("No star ratings found in accgraph data.")
return [], ""
min_stars = min(star_ratings)
max_stars = max(star_ratings)
star_range = max_stars - min_stars
# Remove the bin size calculation logic
num_bins = math.ceil(star_range / bin_size)
logging.info(f"Using bin size: {bin_size}, resulting in {num_bins} bins.")
# Group accuracies by bins
bin_to_accuracies = defaultdict(list)
for entry in all_scores:
stars = entry.get('stars')
acc = entry.get('acc')
if stars is not None and acc is not None:
bin_index = int((stars - min_stars) / bin_size)
bin_to_accuracies[bin_index].append(acc)
# Calculate median accuracy for each bin
bin_to_median = {}
for bin_index, accs in bin_to_accuracies.items():
bin_to_median[bin_index] = median(accs)
bin_start = min_stars + bin_index * bin_size
bin_end = bin_start + bin_size
logging.debug(f"Median accuracy for bin {bin_index} (stars {bin_start:.2f} to {bin_end:.2f}): {bin_to_median[bin_index]:.4f}")
# Compute difference from median for each score
for entry in all_scores:
stars = entry.get('stars')
acc = entry.get('acc')
if stars is not None and acc is not None:
bin_index = int((stars - min_stars) / bin_size)
median_acc = bin_to_median.get(bin_index)
if median_acc is not None:
entry['diff_from_median'] = acc - median_acc
else:
entry['diff_from_median'] = float('inf') # Place entries with missing data at the end
else:
entry['diff_from_median'] = float('inf') # Place entries with missing data at the end
# Sort scores by difference from median (ascending: most below median first)
all_scores.sort(key=lambda x: x.get('diff_from_median', float('inf')))
playlist_data = []
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break
acc = score_entry.get('acc', 0)
stars = score_entry.get('stars')
song_hash = score_entry.get('hash')
if not song_hash or stars is None:
logging.debug(f"Skipping entry due to missing hash or stars: {score_entry}")
continue
# Use stars as a proxy for difficulty; adjust if you have actual difficulty levels
difficulty = score_entry.get('diff', '')
difficulty_characteristic = score_entry.get('mode', 'Standard')
if song_hash in history['beatleader_accuracy_gaps'] and difficulty in history['beatleader_accuracy_gaps'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': difficulty_characteristic
}
]
}
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, "
f"Accuracy={acc*100:.2f}%, Diff from Median={score_entry['diff_from_median']*100:.2f}%")
# Update history
history['beatleader_accuracy_gaps'].setdefault(song_hash, []).append(difficulty)
if not playlist_data: if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader accuracy gaps.") logging.info("No new scores found to add to the playlist.")
else: return
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader accuracy gaps: {len(playlist_data)}")
save_history(history) playlist_builder.create_playlist(
playlist_title = f"accgraph-{new_count:02d}" playlist_data,
playlist_title=playlist_title,
return playlist_data, playlist_title playlist_author="SaberList Tool"
def playlist_strategy_scoresaber_oldscores(
api: ScoreSaberAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
"""Build and format a list of songs based on old scores from ScoreSaber, avoiding reusing the same song+difficulty."""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('scoresaber_oldscores', {})
history.setdefault('playlist_counts', {})
# Get the current count for ScoreSaber old scores and increment it
count_key = 'scoresaber_oldscores'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id, use_cache=True)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id}.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.")
# Sort scores by timeSet in ascending order (oldest first)
all_scores.sort(key=lambda x: x['score'].get('timeSet', ''))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score in all_scores:
leaderboard = score.get('leaderboard', {})
song_id = leaderboard.get('songHash')
difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '')
if not song_id or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}")
continue # Skip if essential data is missing
# Calculate time ago
time_set_str = score['score'].get('timeSet')
if not time_set_str:
logging.debug(f"Skipping score due to missing timeSet: {score}")
continue # Skip if time_set is missing
try:
time_set = datetime.fromisoformat(time_set_str.replace('Z', '+00:00'))
except ValueError as e:
logging.error(f"Invalid time format for score ID {score['score'].get('id')}: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Normalize the difficulty name
difficulty = normalize_difficulty_name(difficulty_raw)
game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard')
if 'Standard' in game_mode:
game_mode = 'Standard'
# Check history to avoid reusing song+difficulty
if song_id in history['scoresaber_oldscores'] and difficulty in history['scoresaber_oldscores'][song_id]:
logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data as expected by PlaylistBuilder
song_dict = {
'hash': song_id,
'songName': leaderboard.get('songName', 'Unknown'),
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: {song_dict['songName']} ({difficulty})")
# Log the song addition
mapper = "Unknown" # Mapper information can be added if available
logging.info(f"Song added: {song_dict['songName']} ({difficulty}), mapped by {mapper}. Last played {time_ago} ago.")
# Check if the desired number of songs has been reached
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# Log if no songs were added
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
# Update history to avoid reusing the same song+difficulty
for song in playlist_data:
song_id = song['hash']
difficulty_name = song['difficulties'][0]['name']
history['scoresaber_oldscores'].setdefault(song_id, []).append(difficulty_name)
save_history(history)
return playlist_data, f"scoresaber_oldscores-{new_count:02d}"
def playlist_strategy_beatleader_oldscores(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_oldscores', {})
history.setdefault('playlist_counts', {})
# Get the current count for BeatLeader old scores and increment it
count_key = 'beatleader_oldscores'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Sort scores by epochTime in ascending order (oldest first)
all_scores.sort(key=lambda x: x.get('score', {}).get('epochTime', 0))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
epoch_time = score.get('epochTime')
if not song_hash or not difficulty_raw or not epoch_time:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_oldscores'] and difficulty in history['beatleader_oldscores'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Calculate time ago
try:
time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc)
except (ValueError, OSError) as e:
logging.error(f"Invalid epochTime for score ID {score.get('id')}: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}. Last played {time_ago} ago.")
# Update history
history['beatleader_oldscores'].setdefault(song_hash, []).append(difficulty)
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}.")
logging.info(f"Total songs added to playlist from BeatLeader: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_oldscores-{new_count:02d}"
def playlist_strategy_beatleader_lowest_pp(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_lowest_pp', {})
history.setdefault('playlist_counts', {})
# Get the current count for BeatLeader lowest PP and increment it
count_key = 'beatleader_lowest_pp'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Filter out scores with zero PP and sort by PP in ascending order
ranked_scores = [s for s in all_scores if s.get('score', {}).get('pp', 0) > 0]
ranked_scores.sort(key=lambda x: x.get('score', {}).get('pp', float('inf')))
playlist_data = []
for score_entry in ranked_scores:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
pp = score.get('pp', 0)
if not song_hash or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_lowest_pp'] and difficulty in history['beatleader_lowest_pp'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, PP={pp:.2f}")
# Update history
history['beatleader_lowest_pp'].setdefault(song_hash, []).append(difficulty)
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest PP.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader lowest PP: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_lowest_pp-{new_count:02d}"
def playlist_strategy_scoresaber_lowest_pp(
api: ScoreSaberAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
"""Build and format a list of songs based on lowest PP scores from ScoreSaber, avoiding reusing the same song+difficulty."""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('scoresaber_lowest_pp', {})
history.setdefault('playlist_counts', {})
# Get the current count for ScoreSaber lowest PP and increment it
count_key = 'scoresaber_lowest_pp'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id, use_cache=True)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id}.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.")
# Filter out scores with zero PP and sort by PP in ascending order
ranked_scores = [s for s in all_scores if s['score'].get('pp', 0) > 0]
ranked_scores.sort(key=lambda x: x['score'].get('pp', float('inf')))
playlist_data = []
for score in ranked_scores:
leaderboard = score.get('leaderboard', {})
song_id = leaderboard.get('songHash')
difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '')
if not song_id or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}")
continue # Skip if essential data is missing
# Normalize the difficulty name
difficulty = normalize_difficulty_name(difficulty_raw)
game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard')
if 'Standard' in game_mode:
game_mode = 'Standard'
# Check history to avoid reusing song+difficulty
if song_id in history['scoresaber_lowest_pp'] and difficulty in history['scoresaber_lowest_pp'][song_id]:
logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data as expected by PlaylistBuilder
song_dict = {
'hash': song_id,
'songName': leaderboard.get('songName', 'Unknown'),
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
pp_score = score['score'].get('pp', 0)
logging.info(f"Song added: {song_dict['songName']} ({difficulty}), PP: {pp_score:.2f}")
# Check if the desired number of songs has been reached
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# Log if no songs were added
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
# Update history to avoid reusing the same song+difficulty
for song in playlist_data:
song_id = song['hash']
difficulty_name = song['difficulties'][0]['name']
history['scoresaber_lowest_pp'].setdefault(song_id, []).append(difficulty_name)
save_history(history)
return playlist_data, f"scoresaber_lowest_pp-{new_count:02d}"
def map_leaders_by_month(month: int = 9, year: int = 2024, game_modes: List[str] = ['Standard']) -> List[Dict]:
"""
Gathers a month's worth of maps using the BeatSaver latest maps endpoint,
prioritizes map difficulties where players have already set good scores,
and calculates the average accuracy for each map+difficulty.
Args:
month: The month to gather maps for.
year: The year to gather maps for.
game_modes: The game modes to include.
Returns:
A list of dictionaries, each containing:
- hash: Hash of the map
- difficulties: List of difficulties with their characteristics
- map_name: Name of the map
- average_accuracy: Average accuracy of the leaderboard
"""
beatleader_api = SimpleBeatLeaderAPI(cache_expiry_days=30)
beatsaver_api = BeatSaverAPI(cache_expiry_days=30)
logging.debug(f"Fetching maps for {month}/{year}")
map_data = beatsaver_api.get_maps(year=year, month=month, page_size=100)
collected_data = []
for i, map_entry in enumerate(map_data):
# Ensure there are versions available
if not map_entry.versions:
logging.warning(f"No versions found for map: {map_entry.name}")
continue
latest_version = max(map_entry.versions, key=lambda version: version.created_at)
song_hash = latest_version.hash_
for diff in latest_version.diffs:
if diff.characteristic not in game_modes:
continue
logging.info(f"Getting leaderboard for {i+1}/{len(map_data)}: {map_entry.name}")
leaderboard_data = beatleader_api.get_leaderboard(song_hash, diff.difficulty)
if not leaderboard_data:
logging.debug(f"No leaderboard data for {map_entry.name} [{diff.difficulty}], skipping")
continue
# Calculate average accuracy
accuracies = [entry.get('accuracy', 0) for entry in leaderboard_data if 'accuracy' in entry]
if not accuracies:
logging.debug(f"No accuracy data for {map_entry.name} [{diff.difficulty}]")
continue
avg_accuracy = mean(accuracies)
collected_data.append({
'hash': song_hash,
'difficulties': [
{
'name': diff.difficulty,
'characteristic': diff.characteristic
}
],
'map_name': map_entry.name,
'average_accuracy': avg_accuracy
})
logging.info(f"Collected leaderboards for {len(collected_data)} map+difficulty combinations, orderable by average accuracy of top ten plays for {month}/{year}.")
return collected_data
def playlist_strategy_scoresaver_acc(
song_count: int = 40
) -> List[Dict[str, Any]]:
"""
Build and format a list of songs based on the highest average accuracy from recent maps.
Prompts the user for the month and year, defaulting to last month.
Excludes any map that's in the history, regardless of difficulty.
:param song_count: The number of songs to include in the playlist. Default is 40.
:return: A list of dictionaries containing song information for the playlist.
"""
history = load_history()
history.setdefault('scoresaver', {})
history.setdefault('playlist_counts', {})
# Get last month's date
today = datetime.now()
last_month = today.replace(day=1) - timedelta(days=1)
default_month = last_month.month
default_year = last_month.year
# Prompt for month and year
while True:
month_input = input(f"Enter month (1-12, default {default_month}): ").strip() or str(default_month)
year_input = input(f"Enter year (default {default_year}): ").strip() or str(default_year)
try:
month = int(month_input)
year = int(year_input)
if 1 <= month <= 12 and 2000 <= year <= datetime.now().year:
break
else:
print("Invalid month or year. Please try again.")
except ValueError:
print("Invalid input. Please enter numbers only.")
# Get the current count for highest accuracy and increment it
count_key = f"scoresaver-{year}-{month:02d}"
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
leaderboard_data = map_leaders_by_month(month=month, year=year)
if not leaderboard_data:
logging.error(f"No map+difficulty data available for {calendar.month_name[month]} {year}.")
return [], ""
# Sort the data by average_accuracy in descending order
sorted_data = sorted(
leaderboard_data,
key=lambda x: x['average_accuracy'],
reverse=True
) )
playlist_data = []
for entry in sorted_data:
if len(playlist_data) >= song_count:
break
song_hash = entry['hash']
# Check history to avoid reusing any map, regardless of difficulty
if song_hash in history['scoresaver']:
logging.debug(f"Skipping song {song_hash} as it's in history.")
continue
playlist_data.append({
'hash': song_hash,
'difficulties': entry['difficulties'],
'songName': entry['map_name']
})
# Log the song addition
difficulty = entry['difficulties'][0]['name']
logging.info(f"Song added: {entry['map_name']} ({difficulty}) - Average Accuracy: {entry['average_accuracy'] * 100:.2f}%")
# Update history (now we're just adding the song hash, not the difficulty)
history['scoresaver'][song_hash] = True
# Log if no songs were added
if not playlist_data:
logging.info(f"No new songs found to add to the playlist for {calendar.month_name[month]} {year} based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
save_history(history)
return playlist_data, f"scoresaver-{year}-{month:02d}-{new_count:02d}"
def reset_history(strategy: str) -> None:
"""
Reset the history for a given playlist strategy.
:param strategy: The strategy to reset history for.
"""
history = load_history()
if strategy in history:
del history[strategy]
if 'playlist_counts' in history and strategy in history['playlist_counts']:
history['playlist_counts'][strategy] = 0
save_history(history)
logging.info(f"History and playlist count for '{strategy}' have been reset.")
else:
logging.info(f"No history found for '{strategy}'. Nothing to reset.")
def get_strategy(): def get_strategy():
parser = argparse.ArgumentParser(description="Generate Beat Saber playlists") parser = argparse.ArgumentParser(description="Generate Beat Saber playlists")
parser.add_argument("-s", "--strategy", parser.add_argument("-s", "--strategy",
choices=[ choices=[
"scoresaber_oldscores", "scoresaber_oldscores",
"beatleader_oldscores", "beatleader_oldscores",
"scoresaver_acc", "beatsaver_acc",
# "beatleader_lowest_pp", # "beatleader_lowest_pp",
# "scoresaber_lowest_pp", # "scoresaber_lowest_pp",
# "beatleader_lowest_acc", # "beatleader_lowest_acc",
"beatleader_accuracy_gaps" "beatleader_accuracy_gaps",
"beatsaver_curated",
"beatsaver_mappers"
], ],
help="Specify the playlist generation strategy") help="Specify the playlist generation strategy")
parser.add_argument("-r", "--reset", parser.add_argument("-r", "--reset",
@ -822,124 +109,3 @@ def get_strategy():
parser.error("--strategy is required unless --reset is used") parser.error("--strategy is required unless --reset is used")
return args.strategy return args.strategy
def playlist_strategy_beatleader_lowest_acc(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_lowest_acc', {})
history.setdefault('playlist_counts', {})
"""Selects songs with the lowest accuracy, avoiding reusing the same song+difficulty."""
# Get the current count and increment it
count_key = 'beatleader_lowest_acc'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Sort by accuracy in ascending order (lowest first)
all_scores.sort(key=lambda x: x.get('score', {}).get('accuracy', float('inf')))
playlist_data = []
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
accuracy = score.get('accuracy', 0)
if not song_hash or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_lowest_acc'] and difficulty in history['beatleader_lowest_acc'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, Accuracy={accuracy*100:.2f}%")
# Update history
history['beatleader_lowest_acc'].setdefault(song_hash, []).append(difficulty)
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest accuracy.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader lowest accuracy: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_lowest_acc-{new_count:02d}"
def saberlist() -> None:
"""
Generate a playlist of songs using a specified strategy.
Avoids reusing the same song+difficulty in a playlist based on history.
"""
strategy = get_strategy()
if strategy == 'scoresaber_oldscores':
playlist_data, playlist_title = playlist_strategy_scoresaber_oldscores(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder()
elif strategy == 'beatleader_oldscores':
playlist_data, playlist_title = playlist_strategy_beatleader_oldscores(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder()
elif strategy == 'scoresaver_acc':
playlist_data, playlist_title = playlist_strategy_scoresaver_acc()
playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresavers')
elif strategy == 'beatleader_lowest_pp':
playlist_data, playlist_title = playlist_strategy_beatleader_lowest_pp(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder(covers_dir='./covers/beatleader')
elif strategy == 'scoresaber_lowest_pp':
playlist_data, playlist_title = playlist_strategy_scoresaber_lowest_pp(ScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder(covers_dir='./covers/scoresaber')
elif strategy == 'beatleader_lowest_acc':
playlist_data, playlist_title = playlist_strategy_beatleader_lowest_acc(BeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder(covers_dir='./covers/kaiju')
elif strategy == 'beatleader_accuracy_gaps':
playlist_data, playlist_title = playlist_strategy_beatleader_accuracy_gaps(SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS))
playlist_builder = PlaylistBuilder(covers_dir='./covers/pajamas')
else:
logging.error(f"Unknown strategy '{strategy}'")
return
if not playlist_data:
logging.info("No new scores found to add to the playlist.")
return
playlist_builder.create_playlist(
playlist_data,
playlist_title=playlist_title,
playlist_author="SaberList Tool"
)

View File

@ -0,0 +1,232 @@
from collections import defaultdict
from statistics import median
from typing import Dict, Any, List
import logging
import os
import math
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from saberlist.utils import prompt_for_player_id, load_history, save_history, normalize_difficulty_name
def playlist_strategy_beatleader_accuracy_gaps(
api: SimpleBeatLeaderAPI,
song_count: int = 40,
bin_size: float = 0.25,
bin_sort: bool = False
) -> List[Dict[str, Any]]:
"""
Build a playlist of songs where the player's accuracy is furthest below the median accuracy
for their star rating range. Songs are grouped into bins by star rating to ensure fair comparison.
:param api: SimpleBeatLeaderAPI instance for making API calls
:param song_count: Number of songs to include in the playlist
:param bin_size: Size of star rating bins for grouping similar difficulty songs
:param bin_sort: Whether to sort the bins by star rating
:return: A tuple containing (list of song dictionaries, playlist title string)
"""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_accuracy_gaps', {})
history.setdefault('playlist_counts', {})
# Get the current count and increment it
count_key = 'beatleader_accuracy_gaps'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Fetch accuracy graph data
all_scores = api.get_player_accgraph(player_id)
if not all_scores:
logging.warning(f"No accgraph data found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} accgraph entries for player ID {player_id} on BeatLeader.")
# Collect all star ratings
star_ratings = [entry['stars'] for entry in all_scores if entry.get('stars') is not None]
if not star_ratings:
logging.warning("No star ratings found in accgraph data.")
return [], ""
min_stars = min(star_ratings)
max_stars = max(star_ratings)
star_range = max_stars - min_stars
# Remove the bin size calculation logic
num_bins = math.ceil(star_range / bin_size)
logging.info(f"Using bin size: {bin_size}, resulting in {num_bins} bins.")
# Group accuracies by bins
bin_to_accuracies = defaultdict(list)
for entry in all_scores:
stars = entry.get('stars')
acc = entry.get('acc')
if stars is not None and acc is not None:
bin_index = int((stars - min_stars) / bin_size)
bin_to_accuracies[bin_index].append(acc)
# Calculate median accuracy for each bin
bin_to_median = {}
for bin_index, accs in bin_to_accuracies.items():
bin_to_median[bin_index] = median(accs)
bin_start = min_stars + bin_index * bin_size
bin_end = bin_start + bin_size
logging.debug(f"Median accuracy for bin {bin_index} (stars {bin_start:.2f} to {bin_end:.2f}): {bin_to_median[bin_index]:.4f}")
# Compute difference from median for each score
for entry in all_scores:
stars = entry.get('stars')
acc = entry.get('acc')
if stars is not None and acc is not None:
bin_index = int((stars - min_stars) / bin_size)
median_acc = bin_to_median.get(bin_index)
if median_acc is not None:
entry['diff_from_median'] = acc - median_acc
else:
entry['diff_from_median'] = float('inf') # Place entries with missing data at the end
else:
entry['diff_from_median'] = float('inf') # Place entries with missing data at the end
# Sort scores by difference from median (ascending: most below median first)
all_scores.sort(key=lambda x: x.get('diff_from_median', float('inf')))
playlist_data = []
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break
acc = score_entry.get('acc', 0)
stars = score_entry.get('stars')
song_hash = score_entry.get('hash')
if not song_hash or stars is None:
logging.debug(f"Skipping entry due to missing hash or stars: {score_entry}")
continue
# Use stars as a proxy for difficulty; adjust if you have actual difficulty levels
difficulty = score_entry.get('diff', '')
difficulty_characteristic = score_entry.get('mode', 'Standard')
if song_hash in history['beatleader_accuracy_gaps'] and difficulty in history['beatleader_accuracy_gaps'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': difficulty_characteristic
}
]
}
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, "
f"Accuracy={acc*100:.2f}%, Diff from Median={score_entry['diff_from_median']*100:.2f}%")
# Update history
history['beatleader_accuracy_gaps'].setdefault(song_hash, []).append(difficulty)
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader accuracy gaps.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader accuracy gaps: {len(playlist_data)}")
save_history(history)
playlist_title = f"accgraph-{new_count:02d}"
return playlist_data, playlist_title
def playlist_strategy_beatleader_lowest_acc(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_lowest_acc', {})
history.setdefault('playlist_counts', {})
"""Selects songs with the lowest accuracy, avoiding reusing the same song+difficulty."""
# Get the current count and increment it
count_key = 'beatleader_lowest_acc'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Sort by accuracy in ascending order (lowest first)
all_scores.sort(key=lambda x: x.get('score', {}).get('accuracy', float('inf')))
playlist_data = []
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
accuracy = score.get('accuracy', 0)
if not song_hash or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_lowest_acc'] and difficulty in history['beatleader_lowest_acc'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, Accuracy={accuracy*100:.2f}%")
# Update history
history['beatleader_lowest_acc'].setdefault(song_hash, []).append(difficulty)
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest accuracy.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader lowest accuracy: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_lowest_acc-{new_count:02d}"

View File

@ -0,0 +1,308 @@
from statistics import mean
from typing import Dict, Any, List
import logging
import os
from datetime import datetime, timedelta
import calendar
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
from helpers.SimpleBeatSaverAPI import SimpleBeatSaverAPI
from saberlist.utils import load_history, save_history, prompt_for_mapper_ids
def playlist_strategy_beatsaver_acc(
song_count: int = 40
) -> List[Dict[str, Any]]:
"""
Build and format a list of songs based on the highest average accuracy from recent maps.
Prompts the user for the month and year, defaulting to last month.
Excludes any map that's in the history, regardless of difficulty.
:param song_count: The number of songs to include in the playlist. Default is 40.
:return: A list of dictionaries containing song information for the playlist.
"""
history = load_history()
history.setdefault('beatsaver', {})
history.setdefault('playlist_counts', {})
# Get last month's date
today = datetime.now()
last_month = today.replace(day=1) - timedelta(days=1)
default_month = last_month.month
default_year = last_month.year
# Prompt for month and year
while True:
month_input = input(f"Enter month (1-12, default {default_month}): ").strip() or str(default_month)
year_input = input(f"Enter year (default {default_year}): ").strip() or str(default_year)
try:
month = int(month_input)
year = int(year_input)
if 1 <= month <= 12 and 2000 <= year <= datetime.now().year:
break
else:
print("Invalid month or year. Please try again.")
except ValueError:
print("Invalid input. Please enter numbers only.")
# Get the current count for highest accuracy and increment it
count_key = f"beatsaver-{year}-{month:02d}"
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
leaderboard_data = map_leaders_by_month(month=month, year=year)
if not leaderboard_data:
logging.error(f"No map+difficulty data available for {calendar.month_name[month]} {year}.")
return [], ""
# Sort the data by average_accuracy in descending order
sorted_data = sorted(
leaderboard_data,
key=lambda x: x['average_accuracy'],
reverse=True
)
playlist_data = []
for entry in sorted_data:
if len(playlist_data) >= song_count:
break
song_hash = entry['hash']
# Check history to avoid reusing any map, regardless of difficulty
if song_hash in history['beatsaver']:
logging.debug(f"Skipping song {song_hash} as it's in history.")
continue
playlist_data.append({
'hash': song_hash,
'difficulties': entry['difficulties'],
'songName': entry['map_name']
})
# Log the song addition
difficulty = entry['difficulties'][0]['name']
logging.info(f"Song added: {entry['map_name']} ({difficulty}) - Average Accuracy: {entry['average_accuracy'] * 100:.2f}%")
# Update history (now we're just adding the song hash, not the difficulty)
history['beatsaver'][song_hash] = True
# Log if no songs were added
if not playlist_data:
logging.info(f"No new songs found to add to the playlist for {calendar.month_name[month]} {year} based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatsaver-{year}-{month:02d}-{new_count:02d}"
def playlist_strategy_beatsaver_curated(
api: SimpleBeatSaverAPI,
song_count: int = 50
) -> List[Dict[str, Any]]:
"""
Build a playlist from BeatSaver's curated songs list.
:param api: SimpleBeatSaverAPI instance for making API calls
:param song_count: Number of songs to include in the playlist
:return: Tuple of (playlist data, playlist title)
"""
history = load_history()
history.setdefault('beatsaver_curated', {})
history.setdefault('playlist_counts', {})
# Get the current count and increment it
count_key = 'beatsaver_curated'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
curated_songs = api.get_curated_songs()
if not curated_songs:
raise RuntimeError("Unable to fetch curated songs from BeatSaver.")
logging.info(f"Found {len(curated_songs)} curated songs on BeatSaver.")
playlist_data = []
for song in curated_songs:
if len(playlist_data) >= song_count:
break
song_hash = song.get('hash')
if song_hash in history['beatsaver_curated']:
logging.debug(f"Skipping song {song_hash} as it's in history")
continue
playlist_data.append(song)
logging.info(f"Song added: {song['songName']}")
# Update history
history['beatsaver_curated'][song_hash] = True
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
save_history(history)
playlist_title = f"curated-{new_count:02d}"
return playlist_data, playlist_title
def map_leaders_by_month(month: int = 9, year: int = 2024, game_modes: List[str] = ['Standard']) -> List[Dict]:
"""
Gathers a month's worth of maps using the BeatSaver latest maps endpoint,
prioritizes map difficulties where players have already set good scores,
and calculates the average accuracy for each map+difficulty.
Args:
month: The month to gather maps for.
year: The year to gather maps for.
game_modes: The game modes to include.
Returns:
A list of dictionaries, each containing:
- hash: Hash of the map
- difficulties: List of difficulties with their characteristics
- map_name: Name of the map
- average_accuracy: Average accuracy of the leaderboard
"""
beatleader_api = SimpleBeatLeaderAPI(cache_expiry_days=30)
beatsaver_api = BeatSaverAPI(cache_expiry_days=30)
logging.debug(f"Fetching maps for {month}/{year}")
map_data = beatsaver_api.get_maps(year=year, month=month, page_size=100)
collected_data = []
for i, map_entry in enumerate(map_data):
# Ensure there are versions available
if not map_entry.versions:
logging.warning(f"No versions found for map: {map_entry.name}")
continue
latest_version = max(map_entry.versions, key=lambda version: version.created_at)
song_hash = latest_version.hash_
for diff in latest_version.diffs:
if diff.characteristic not in game_modes:
continue
logging.info(f"Getting leaderboard for {i+1}/{len(map_data)}: {map_entry.name}")
leaderboard_data = beatleader_api.get_leaderboard(song_hash, diff.difficulty)
if not leaderboard_data:
logging.debug(f"No leaderboard data for {map_entry.name} [{diff.difficulty}], skipping")
continue
# Calculate average accuracy
accuracies = [entry.get('accuracy', 0) for entry in leaderboard_data if 'accuracy' in entry]
if not accuracies:
logging.debug(f"No accuracy data for {map_entry.name} [{diff.difficulty}]")
continue
avg_accuracy = mean(accuracies)
collected_data.append({
'hash': song_hash,
'difficulties': [
{
'name': diff.difficulty,
'characteristic': diff.characteristic
}
],
'map_name': map_entry.name,
'average_accuracy': avg_accuracy
})
logging.info(f"Collected leaderboards for {len(collected_data)} map+difficulty combinations, orderable by average accuracy of top ten plays for {month}/{year}.")
return collected_data
def playlist_strategy_beatsaver_mappers(
api: SimpleBeatSaverAPI,
song_count: int = 50
) -> List[Dict[str, Any]]:
"""
Build a playlist from maps created by specified mappers on BeatSaver,
interleaving maps from different mappers for variety.
:param api: SimpleBeatSaverAPI instance for making API calls
:param song_count: Number of songs to include in the playlist
:return: Tuple of (playlist data, playlist title)
"""
history = load_history()
history.setdefault('beatsaver_mappers', {})
history.setdefault('playlist_counts', {})
mapper_ids = prompt_for_mapper_ids()
# Get the current count and increment it
count_key = 'beatsaver_mappers'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Collect maps by mapper
maps_by_mapper = {}
for mapper_id in mapper_ids:
logging.info(f"Fetching maps for mapper ID: {mapper_id}")
mapper_maps = api.get_mapper_maps(mapper_id=mapper_id, use_cache=False)
if not mapper_maps:
logging.warning(f"No maps found for mapper ID: {mapper_id}")
continue
logging.info(f"Found {len(mapper_maps)} maps from mapper ID: {mapper_id}")
# Filter out maps that are in history
eligible_maps = [
song for song in mapper_maps
if song.get('hash') not in history['beatsaver_mappers']
]
if eligible_maps:
maps_by_mapper[mapper_id] = eligible_maps
# Interleave maps from different mappers
playlist_data = []
while maps_by_mapper and len(playlist_data) < song_count:
for mapper_id in list(maps_by_mapper.keys()):
if not maps_by_mapper[mapper_id]:
del maps_by_mapper[mapper_id]
continue
song = maps_by_mapper[mapper_id].pop(0)
song_hash = song.get('hash')
playlist_data.append(song)
logging.info(f"Song added: {song['songName']} by mapper {mapper_id}")
# Update history
history['beatsaver_mappers'][song_hash] = True
if len(playlist_data) >= song_count:
break
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
save_history(history)
playlist_title = f"mappers-{new_count:02d}"
return playlist_data, playlist_title

View File

@ -0,0 +1,214 @@
from datetime import datetime, timezone
from typing import Dict, Any, List
import logging
import os
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
from helpers.ScoreSaberAPI import ScoreSaberAPI
from helpers.BeatLeaderAPI import BeatLeaderAPI
from saberlist.utils import prompt_for_player_id, load_history, save_history, format_time_ago, normalize_difficulty_name
def playlist_strategy_beatleader_oldscores(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_oldscores', {})
history.setdefault('playlist_counts', {})
# Get the current count for BeatLeader old scores and increment it
count_key = 'beatleader_oldscores'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Sort scores by epochTime in ascending order (oldest first)
all_scores.sort(key=lambda x: x.get('score', {}).get('epochTime', 0))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
epoch_time = score.get('epochTime')
if not song_hash or not difficulty_raw or not epoch_time:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_oldscores'] and difficulty in history['beatleader_oldscores'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Calculate time ago
try:
time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc)
except (ValueError, OSError) as e:
logging.error(f"Invalid epochTime for score ID {score.get('id')}: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}. Last played {time_ago} ago.")
# Update history
history['beatleader_oldscores'].setdefault(song_hash, []).append(difficulty)
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}.")
logging.info(f"Total songs added to playlist from BeatLeader: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_oldscores-{new_count:02d}"
def playlist_strategy_scoresaber_oldscores(
api: ScoreSaberAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
"""Build and format a list of songs based on old scores from ScoreSaber, avoiding reusing the same song+difficulty."""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('scoresaber_oldscores', {})
history.setdefault('playlist_counts', {})
# Get the current count for ScoreSaber old scores and increment it
count_key = 'scoresaber_oldscores'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id, use_cache=True)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id}.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.")
# Sort scores by timeSet in ascending order (oldest first)
all_scores.sort(key=lambda x: x['score'].get('timeSet', ''))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score in all_scores:
leaderboard = score.get('leaderboard', {})
song_id = leaderboard.get('songHash')
difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '')
if not song_id or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}")
continue # Skip if essential data is missing
# Calculate time ago
time_set_str = score['score'].get('timeSet')
if not time_set_str:
logging.debug(f"Skipping score due to missing timeSet: {score}")
continue # Skip if time_set is missing
try:
time_set = datetime.fromisoformat(time_set_str.replace('Z', '+00:00'))
except ValueError as e:
logging.error(f"Invalid time format for score ID {score['score'].get('id')}: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Normalize the difficulty name
difficulty = normalize_difficulty_name(difficulty_raw)
game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard')
if 'Standard' in game_mode:
game_mode = 'Standard'
# Check history to avoid reusing song+difficulty
if song_id in history['scoresaber_oldscores'] and difficulty in history['scoresaber_oldscores'][song_id]:
logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data as expected by PlaylistBuilder
song_dict = {
'hash': song_id,
'songName': leaderboard.get('songName', 'Unknown'),
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: {song_dict['songName']} ({difficulty})")
# Log the song addition
mapper = "Unknown" # Mapper information can be added if available
logging.info(f"Song added: {song_dict['songName']} ({difficulty}), mapped by {mapper}. Last played {time_ago} ago.")
# Check if the desired number of songs has been reached
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# Log if no songs were added
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
# Update history to avoid reusing the same song+difficulty
for song in playlist_data:
song_id = song['hash']
difficulty_name = song['difficulties'][0]['name']
history['scoresaber_oldscores'].setdefault(song_id, []).append(difficulty_name)
save_history(history)
return playlist_data, f"scoresaber_oldscores-{new_count:02d}"

View File

@ -0,0 +1,188 @@
from typing import Dict, Any, List
import argparse
import logging
import os
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
from helpers.ScoreSaberAPI import ScoreSaberAPI
from helpers.BeatLeaderAPI import BeatLeaderAPI
from saberlist.utils import prompt_for_player_id, load_history, save_history, normalize_difficulty_name
def playlist_strategy_beatleader_lowest_pp(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_lowest_pp', {})
history.setdefault('playlist_counts', {})
# Get the current count for BeatLeader lowest PP and increment it
count_key = 'beatleader_lowest_pp'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Filter out scores with zero PP and sort by PP in ascending order
ranked_scores = [s for s in all_scores if s.get('score', {}).get('pp', 0) > 0]
ranked_scores.sort(key=lambda x: x.get('score', {}).get('pp', float('inf')))
playlist_data = []
for score_entry in ranked_scores:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
pp = score.get('pp', 0)
if not song_hash or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_lowest_pp'] and difficulty in history['beatleader_lowest_pp'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, PP={pp:.2f}")
# Update history
history['beatleader_lowest_pp'].setdefault(song_hash, []).append(difficulty)
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader lowest PP.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}")
logging.info(f"Total songs added to playlist from BeatLeader lowest PP: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_lowest_pp-{new_count:02d}"
def playlist_strategy_scoresaber_lowest_pp(
api: ScoreSaberAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
"""Build and format a list of songs based on lowest PP scores from ScoreSaber, avoiding reusing the same song+difficulty."""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('scoresaber_lowest_pp', {})
history.setdefault('playlist_counts', {})
# Get the current count for ScoreSaber lowest PP and increment it
count_key = 'scoresaber_lowest_pp'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id, use_cache=True)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id}.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id}.")
# Filter out scores with zero PP and sort by PP in ascending order
ranked_scores = [s for s in all_scores if s['score'].get('pp', 0) > 0]
ranked_scores.sort(key=lambda x: x['score'].get('pp', float('inf')))
playlist_data = []
for score in ranked_scores:
leaderboard = score.get('leaderboard', {})
song_id = leaderboard.get('songHash')
difficulty_raw = leaderboard.get('difficulty', {}).get('difficultyRaw', '')
if not song_id or not difficulty_raw:
logging.debug(f"Skipping score due to missing song_id or difficulty_raw: {score}")
continue # Skip if essential data is missing
# Normalize the difficulty name
difficulty = normalize_difficulty_name(difficulty_raw)
game_mode = leaderboard.get('difficulty', {}).get('gameMode', 'Standard')
if 'Standard' in game_mode:
game_mode = 'Standard'
# Check history to avoid reusing song+difficulty
if song_id in history['scoresaber_lowest_pp'] and difficulty in history['scoresaber_lowest_pp'][song_id]:
logging.debug(f"Skipping song {song_id} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Format the song data as expected by PlaylistBuilder
song_dict = {
'hash': song_id,
'songName': leaderboard.get('songName', 'Unknown'),
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
pp_score = score['score'].get('pp', 0)
logging.info(f"Song added: {song_dict['songName']} ({difficulty}), PP: {pp_score:.2f}")
# Check if the desired number of songs has been reached
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# Log if no songs were added
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history.")
else:
logging.info(f"Total songs added to playlist: {len(playlist_data)}")
# Update history to avoid reusing the same song+difficulty
for song in playlist_data:
song_id = song['hash']
difficulty_name = song['difficulties'][0]['name']
history['scoresaber_lowest_pp'].setdefault(song_id, []).append(difficulty_name)
save_history(history)
return playlist_data, f"scoresaber_lowest_pp-{new_count:02d}"

133
src/saberlist/utils.py Normal file
View File

@ -0,0 +1,133 @@
from datetime import timedelta
from typing import Dict, Any, List
import json
import logging
import os
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
HISTORY_FILE = os.environ.get('HISTORY_FILE', "playlist_history.json")
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
import calendar
def load_history() -> Dict[str, Any]:
"""
Load the playlist history from a JSON file.
:return: A dictionary containing the history.
"""
if os.path.exists(HISTORY_FILE):
with open(HISTORY_FILE, 'r') as f:
history = json.load(f)
history.setdefault('playlist_counts', {})
return history
return {'scoresaver': {}, 'playlist_counts': {}}
def save_history(history: Dict[str, Any]) -> None:
"""
Save the playlist history to a JSON file.
:param history: The history dictionary to save.
"""
with open(HISTORY_FILE, 'w') as f:
json.dump(history, f, indent=2)
def prompt_for_player_id(default_id: str = '76561199407393962') -> str:
"""
Prompt the user to enter a ScoreSaber or BeatLeader player ID.
Uses a default ID if the user presses Enter without input.
:param default_id: The default player ID to use.
:return: The player ID entered by the user or the default.
"""
prompt = f"Enter player ID (press Enter for default '{default_id}'): "
player_id = input(prompt).strip() or default_id
return player_id
def prompt_for_mapper_ids() -> List[int]:
default_mapper_ids = [
4285547, # Avexus
4330286, # VoltageO
4294118, # Spinvvy
4284542, # PogU (ForsenCDPogU)
4285738, # Lekrkoekj
113133 # Cush
]
prompt = f"Enter mapper IDs (Default: {default_mapper_ids}): "
mapper_ids = input(prompt).strip() or ",".join(map(str, default_mapper_ids))
return [int(id) for id in mapper_ids.split(',')]
def format_time_ago(time_difference: timedelta) -> str:
"""
Format a timedelta object into a human-readable string.
:param time_difference: The time difference to format.
:return: A string representing the time difference (e.g., '5d', '2w').
"""
days = time_difference.days
if days < 7:
return f"{days} day(s)"
elif days < 30:
weeks = days // 7
return f"{weeks} week(s)"
elif days < 365:
months = days // 30
return f"{months} month(s)"
else:
years = days // 365
return f"{years} year(s)"
def normalize_difficulty_name(difficulty_name):
difficulty_names = {
# ScoreSaber
'_ExpertPlus_SoloStandard': 'expertplus',
'_Expert_SoloStandard': 'expert',
'_Hard_SoloStandard': 'hard',
'_Normal_SoloStandard': 'normal',
'_Easy_SoloStandard': 'easy',
# BeatLeader
1: 'easy',
3: 'normal',
5: 'hard',
7: 'expert',
9: 'expertplus',
}
# Return the mapped value or the original name if there is no mapping
return difficulty_names.get(difficulty_name, difficulty_name)
"""deprecated in favor of using undocumented api call
def infer_beatleader_leaderboard_id(song_id: str, difficulty: str) -> str:
difficulty_map = {
'Easy': 1,
'Normal': 3,
'Hard': 5,
'Expert': 7,
'ExpertPlus': 9,
}
return f"{song_id}{difficulty_map[difficulty]}1"
"""
def reset_history(strategy: str) -> None:
"""
Reset the history for a given playlist strategy.
:param strategy: The strategy to reset history for.
"""
history = load_history()
if strategy in history:
del history[strategy]
if 'playlist_counts' in history and strategy in history['playlist_counts']:
history['playlist_counts'][strategy] = 0
save_history(history)
logging.info(f"History and playlist count for '{strategy}' have been reset.")
else:
logging.info(f"No history found for '{strategy}'. Nothing to reset.")