Add strategy to pick maps that are ranked on both SS and BL

This commit is contained in:
Brian Lee 2025-01-26 20:48:12 -08:00
parent 3b514c46a9
commit 3b1d66cb30
6 changed files with 350 additions and 10 deletions

View File

@ -23,6 +23,24 @@ scores_data = beatleader_api.get_player_scores(
max_pages=2 # Maximum number of pages to fetch
)
print(f"Got {len(scores_data.get('playerScores'))} scores for player {player_id}")
# getting all ranked songs
all_beatleader_ranked_maps = beatleader_api.get_player_scores(
player_id=player_id,
use_cache=True, # Use cached data if available
count=100, # Number of scores per page
sort_by=ScoresSortBy.DATE, # Sort scores by date
order=Order.DESC, # In descending order
max_pages=2 # Maximum number of pages to fetch
)
```
## SimpleScoreSaberClient
```python
from helpers.SimpleScoreSaberAPI import SimpleScoreSaberAPI
scoresaber_api = SimpleScoreSaberAPI()
scoresaber_all_ranked_maps = scoresaber_api.get_ranked_maps(use_cache=False)
```
## BeatSaverClient

View File

@ -15,7 +15,7 @@ This simple Python class provides a convenient wrapper for interacting with the
### Basic Usage
```python
from saberlist.SimpleBeatLeaderAPI import BeatLeaderAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
# Initialize the API wrapper
api = SimpleBeatLeaderAPI(cache_expiry_days=1)

View File

@ -219,3 +219,77 @@ class SimpleBeatLeaderAPI:
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching acc graph for player {player_id}: {e}")
return None
def get_ranked_maps(self, stars_from=5, stars_to=10, use_cache=True):
"""
Retrieve all ranked maps within the specified star range, handling pagination and caching.
:param stars_from: Minimum star rating
:param stars_to: Maximum star rating
:param use_cache: Whether to use cached data if available (default: True)
:return: List of ranked maps
"""
cache_file = os.path.join(self.CACHE_DIR, f"ranked_maps_{stars_from}_{stars_to}.json")
if use_cache and self._is_cache_valid(cache_file):
logging.debug(f"Using cached data for ranked maps (stars: {stars_from}-{stars_to})")
with open(cache_file, 'r') as f:
return json.load(f)
logging.debug(f"Fetching ranked maps from API (stars: {stars_from}-{stars_to})")
url = f"{self.BASE_URL}/leaderboards"
all_maps = []
page = 1
page_size = 100 # Number of items per page
total_items = None
while True:
params = {
"page": page,
"count": page_size,
# "sortBy": 0,
# "order": 0,
"type": "ranked",
# "allTypes": 0,
# "mapRequirements": -1,
# "allRequirements": 0,
# "songStatus": 62,
# "leaderboardContext": 254,
# "mytype": 0,
"stars_from": stars_from,
"stars_to": stars_to
}
try:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
all_maps.extend(data.get('data', []))
if total_items is None:
total_items = data.get('metadata', {}).get('total', 0)
if total_items == 0:
logging.info("No ranked maps found for the specified star range.")
break
logging.debug(f"Fetched page {page} with {len(data.get('data', []))} maps.")
if len(all_maps) >= total_items:
break
page += 1
sleep(1) # To respect API rate limits
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching ranked maps: {e}")
break
if use_cache:
# Cache the results only if use_cache is True
with open(cache_file, 'w') as f:
json.dump(all_maps, f, indent=4)
logging.debug(f"Cached {len(all_maps)} ranked maps to {cache_file}")
return all_maps

View File

@ -0,0 +1,145 @@
import os
import json
import logging
from datetime import datetime, timedelta
from time import sleep
import requests
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG
)
class SimpleScoreSaberAPI:
BASE_URL = "https://scoresaber.com/api"
def __init__(self, cache_expiry_days=1):
self.session = requests.Session()
self.cache_expiry_days = cache_expiry_days
self.CACHE_DIR = self._determine_cache_dir()
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
logging.info(f"Created cache directory: {self.CACHE_DIR}")
def _determine_cache_dir(self):
home_cache = os.path.expanduser("~/.cache")
saberlist_cache = os.path.join(home_cache, "saberlist")
scoresaber_cache = os.path.join(saberlist_cache, "scoresaber")
if os.path.exists(home_cache):
if not os.path.exists(saberlist_cache):
try:
os.makedirs(saberlist_cache)
logging.info(f"Created cache directory: {saberlist_cache}")
except OSError as e:
logging.warning(f"Failed to create {saberlist_cache}: {e}")
return os.path.join(os.getcwd(), ".cache")
if not os.path.exists(scoresaber_cache):
try:
os.makedirs(scoresaber_cache)
logging.info(f"Created cache directory: {scoresaber_cache}")
except OSError as e:
logging.warning(f"Failed to create {scoresaber_cache}: {e}")
return os.path.join(os.getcwd(), ".cache")
return scoresaber_cache
else:
logging.info("~/.cache doesn't exist, using local .cache directory")
return os.path.join(os.getcwd(), ".cache", "scoresaber")
def _get_cache_filename(self, min_star, max_star):
return os.path.join(self.CACHE_DIR, f"ranked_maps_{min_star}_{max_star}.json")
def _is_cache_valid(self, cache_file):
if not os.path.exists(cache_file):
return False
file_modified_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
return datetime.now() - file_modified_time < timedelta(days=self.cache_expiry_days)
def clear_cache(self, min_star=None, max_star=None):
"""
Clear cached data.
:param min_star: Minimum star rating filter for specific cache file
:param max_star: Maximum star rating filter for specific cache file
"""
if min_star is not None and max_star is not None:
cache_file = self._get_cache_filename(min_star, max_star)
if os.path.exists(cache_file):
os.remove(cache_file)
logging.debug(f"Cleared cache for ranked maps (stars: {min_star}-{max_star})")
else:
for file in os.listdir(self.CACHE_DIR):
file_path = os.path.join(self.CACHE_DIR, file)
if os.path.isfile(file_path):
os.remove(file_path)
logging.debug("Cleared all ranked maps cache")
def get_cache_dir(self):
"""
Get the cache directory path.
:return: Cache directory path as a string
"""
return self.CACHE_DIR
def get_ranked_maps(self, min_star=5, max_star=10, use_cache=True, limit=100, max_pages=None):
"""
Retrieve all ranked maps within the specified star range.
The api doesn't actually tell you how many results there are, so we have to fetch all pages until we get back no data
:param min_star: Minimum star rating (inclusive)
:param max_star: Maximum star rating (inclusive)
:param use_cache: Whether to use cached data if available (default: True)
:param max_pages: Maximum number of pages to fetch (default: None, fetch all)
:return: List of ranked maps
"""
cache_file = self._get_cache_filename(min_star, max_star)
if use_cache and self._is_cache_valid(cache_file):
logging.debug(f"Using cached data for ranked maps (stars: {min_star}-{max_star})")
with open(cache_file, 'r') as f:
return json.load(f)
logging.debug(f"Fetching ranked maps from API (stars: {min_star}-{max_star})")
url = f"{self.BASE_URL}/leaderboards"
all_maps = []
page = 1
while max_pages is None or page <= max_pages:
params = {
"minStar": min_star,
"maxStar": max_star,
"unique": "true",
"ranked": "true",
"page": page
}
try:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
leaderboards = data.get('leaderboards', [])
if not leaderboards:
logging.debug("Empty page reached, stopping")
break
all_maps.extend(leaderboards)
logging.info(f"Fetched page {page} with {len(leaderboards)} maps.")
page += 1
sleep(0.5)
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching ranked maps: {e}")
break
# Cache the results
with open(cache_file, 'w') as f:
json.dump(all_maps, f, indent=4)
logging.debug(f"Cached {len(all_maps)} ranked maps to {cache_file}")
return all_maps

View File

@ -18,12 +18,14 @@ from helpers.PlaylistBuilder import PlaylistBuilder
from helpers.ScoreSaberAPI import ScoreSaberAPI
from helpers.BeatLeaderAPI import BeatLeaderAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from helpers.SimpleScoreSaberAPI import SimpleScoreSaberAPI
from helpers.SimpleBeatSaverAPI import SimpleBeatSaverAPI
from saberlist.utils import reset_history
from saberlist.playlist_strategies.oldscores import (
playlist_strategy_beatleader_oldscores,
playlist_strategy_scoresaber_oldscores,
playlist_strategy_ranked_both,
)
from saberlist.playlist_strategies.accuracy import (
playlist_strategy_beatleader_lowest_acc,
@ -67,6 +69,13 @@ def saberlist() -> None:
)
playlist_builder = PlaylistBuilder()
elif strategy == 'ranked_both':
playlist_data, playlist_title = playlist_strategy_ranked_both(
SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS),
SimpleScoreSaberAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)
)
playlist_builder = PlaylistBuilder(covers_dir='./covers/kaiju')
elif strategy == 'beatleader_accuracy_gaps':
playlist_data, playlist_title = playlist_strategy_beatleader_accuracy_gaps(
SimpleBeatLeaderAPI(cache_expiry_days=CACHE_EXPIRY_DAYS)
@ -127,28 +136,35 @@ def parse_args_subcommands():
help="Choose which sub-command (strategy) to run"
)
# 1) -------- scoresaber_oldscores --------
# -------- scoresaber_oldscores --------
parser_ss_old = subparsers.add_parser("scoresaber_oldscores",
help="Generate a playlist using ScoreSaber old-scores strategy")
parser_ss_old.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for scoresaber_oldscores")
# 2) -------- beatleader_oldscores --------
# -------- beatleader_oldscores --------
parser_bl_old = subparsers.add_parser("beatleader_oldscores",
help="Generate a playlist using BeatLeader old-scores strategy")
parser_bl_old.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for beatleader_oldscores")
# 3) -------- beatleader_accuracy_gaps --------
# -------- ranked_both --------
parser_ranked_both = subparsers.add_parser("ranked_both",
help="Generate a playlist using ranked_both strategy")
parser_ranked_both.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for ranked_both")
# -------- beatleader_accuracy_gaps --------
parser_bl_acc_gaps = subparsers.add_parser("beatleader_accuracy_gaps",
help="Generate a playlist using BeatLeader accuracy gaps strategy")
parser_bl_acc_gaps.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for beatleader_accuracy_gaps")
# 4) -------- beatleader_accuracy_gaps_star_range --------
# -------- beatleader_accuracy_gaps_star_range --------
parser_bl_acc_stars = subparsers.add_parser("beatleader_accuracy_gaps_star_range",
help="Generate a playlist for accuracy gaps within a star range (BeatLeader)")
parser_bl_acc_stars.add_argument("-r", "--reset",
@ -158,28 +174,28 @@ def parse_args_subcommands():
type=float,
help="Star level to filter on")
# 5) -------- scoresaber_accuracy_gaps --------
# -------- scoresaber_accuracy_gaps --------
parser_ss_acc_gaps = subparsers.add_parser("scoresaber_accuracy_gaps",
help="Generate a playlist using ScoreSaber accuracy gap strategy")
parser_ss_acc_gaps.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for scoresaber_accuracy_gaps")
# 6) -------- beatsaver_curated --------
# -------- beatsaver_curated --------
parser_bs_curated = subparsers.add_parser("beatsaver_curated",
help="Generate a curated BeatSaver playlist")
parser_bs_curated.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for beatsaver_curated")
# 7) -------- beatsaver_mappers --------
# -------- beatsaver_mappers --------
parser_bs_mappers = subparsers.add_parser("beatsaver_mappers",
help="Generate a playlist for specified BeatSaver mappers")
parser_bs_mappers.add_argument("-r", "--reset",
action="store_true",
help="Reset the history for beatsaver_mappers")
# 8) -------- blank_playlist --------
# -------- blank_playlist --------
parser_blank = subparsers.add_parser("blank_playlist",
help="Generate a blank playlist (no songs, just a descriptor)")
parser_blank.add_argument("-r", "--reset",

View File

@ -15,9 +15,96 @@ logging.basicConfig(
from helpers.ScoreSaberAPI import ScoreSaberAPI
from helpers.BeatLeaderAPI import BeatLeaderAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from helpers.SimpleScoreSaberAPI import SimpleScoreSaberAPI
from saberlist.utils import prompt_for_player_id, load_history, save_history, format_time_ago, normalize_difficulty_name
def playlist_strategy_ranked_both(
beatleader_api: SimpleBeatLeaderAPI,
scoresaber_api: SimpleScoreSaberAPI,
song_count: int = 40
) -> List[Dict[str, Any]]:
"""
Build and format a list of songs that are ranked on both BeatLeader and ScoreSaber,
avoiding reusing the same song+difficulty.
Returns:
Tuple[List[Dict[str, Any]], str]: A list of playlist songs and a formatted playlist identifier.
"""
history = load_history()
history.setdefault('ranked_both', {})
history.setdefault('playlist_counts', {})
# Get the current count for the ranked_both strategy and increment it
count_key = 'ranked_both'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Fetch ranked maps from both APIs
logging.debug("Fetching ranked maps from BeatLeader...")
beatleader_ranked_maps = beatleader_api.get_ranked_maps(stars_from=5, stars_to=10)
beatleader_song_hashes = {
map_data['song']['hash']
for map_data in beatleader_ranked_maps
if 'song' in map_data and 'hash' in map_data['song']
}
logging.info(f"Retrieved {len(beatleader_song_hashes)} ranked maps from BeatLeader.")
logging.debug("Fetching ranked maps from ScoreSaber...")
scoresaber_ranked_maps = scoresaber_api.get_ranked_maps(min_star=5, max_star=10)
scoresaber_song_hashes = {
map_data['songHash']
for map_data in scoresaber_ranked_maps
if 'songHash' in map_data
}
logging.info(f"Retrieved {len(scoresaber_song_hashes)} ranked maps from ScoreSaber.")
# Find intersection of hashes to get songs ranked on both platforms
common_song_hashes = beatleader_song_hashes.intersection(scoresaber_song_hashes)
logging.info(f"Found {len(common_song_hashes)} songs ranked on both BeatLeader and ScoreSaber.")
if not common_song_hashes:
logging.warning("No common ranked songs found between BeatLeader and ScoreSaber.")
return [], ""
playlist_data = []
for song_hash in common_song_hashes:
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# avoid reusing songs
if song_hash in history['ranked_both']:
logging.debug(f"Skipping song {song_hash} as it's in history.")
continue
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
}
# Add the song to the playlist
playlist_data.append(song_dict)
# Update history
history['ranked_both'].setdefault(song_hash, [])
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for ranked_both.")
else:
for song in playlist_data:
song_hash = song['hash']
logging.info(f"Song added: Hash={song_hash}.")
logging.info(f"Total songs added to playlist from ranked_both: {len(playlist_data)}")
save_history(history)
return playlist_data, f"ranked_both-{new_count:02d}"
def playlist_strategy_beatleader_oldscores(
api: BeatLeaderAPI,
song_count: int = 20