import csv
import json
import logging
import subprocess
import time
from datetime import datetime, timedelta

logging.basicConfig(
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO
)
# NOTE(review): basicConfig() installs its own stream handler on the root
# logger when none exists, so adding this second handler presumably makes
# every record appear twice (once per format). Left as-is because it is
# unclear which of the two formats is the intended one -- confirm and drop one.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(levelname)s: %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)

logger = logging.getLogger(__name__)

# File (relative to the current working directory) holding one banned id per line.
BANNED_USERS_FILE = "banned_users.txt"


def _extract_report_target(report):
    """
    Pull the reported pubkey and event out of a report's tags.

    When several 'p' or 'e' tags are present the last one of each kind wins.

    :param report: Decoded report event (expected to be a dict with 'tags').
    :return: Tuple (pubkey, event); event is (tag[1], tag[2]) of the 'e' tag.
             Either element is None when the matching tag is absent.
    :raises AttributeError, IndexError, TypeError: If the report or one of its
            tags does not have the expected shape (e.g. an 'e' tag with fewer
            than three entries).
    """
    pubkey = None
    event = None
    for tag in report.get('tags', []):
        if tag[0] == 'p':
            pubkey = tag[1]
        if tag[0] == 'e':
            event = (tag[1], tag[2])
    return pubkey, event


def _summarize_reports(lines):
    """
    Tally report events, given as JSON text lines, per reported pubkey.

    :param lines: Iterable of strings, one JSON-encoded report per line.
    :return: Tuple (report_data, unhandled) where report_data maps
             pubkey -> {'count': int, 'events': list} and unhandled counts
             the lines that were malformed or had no 'p' tag.
    """
    report_data = {}
    unhandled = 0
    for line in lines:
        if not line.strip():
            continue
        try:
            report = json.loads(line)
        except json.JSONDecodeError:
            # One corrupt line must not abort the whole scan.
            logger.error("Invalid report: %s", line)
            unhandled += 1
            continue
        try:
            pubkey, event = _extract_report_target(report)
            if pubkey:
                entry = report_data.setdefault(pubkey, {'count': 0, 'events': []})
                entry['count'] += 1
                if event:
                    entry['events'].append(event)
            else:
                # TODO: Handle NIP-69, maybe
                logger.warning("Un-handled report type: %s", report)
                unhandled += 1
        except (AttributeError, IndexError, KeyError, TypeError):
            logger.error("Invalid report: %s", report)
            unhandled += 1
    return report_data, unhandled


def get_reports(days_since: int = 1):
    """
    Collect kind-1984 report events from strfry and group them by pubkey.

    Runs ``strfry scan`` with a filter for kind 1984 events newer than
    ``days_since`` days and parses its output one JSON object per line.

    :param days_since: How many days back the ``since`` filter reaches.
    :return: Dict mapping reported pubkey -> {'count': int, 'events': list of
             (event_id, third 'e' tag entry) tuples}. Empty when strfry exits
             with a non-zero status.
    """
    nostr_filter = json.dumps({
        "kinds": [1984],
        "since": get_timestamp(days_since)
    })
    logger.info(nostr_filter)
    try:
        result = subprocess.run(
            ["strfry", "scan", nostr_filter],
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        logger.error("Error Output: %s", e.stderr)
        # Return an empty mapping so callers can iterate without a None check.
        return {}

    strfry_reports = result.stdout.splitlines()
    logger.info("%d reports have been received.", len(strfry_reports))
    report_data, unhandled_reports = _summarize_reports(strfry_reports)
    logger.warning("Unhandled reports: %d", unhandled_reports)
    return report_data


def display_user_posts(user_id):
    """
    Display posts of a given user

    Scans strfry for up to 20 events by ``user_id`` and prints the first 240
    characters of every kind-1 event. Output lines that are not valid JSON are
    printed unchanged.

    :param user_id: Author id placed in the ``authors`` filter.
    """
    nostr_filter = json.dumps({"authors": [user_id], "limit": 20})
    try:
        result = subprocess.run(
            ["strfry", "scan", nostr_filter],
            capture_output=True,
            text=True,
            check=True
        )
    except subprocess.CalledProcessError as e:
        print("Error Output:", e.stderr)
        return

    # The scan output is handled one JSON object per line, the same way
    # get_reports() reads it; parsing the whole stdout as a single document
    # fails as soon as more than one event is returned.
    for line in result.stdout.splitlines():
        if not line.strip():
            continue
        try:
            post = json.loads(line)
        except json.JSONDecodeError:
            print(line)
            continue
        if isinstance(post, dict) and post.get("kind") == 1:
            content = post.get("content", "")
            if isinstance(content, str):
                print(content[:240])


def delete_user_content(user_id):
    """
    Delete content of a given user

    Runs ``strfry delete --filter`` with an ``authors`` filter for the user.
    A non-zero exit status is reported on stdout instead of being raised.

    :param user_id: Author id whose events are deleted.
    """
    nostr_filter = json.dumps({"authors": [user_id]})
    try:
        subprocess.run(
            ["strfry", "delete", "--filter", nostr_filter],
            check=True
        )
    except subprocess.CalledProcessError as e:
        print("Error deleting content:", e)


def check_user(user_id: str, report_count):
    """
    Interactively review a reported user.

    Shows the user's posts, asks whether to ban them and, only if they were
    banned, asks whether their content should be deleted as well.

    :param user_id: Id of the reported user.
    :param report_count: Number of reports, shown in the ban prompt.
    """
    # Print the header first so it introduces the posts instead of trailing them.
    print(f"Displaying posts for user: {user_id}")
    display_user_posts(user_id)

    # ask to ban the user
    answer = input(f"Do you want to ban this user that was reported {report_count} times? (y/N): ")
    if answer.lower() != 'y':
        print(f"Skipping ban for user: {user_id}")
        return
    print(f"Banning user: {user_id}")
    ban_user(user_id)

    answer = input("Do you want to delete this user's content? (y/N): ")
    if answer.lower() == 'y':
        print(f"Deleting content for user: {user_id}")
        delete_user_content(user_id)
    else:
        print(f"Skipping deletion for user: {user_id}")


def ban_user(user_id):
    """Ban the user by writing the user_id to a file"""
    with open(BANNED_USERS_FILE, "a") as f:
        f.write(f"{user_id}\n")


def is_user_banned(user_id):
    """
    Check if the user is banned

    :param user_id: Id to look up in the banned-users file.
    :return: True if the id is listed; False otherwise, including when the
             file does not exist yet (nobody has been banned so far).
    """
    try:
        with open(BANNED_USERS_FILE, "r") as f:
            banned_users = f.read().splitlines()
    except FileNotFoundError:
        return False
    return user_id in banned_users


def read_csv_to_list(file_path) -> list:
    """
    Reads a CSV file and returns a list of tuples. Each tuple contains the
    report count and reported public key from each row. Blank rows are skipped.

    :param file_path: Path to the CSV file.
    :return: List of tuples (report_count, reported_pubkey).
    :raises ValueError: If the first column of a row is not an integer.
    :raises IndexError: If a non-blank row has fewer than two columns.
    """
    data = []
    # newline='' is what the csv module asks for so it can handle line
    # endings (including ones embedded in quoted fields) itself.
    with open(file_path, 'r', newline='') as file:
        for row in csv.reader(file):
            if not row:
                continue
            # Convert the first element (count) to an integer
            data.append((int(row[0]), row[1]))
    logger.info("%d npubs have been reported during this period.", len(data))
    return data


def get_timestamp(days_ago: int) -> int:
    """
    Returns a Unix timestamp for a given number of days ago.

    The offset is taken from the local wall-clock time.

    :param days_ago: Number of days ago.
    :return: Unix timestamp.
    """
    time_since = datetime.now() - timedelta(days=days_ago)
    return int(time.mktime(time_since.timetuple()))


def main(days_since: int = 7, report_threshold: int = 5):
    """
    Review the most-reported users interactively.

    Fetches the reports of the last ``days_since`` days and, starting with the
    most-reported pubkey, prints the reported events and runs check_user() for
    every pubkey reported more than ``report_threshold`` times.

    :param days_since: How many days of reports to fetch.
    :param report_threshold: A pubkey is reviewed only when its report count
                             is strictly greater than this value.
    """
    reports = get_reports(days_since)
    ranked = sorted(reports.items(), key=lambda item: item[1]['count'], reverse=True)
    for reported_npub, report in ranked:
        if report['count'] > report_threshold:
            for event in report['events']:
                print(event)
            check_user(reported_npub, report['count'])


if __name__ == "__main__":
    main()