Add Beatleader class, tests, and a fun playlist builder.

This commit is contained in:
Brian Lee 2024-09-16 22:35:20 -07:00
parent 1116a4c59f
commit cb66d4dc2d
9 changed files with 588 additions and 15 deletions

78
BEATLEADER.md Normal file
View File

@ -0,0 +1,78 @@
# BeatLeaderAPI Python Wrapper
This Python class provides a convenient wrapper for interacting with the BeatLeader API, specifically for retrieving player scores and song data for the game Beat Saber.
## Features
- Fetch player scores and song data from the BeatLeader API
- Local caching of API responses to reduce API calls and improve performance
- Automatic pagination handling to retrieve all available data
- Configurable cache expiration
- Methods to retrieve both full and minimal song data
## Usage
### Basic Usage
```python
from saberlist.beatleaderAPI import BeatLeaderAPI
# Initialize the API wrapper
api = BeatLeaderAPI(cache_expiry_days=1)
# Fetch player scores
player_id = '76561199407393962'
scores = api.get_player_scores(player_id)
# Get full song data
songs = api.get_player_songs(player_id)
# Get minimal song data
minimal_songs = api.get_player_songs_minimal(player_id)
```
### Caching
The class uses a local cache to store API responses. By default, the cache is located at:
- `~/.cache/saberlist/` on Unix-like systems (if `~/.cache/` exists)
- `./.cache/` in the current working directory (as a fallback)
You can control cache behavior:
```python
# Set cache expiry (in days)
api = BeatLeaderAPI(cache_expiry_days=7)
# Force a fresh API call (ignore cache)
fresh_scores = api.get_player_scores(player_id, use_cache=False)
# Clear cache for a specific player
api.clear_cache(player_id)
# Clear all cache
api.clear_cache()
# Get the current cache directory
cache_dir = api.get_cache_dir()
```
### Pagination
The `get_player_scores` method automatically handles pagination to retrieve all available scores. You can control this behavior:
```python
# Set a custom page size (default is 100)
scores = api.get_player_scores(player_id, page_size=50)
# Limit the number of pages fetched
scores = api.get_player_scores(player_id, max_pages=5)
```
## Methods
- `get_player_scores(player_id, use_cache=True, page_size=100, max_pages=None)`: Retrieves all scores for a player.
- `get_player_songs(player_id, page=1, count=100, use_cache=True)`: Retrieves full song data for all unique songs a player has played.
- `get_player_songs_minimal(player_id, page=1, count=100, use_cache=True)`: Retrieves minimal song data (id, name, author, mapper, hash, bpm, duration) for all unique songs a player has played.
- `clear_cache(player_id=None)`: Clears the cache for a specific player or all cached data.
- `get_cache_dir()`: Returns the path to the current cache directory.

View File

@ -1,5 +1,8 @@
# playlist helper
TODO: use https://github.com/megamaz/beatsaver-python
## Usage
```sh
@ -70,3 +73,9 @@ Avoid printing covers in console.
```shell
jq 'del(.image)' < playlist.bplist
```
Find the most common mappers the player has played:
```shell
jq -r '.songs[].levelAuthorName' *.bplist | sort | uniq -c | sort -rn | head -n 10
```

View File

@ -14,7 +14,8 @@ dependencies = [
'build>=1.2.1',
'requests>=2.31.0',
'pytest>=8.1.1',
'PyScoreSaber>=1.0.10'
'PyScoreSaber>=1.0.10',
'beatsaver>=1.0.1'
]
requires-python = ">=3.8.10"
classifiers = [
@ -36,4 +37,7 @@ Homepage = "https://git.satstack.dev/blee/beatsaber-playlist-tool"
player_scores_by_stars = "saberlist.scoresaber:replay_ranked"
replay_all_by_acc = "saberlist.scoresaber:replay_all_by_acc"
leaderboard_songs_by_stars = "saberlist.scoresaber:leaderboard_songs"
#replay_ranked_bl = "saberlist.beatleader:replay_ranked"
star_ladder = "saberlist.beatleader:star_ladder"
[tool.pytest.ini_options]
pythonpath = ["src"]

View File

View File

@ -1,11 +1,101 @@
import requests
import json
from saberlist.beatleaderAPI import BeatLeaderAPI
from collections import defaultdict
from datetime import datetime, timedelta
# Specify the URL
blee_id = '76561199407393962'
#url = f'https://api.beatleader.xyz/player/{blee_id}/rankedMaps'
url = f'https://api.beatleader.xyz/player/{blee_id}/scores'
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG
)
logger = logging.getLogger(__name__)
response = requests.get(url)
from collections import defaultdict
from datetime import datetime
response_content = json.loads(response.content)
def build_difficulty_based_playlist(api, player_id):
"""
Builds a custom difficulty-based playlist for a given player and saves it as a bplist file.
This function creates a playlist with the following structure:
- 5 songs with difficulty 0 to 3 stars (Easy to Normal)
- 5 songs with difficulty 4 to <6 stars (Hard to Expert)
- 5 songs with difficulty 6 to <7 stars (Expert+)
- 5 songs with difficulty 7+ stars (Expert++)
For each difficulty range, it selects the 5 songs that were played longest ago,
ensuring a mix of nostalgic tracks and skill-appropriate challenges.
The function handles duplicate songs by keeping only the oldest play for each unique song
within each difficulty group.
Parameters:
api (BeatLeaderAPI): An instance of the BeatLeaderAPI class to fetch player scores.
player_id (str): The unique identifier of the player.
Returns:
tuple: A tuple containing two elements:
- str: The file path of the created bplist file.
- list: A list of score objects included in the playlist.
The created bplist file is saved in the current directory with a name format:
"Custom Playlist - YYYY-MM-DD HH:MM:SS.bplist"
Note:
- This function uses cached score data if available. To force a fresh API call,
you may need to clear the cache or modify the api.get_player_scores() call.
- The function assumes that the BeatLeaderAPI class has a create_bplist() method
to generate the bplist file.
"""
# Get all scores for the player
scores_data = api.get_player_scores(player_id, use_cache=True)
all_scores = scores_data['data']
# Sort scores by play time (oldest first)
all_scores.sort(key=lambda x: x['timepost'])
# Group scores by difficulty
difficulty_groups = defaultdict(list)
for score in all_scores:
stars = score['leaderboard']['difficulty']['stars']
if 0 <= stars <= 3:
difficulty_groups[0].append(score)
elif 4 <= stars < 6:
difficulty_groups[1].append(score)
elif 6 <= stars < 7:
difficulty_groups[2].append(score)
elif stars >= 7:
difficulty_groups[3].append(score)
# Build the playlist
playlist_scores = []
for difficulty, count in [(0, 5), (1, 5), (2, 5), (3, 5)]:
# Remove duplicates (keep the oldest play for each unique song)
unique_songs = {}
for score in difficulty_groups[difficulty]:
song_id = score['leaderboard']['song']['id']
if song_id not in unique_songs or score['timepost'] < unique_songs[song_id]['timepost']:
unique_songs[song_id] = score
# Add the oldest unique songs to the playlist
playlist_scores.extend(sorted(unique_songs.values(), key=lambda x: x['timepost'])[:count])
# Create the bplist file
playlist_title = f"Custom Playlist - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
playlist_file = api.create_bplist(playlist_scores, playlist_title)
return playlist_file, playlist_scores
def star_ladder():
api = BeatLeaderAPI()
player_id = '76561199407393962'
playlist_file, playlist_scores = build_difficulty_based_playlist(api, player_id)
print(f"Playlist created: {playlist_file}")
print("Playlist contents:")
for i, score in enumerate(playlist_scores, 1):
song = score['leaderboard']['song']
difficulty = score['leaderboard']['difficulty']
print(f"{i}. {song['name']} by {song['author']} (Mapper: {song['mapper']}) - {difficulty['stars']:.2f} stars - Last played: {datetime.fromtimestamp(score['timepost'])}")

View File

@ -0,0 +1,202 @@
import requests
import json
import os
from datetime import datetime, timedelta
import logging
import time
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG
)
class BeatLeaderAPI:
BASE_URL = "https://api.beatleader.xyz"
def __init__(self, cache_expiry_days=1):
self.session = requests.Session()
self.cache_expiry_days = cache_expiry_days
self.CACHE_DIR = self._determine_cache_dir()
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
def _determine_cache_dir(self):
home_cache = os.path.expanduser("~/.cache")
saberlist_cache = os.path.join(home_cache, "saberlist")
if os.path.exists(home_cache):
if not os.path.exists(saberlist_cache):
try:
os.makedirs(saberlist_cache)
logging.info(f"Created cache directory: {saberlist_cache}")
except OSError as e:
logging.warning(f"Failed to create {saberlist_cache}: {e}")
return os.path.join(os.getcwd(), ".cache")
return saberlist_cache
else:
logging.info("~/.cache doesn't exist, using local .cache directory")
return os.path.join(os.getcwd(), ".cache")
def _get_cache_filename(self, player_id):
return os.path.join(self.CACHE_DIR, f"player_{player_id}_scores.json")
def _is_cache_valid(self, cache_file):
if not os.path.exists(cache_file):
return False
file_modified_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
return datetime.now() - file_modified_time < timedelta(days=self.cache_expiry_days)
def get_player_scores(self, player_id, use_cache=True, page_size=100, max_pages=None):
cache_file = self._get_cache_filename(player_id)
if use_cache and self._is_cache_valid(cache_file):
logging.debug(f"Using cached data for player {player_id}")
with open(cache_file, 'r') as f:
return json.load(f)
logging.debug(f"Fetching fresh data for player {player_id}")
url = f"{self.BASE_URL}/player/{player_id}/scores"
all_scores = []
page = 1
total_items = None
while max_pages is None or page <= max_pages:
params = {
"page": page,
"count": page_size
}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
all_scores.extend(data['data'])
if total_items is None:
total_items = data['metadata']['total']
if len(all_scores) >= total_items:
break
page += 1
time.sleep(1) # Add a small delay to avoid rate limiting
result = {
'metadata': {
'total': total_items,
'itemsPerPage': page_size,
'page': page
},
'data': all_scores
}
with open(cache_file, 'w') as f:
json.dump(result, f)
return result
def get_player_songs(self, player_id, page=1, count=100, use_cache=True):
scores = self.get_player_scores(player_id, page, count, use_cache)
songs = []
song_ids = set()
for score in scores['data']:
song = score['leaderboard']['song']
if song['id'] not in song_ids:
songs.append(song)
song_ids.add(song['id'])
return songs
def get_player_songs_minimal(self, player_id, page=1, count=100, use_cache=True):
full_songs = self.get_player_songs(player_id, page, count, use_cache)
return [self._get_minimal_song_data(song) for song in full_songs]
def _get_minimal_song_data(self, song):
return {
'id': song['id'],
'name': song['name'],
'author': song['author'],
'mapper': song['mapper'],
'hash': song['hash'],
'bpm': song['bpm'],
'duration': song['duration']
}
def clear_cache(self, player_id=None):
if player_id:
cache_file = self._get_cache_filename(player_id)
if os.path.exists(cache_file):
os.remove(cache_file)
logging.debug(f"Cleared cache for player {player_id}")
else:
for file in os.listdir(self.CACHE_DIR):
os.remove(os.path.join(self.CACHE_DIR, file))
logging.debug("Cleared all cache")
def get_cache_dir(self):
return self.CACHE_DIR
def create_bplist(self, scores, playlist_title="playlist", playlist_author="SaberList Tool", song_limit=0):
"""
Create a bplist (JSON) file in the current directory from the given scores data.
:param scores: List of score data from get_player_scores
:param playlist_title: Title of the playlist (default: "playlist")
:param playlist_author: Author of the playlist (default: "SaberList Tool")
:param song_limit: Maximum number of songs to include (0 for no limit)
:return: Path to the created bplist file
"""
playlist = {
"playlistTitle": playlist_title,
"playlistAuthor": playlist_author,
"songs": []
}
# Determine the number of songs to include
num_songs = len(scores) if song_limit == 0 else min(song_limit, len(scores))
for score in scores[:num_songs]:
song = score['leaderboard']['song']
difficulty = score['leaderboard']['difficulty']
song_entry = {
"hash": song['hash'],
"songName": song['name'],
"difficulties": [
{
"name": difficulty['difficultyName'].lower(),
"characteristic": difficulty['modeName'].lower()
}
],
"levelAuthorName": song['mapper']
}
playlist["songs"].append(song_entry)
# Generate a unique filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{playlist_title}_{timestamp}.bplist"
# Write the playlist to a file
with open(filename, 'w') as file:
json.dump(playlist, file, indent=4)
logging.info(f"Playlist written to {filename} with {num_songs} songs")
return os.path.abspath(filename)
def create_player_playlist(self, player_id, playlist_title="playlist", playlist_author="SaberList Tool", song_limit=0, use_cache=True):
"""
Create a bplist (JSON) file for a player's scores.
:param player_id: ID of the player
:param playlist_title: Title of the playlist (default: "playlist")
:param playlist_author: Author of the playlist (default: "SaberList Tool")
:param song_limit: Maximum number of songs to include (0 for no limit)
:param use_cache: Whether to use cached scores data (default: True)
:return: Path to the created bplist file
"""
scores_data = self.get_player_scores(player_id, use_cache=use_cache)
return self.create_bplist(scores_data['data'], playlist_title, playlist_author, song_limit)

View File

@ -55,14 +55,17 @@ def leaderboards_to_playlist(leaderboards: list, playlist_title: str, playlist_a
logging.info(f"Playlist written to {playlist_title}.bplist")
return playlist_json
def scores_to_playlist(scores: list, playlist_title, playlist_author = "SaberList Tool"):
def scores_to_playlist(scores: list, playlist_title, playlist_author="SaberList Tool", song_limit=0):
playlist = {
"playlistTitle": playlist_title,
"playlistAuthor": playlist_author,
"songs": []
}
for score in scores:
# Determine the number of songs to include
num_songs = len(scores) if song_limit == 0 else min(song_limit, len(scores))
for score in scores[:num_songs]:
song_entry = {
"hash": score.leaderboard.song_hash,
"songName": score.leaderboard.song_name,
@ -79,7 +82,7 @@ def scores_to_playlist(scores: list, playlist_title, playlist_author = "SaberLis
playlist_json = json.dumps(playlist, indent=4)
with open(f"{playlist_title}.bplist", 'w') as file:
file.write(playlist_json)
logging.info(f"Playlist written to {playlist_title}.bplist")
logging.info(f"Playlist written to {playlist_title}.bplist with {num_songs} songs")
return playlist_json
@ -94,6 +97,7 @@ async def async_replay_all_by_acc():
default_title = "Replay SS"
default_player_id = '76561199407393962'
player_id = input(f"Enter the playerid (Default: {default_player_id}): ") or default_player_id
song_limit = input("Limit the playlist but number of songs (Default: 0, ie. no limit): ") or 0
async for player_scores in scoresaber.player_scores_all(player_id, score_sort):
scores.extend(player_scores)
@ -105,7 +109,7 @@ async def async_replay_all_by_acc():
playlist_title = f"{default_title} {stars}"
filtered_sorted_scores = filter_and_sort_scores_by_acc(scores, stars, stars + 1)
if filtered_sorted_scores:
scores_to_playlist(filtered_sorted_scores, playlist_title)
scores_to_playlist(filtered_sorted_scores, playlist_title, song_limit=int(song_limit))
else:
print(f"No scores found for {stars}")
else:
@ -158,7 +162,7 @@ def leaderboard_songs():
min_stars = float(input(f"Enter the minimum starlevel to include on the playlist (Default: {default_min_stars}): ") or default_min_stars)
default_max_stars = min_stars + 0.10
max_stars = float(input(f"Enter the maximum starlevel to include on the playlist (Default: {default_max_stars}): ") or default_max_stars)
default_title = f"SS Leaderboard {min_stars}"
default_title = f"SS{min_stars}"
playlist_title = input(f"Enter the filename for the playlist (Default: {default_title}): ") or default_title
try:

View File

@ -0,0 +1,80 @@
# This test hits the live api, and serves as a sanity check for the BeatLeaderAPI class.
# Run this sparingly! Ideally you should only need it just before tagging a release version.
import pytest
import os
import logging
from saberlist.beatleaderAPI import BeatLeaderAPI
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Test player ID (you may want to use a known player ID for consistent testing)
TEST_PLAYER_ID = '76561199407393962'
@pytest.fixture(scope="module")
def api():
return BeatLeaderAPI(cache_expiry_days=0) # Set to 0 to always get fresh data during tests
def test_cache_directory(api):
cache_dir = api.get_cache_dir()
assert os.path.exists(cache_dir), f"Cache directory {cache_dir} does not exist"
assert cache_dir.endswith('saberlist'), f"Cache directory {cache_dir} does not end with 'saberlist'"
def test_get_player_scores(api):
scores = api.get_player_scores(TEST_PLAYER_ID, use_cache=False)
assert isinstance(scores, dict), "get_player_scores should return a dictionary"
assert 'data' in scores, "Scores response should contain 'data' key"
assert len(scores['data']) > 0, "Player should have at least one score"
def test_get_player_songs(api):
songs = api.get_player_songs(TEST_PLAYER_ID, use_cache=False)
assert isinstance(songs, list), "get_player_songs should return a list"
assert len(songs) > 0, "Player should have at least one song"
# Check if all expected keys are present in the first song
expected_keys = ['id', 'name', 'author', 'mapper', 'hash', 'bpm', 'duration']
assert all(key in songs[0] for key in expected_keys), f"Song is missing some expected keys. Got: {songs[0].keys()}"
def test_get_player_songs_minimal(api):
songs = api.get_player_songs_minimal(TEST_PLAYER_ID, use_cache=False)
assert isinstance(songs, list), "get_player_songs_minimal should return a list"
assert len(songs) > 0, "Player should have at least one song"
# Check if only the minimal keys are present in the first song
expected_keys = ['id', 'name', 'author', 'mapper', 'hash', 'bpm', 'duration']
assert set(songs[0].keys()) == set(expected_keys), f"Minimal song data has unexpected keys. Got: {songs[0].keys()}"
def test_caching(api):
# First call should hit the API
songs1 = api.get_player_songs(TEST_PLAYER_ID, use_cache=True)
# Second call should use cache
songs2 = api.get_player_songs(TEST_PLAYER_ID, use_cache=True)
assert songs1 == songs2, "Cached result should be the same as the initial API call"
# Force a fresh API call
songs3 = api.get_player_songs(TEST_PLAYER_ID, use_cache=False)
# The results might be different if the player has new scores, but the structure should be the same
assert type(songs1) == type(songs3), "Fresh API call should return the same data structure"
def test_clear_cache(api):
# Ensure there's something in the cache
api.get_player_songs(TEST_PLAYER_ID, use_cache=True)
# Clear cache for the test player
api.clear_cache(TEST_PLAYER_ID)
cache_file = api._get_cache_filename(TEST_PLAYER_ID)
assert not os.path.exists(cache_file), f"Cache file for player {TEST_PLAYER_ID} should not exist after clearing"
# Clear all cache
api.get_player_songs(TEST_PLAYER_ID, use_cache=True) # Recreate some cache
api.clear_cache()
assert len(os.listdir(api.get_cache_dir())) == 0, "Cache directory should be empty after clearing all cache"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

106
tests/test_beatleader.py Normal file
View File

@ -0,0 +1,106 @@
# tests/test_beatleader.py
import pytest
from unittest.mock import patch, Mock
from saberlist.beatleader import BeatLeaderAPI
@pytest.fixture
def mock_response():
mock = Mock()
mock.json.return_value = {
"metadata": {"itemsPerPage": 100, "page": 1, "total": 2},
"data": [
{
"leaderboard": {
"song": {
"id": "song1",
"name": "Test Song 1",
"author": "Test Author 1",
"mapper": "Test Mapper 1",
"hash": "testhash1",
"bpm": 120,
"duration": 180
}
}
},
{
"leaderboard": {
"song": {
"id": "song2",
"name": "Test Song 2",
"author": "Test Author 2",
"mapper": "Test Mapper 2",
"hash": "testhash2",
"bpm": 140,
"duration": 200
}
}
}
]
}
return mock
@pytest.fixture
def beat_leader_api():
return BeatLeaderAPI()
@patch('requests.Session.get')
def test_get_player_scores(mock_get, beat_leader_api, mock_response):
mock_get.return_value = mock_response
player_id = '123456'
result = beat_leader_api.get_player_scores(player_id)
assert result == mock_response.json.return_value
mock_get.assert_called_once_with(
f"{BeatLeaderAPI.BASE_URL}/player/{player_id}/scores",
params={"page": 1, "count": 100}
)
@patch('requests.Session.get')
def test_get_player_songs(mock_get, beat_leader_api, mock_response):
mock_get.return_value = mock_response
player_id = '123456'
result = beat_leader_api.get_player_songs(player_id)
expected_songs = [
{
'id': 'song1',
'name': 'Test Song 1',
'author': 'Test Author 1',
'mapper': 'Test Mapper 1',
'hash': 'testhash1',
'bpm': 120,
'duration': 180
},
{
'id': 'song2',
'name': 'Test Song 2',
'author': 'Test Author 2',
'mapper': 'Test Mapper 2',
'hash': 'testhash2',
'bpm': 140,
'duration': 200
}
]
assert result == expected_songs
mock_get.assert_called_once_with(
f"{BeatLeaderAPI.BASE_URL}/player/{player_id}/scores",
params={"page": 1, "count": 100}
)
def test_get_player_songs_unique(beat_leader_api):
with patch.object(beat_leader_api, 'get_player_scores') as mock_get_scores:
mock_get_scores.return_value = {
"data": [
{"leaderboard": {"song": {"id": "song1", "name": "Song 1", "author": "Author 1", "mapper": "Mapper 1", "hash": "hash1", "bpm": 120, "duration": 180}}},
{"leaderboard": {"song": {"id": "song1", "name": "Song 1", "author": "Author 1", "mapper": "Mapper 1", "hash": "hash1", "bpm": 120, "duration": 180}}},
{"leaderboard": {"song": {"id": "song2", "name": "Song 2", "author": "Author 2", "mapper": "Mapper 2", "hash": "hash2", "bpm": 140, "duration": 200}}}
]
}
result = beat_leader_api.get_player_songs('123456')
assert len(result) == 2
assert result[0]['id'] == 'song1'
assert result[1]['id'] == 'song2'