319 lines
12 KiB
Python

from datetime import datetime, timezone
from typing import Dict, Any, List
import logging
import os
from dotenv import load_dotenv
load_dotenv()
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=LOG_LEVEL
)
from helpers.BeatLeaderAPI import BeatLeaderAPI
from helpers.SimpleBeatLeaderAPI import SimpleBeatLeaderAPI
from helpers.SimpleScoreSaberAPI import SimpleScoreSaberAPI
from saberlist.utils import prompt_for_player_id, load_history, save_history, format_time_ago, normalize_difficulty_name
def playlist_strategy_ranked_both(
beatleader_api: SimpleBeatLeaderAPI,
scoresaber_api: SimpleScoreSaberAPI,
song_count: int = 40
) -> List[Dict[str, Any]]:
"""
Build and format a list of songs that are ranked on both BeatLeader and ScoreSaber,
avoiding reusing the same song+difficulty.
Returns:
Tuple[List[Dict[str, Any]], str]: A list of playlist songs and a formatted playlist identifier.
"""
history = load_history()
history.setdefault('ranked_both', {})
history.setdefault('playlist_counts', {})
# Get the current count for the ranked_both strategy and increment it
count_key = 'ranked_both'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Fetch ranked maps from both APIs
logging.debug("Fetching ranked maps from BeatLeader...")
beatleader_ranked_maps = beatleader_api.get_ranked_maps(stars_from=5, stars_to=10)
beatleader_song_hashes = {
map_data['song']['hash']
for map_data in beatleader_ranked_maps
if 'song' in map_data and 'hash' in map_data['song']
}
logging.info(f"Retrieved {len(beatleader_song_hashes)} ranked maps from BeatLeader.")
logging.debug("Fetching ranked maps from ScoreSaber...")
scoresaber_ranked_maps = scoresaber_api.get_ranked_maps(min_star=5, max_star=10)
scoresaber_song_hashes = {
map_data['songHash']
for map_data in scoresaber_ranked_maps
if 'songHash' in map_data
}
logging.info(f"Retrieved {len(scoresaber_song_hashes)} ranked maps from ScoreSaber.")
# Find intersection of hashes to get songs ranked on both platforms
common_song_hashes = beatleader_song_hashes.intersection(scoresaber_song_hashes)
logging.info(f"Found {len(common_song_hashes)} songs ranked on both BeatLeader and ScoreSaber.")
if not common_song_hashes:
logging.warning("No common ranked songs found between BeatLeader and ScoreSaber.")
return [], ""
playlist_data = []
for song_hash in common_song_hashes:
if len(playlist_data) >= song_count:
logging.debug(f"Reached the desired song count: {song_count}.")
break
# avoid reusing songs
if song_hash in history['ranked_both']:
logging.debug(f"Skipping song {song_hash} as it's in history.")
continue
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
}
# Add the song to the playlist
playlist_data.append(song_dict)
# Update history
history['ranked_both'].setdefault(song_hash, [])
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for ranked_both.")
else:
for song in playlist_data:
song_hash = song['hash']
logging.info(f"Song added: Hash={song_hash}.")
logging.info(f"Total songs added to playlist from ranked_both: {len(playlist_data)}")
save_history(history)
return playlist_data, f"ranked_both-{new_count:02d}"
def playlist_strategy_beatleader_oldscores_stars(
beatleader_api: SimpleBeatLeaderAPI,
song_count: int = 40,
stars: float = 6.0
) -> List[Dict[str, Any]]:
"""
Build and format a list of songs from BeatLeader that have a star rating
between 'stars' (default: 6) and stars+1 (exclusive). Avoid reusing the same
song+difficulty from history.
Args:
beatleader_api (SimpleBeatLeaderAPI): API interface to fetch player scores.
song_count (int, optional): Number of songs to return in the playlist. Defaults to 20.
stars (float, optional): Starting range of star rating (x). Only songs with
x <= star < x+1 are included. Defaults to 6.0.
Returns:
List[Dict[str, Any]]: A playlist list of dictionary items suitable for building a playlist
(along with a formatted playlist identifier).
"""
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_oldscores_stars', {})
history.setdefault('playlist_counts', {})
# Get the current count for the star-based strategy and increment it
count_key = 'beatleader_oldscores_stars'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
# Fetch player scores
scores_data = beatleader_api.get_player_scores(player_id).get('data', [])
if not scores_data:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(scores_data)} total scores for player ID {player_id} on BeatLeader.")
# Sort scores by epochTime in ascending order (if you want "oldest" style, similar to oldscores).
# Remove this sort if you prefer a different ordering.
scores_data.sort(key=lambda x: x.get('timepost', 0))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score_entry in scores_data:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
leaderboard = score_entry.get('leaderboard', {})
difficulty_data = leaderboard.get('difficulty', {})
star_rating = difficulty_data.get('stars')
song_hash = leaderboard.get('song', {}).get('hash')
difficulty_raw = difficulty_data.get('value')
game_mode = difficulty_data.get('modeName', 'Standard')
epoch_time = score_entry.get('timepost')
# Skip if we have missing data
if not song_hash or not difficulty_raw or star_rating is None:
continue
# Filter by star rating range [stars, stars + 1)
if not (stars <= star_rating < stars + 1):
continue
# Normalize difficulty name
difficulty = normalize_difficulty_name(difficulty_raw)
# Avoid reusing song+difficulty
if (song_hash in history['beatleader_oldscores_stars'] and
difficulty in history['beatleader_oldscores_stars'][song_hash]):
logging.debug(f"Skipping song {song_hash} (difficulty {difficulty}) as it's in history.")
continue
# Calculate how long ago the score was set
try:
time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc)
except (ValueError, OSError) as e:
logging.error(f"Invalid epochTime for this score: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add to playlist
playlist_data.append(song_dict)
logging.debug(
f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}, "
f"Stars={star_rating:.2f}. Last played {time_ago} ago."
)
# Update history
history['beatleader_oldscores_stars'].setdefault(song_hash, []).append(difficulty)
if len(playlist_data) >= song_count:
break
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader (star-based).")
else:
for song in playlist_data:
song_hash = song['hash']
diff_name = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={diff_name}.")
logging.info(f"Total songs added to playlist from BeatLeader (star-based): {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_oldscores_stars-{new_count:02d}"
def playlist_strategy_beatleader_oldscores(
api: BeatLeaderAPI,
song_count: int = 20
) -> List[Dict[str, Any]]:
player_id = prompt_for_player_id()
history = load_history()
history.setdefault('beatleader_oldscores', {})
history.setdefault('playlist_counts', {})
# Get the current count for BeatLeader old scores and increment it
count_key = 'beatleader_oldscores'
current_count = history['playlist_counts'].get(count_key, 0)
new_count = current_count + 1
history['playlist_counts'][count_key] = new_count
scores_data = api.get_player_scores(player_id)
all_scores = scores_data.get('playerScores', [])
if not all_scores:
logging.warning(f"No scores found for player ID {player_id} on BeatLeader.")
return [], ""
logging.debug(f"Found {len(all_scores)} scores for player ID {player_id} on BeatLeader.")
# Sort scores by epochTime in ascending order (oldest first)
all_scores.sort(key=lambda x: x.get('score', {}).get('epochTime', 0))
playlist_data = []
current_time = datetime.now(timezone.utc)
for score_entry in all_scores:
if len(playlist_data) >= song_count:
break # Stop if we've reached the desired number of songs
score = score_entry.get('score', {})
leaderboard = score_entry.get('leaderboard', {})
song_hash = leaderboard.get('songHash')
difficulty_raw = int(leaderboard.get('difficulty', ''))
game_mode = leaderboard.get('modeName', 'Standard')
epoch_time = score.get('epochTime')
if not song_hash or not difficulty_raw or not epoch_time:
logging.debug(f"Skipping score due to missing song_hash or difficulty_raw: {score_entry}")
continue
difficulty = normalize_difficulty_name(difficulty_raw)
# avoid reusing song+difficulty
if song_hash in history['beatleader_oldscores'] and difficulty in history['beatleader_oldscores'][song_hash]:
logging.debug(f"Skipping song {song_hash} with difficulty {difficulty} as it's in history.")
continue # Skip if already used
# Calculate time ago
try:
time_set = datetime.fromtimestamp(epoch_time, tz=timezone.utc)
except (ValueError, OSError) as e:
logging.error(f"Invalid epochTime for score ID {score.get('id')}: {e}")
continue
time_difference = current_time - time_set
time_ago = format_time_ago(time_difference)
# Format the song data for PlaylistBuilder
song_dict = {
'hash': song_hash,
'difficulties': [
{
'name': difficulty,
'characteristic': game_mode
}
]
}
# Add the song to the playlist
playlist_data.append(song_dict)
logging.debug(f"Selected song for playlist: Hash={song_hash}, Difficulty={difficulty}. Last played {time_ago} ago.")
# Update history
history['beatleader_oldscores'].setdefault(song_hash, []).append(difficulty)
# Log the final playlist
if not playlist_data:
logging.info("No new songs found to add to the playlist based on history for BeatLeader.")
else:
for song in playlist_data:
song_hash = song['hash']
difficulty = song['difficulties'][0]['name']
logging.info(f"Song added: Hash={song_hash}, Difficulty={difficulty}.")
logging.info(f"Total songs added to playlist from BeatLeader: {len(playlist_data)}")
save_history(history)
return playlist_data, f"beatleader_oldscores-{new_count:02d}"