bitcoiner.social/ansible/playbooks/host_tasks/gabite.bitcoiner.social/files/scripts/1984.py

181 lines
5.9 KiB
Python

import subprocess
import json
import csv
import logging
import time
from datetime import datetime, timedelta
# Root logger configuration: timestamped records at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)s: %(message)s',
)

# Extra console handler using a shorter "LEVEL: message" layout.
# NOTE(review): basicConfig() called without a filename/stream installs its
# own stderr handler on the root logger when none exists yet, so together
# with this handler each record is probably emitted twice (once with and
# once without a timestamp) -- confirm whether that is intended.
formatter = logging.Formatter('%(levelname)s: %(message)s')
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.INFO)
logging.getLogger('').addHandler(console)

# Module-level logger for this script.
logger = logging.getLogger(__name__)
def get_reports(days_since: int = 1):
    """Collect kind-1984 events from strfry and tally them per reported pubkey.

    Runs ``strfry scan`` with a filter for kind 1984 events newer than
    ``days_since`` days, parses the output as one JSON event per line and
    groups the events by the pubkey found in their ``p`` tag.

    :param days_since: How many days back to look for reports.
    :return: Dict mapping each reported pubkey to
        ``{'count': int, 'events': list}``. ``count`` is the number of
        reports naming that pubkey; ``events`` holds a
        ``(tag[1], tag[2])`` tuple taken from the ``e`` tag of every report
        that carried one (presumably event id and report type -- verify
        against the relay's data). Returns an empty dict when the strfry
        command exits with an error.
    """
    nostr_filter = json.dumps({
        "kinds": [1984],
        "since": get_timestamp(days_since)
    })
    logging.info(nostr_filter)
    try:
        result = subprocess.run(
            ["strfry", "scan", nostr_filter],
            capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        # Use a %s placeholder: logging treats extra positional args as
        # format arguments, so a message without one fails to render.
        logging.error("Error Output: %s", e.stderr)
        # Always hand back a dict so callers can iterate unconditionally.
        return {}

    strfry_reports = result.stdout.splitlines()
    logging.info(f"{len(strfry_reports)} reports have been received.")

    report_data = {}
    unhandled_reports = 0
    for strfry_report in strfry_reports:
        # Until parsing succeeds, log the raw line in error messages.
        report = strfry_report
        pubkey = None
        event = None
        try:
            report = json.loads(strfry_report)
            # If a tag type occurs more than once, the last one wins.
            for tag in report.get('tags', []):
                if tag[0] == 'p':
                    pubkey = tag[1]
                if tag[0] == 'e':
                    event = (tag[1], tag[2])
            if pubkey:
                entry = report_data.setdefault(
                    pubkey, {'count': 0, 'events': list()}
                )
                entry['count'] += 1
                if event:
                    entry['events'].append(event)
            else:
                # TODO: Handle NIP-69, maybe
                logging.warning(f"Un-handled report type: {report}")
                unhandled_reports += 1
        except (ValueError, AttributeError, IndexError, KeyError, TypeError):
            # Malformed JSON, a non-object event, or tags that are too
            # short / of the wrong type: skip this report, keep scanning.
            logging.error(f"Invalid report: {report}")
            unhandled_reports += 1

    logging.warning(f"Unhandled reports: {unhandled_reports}")
    return report_data
def display_user_posts(user_id):
    """Print the start of each recent kind-1 post written by a user.

    Runs ``strfry scan`` for up to 20 events authored by ``user_id`` and
    prints the first 240 characters of the content of every kind 1 event.
    The scan output is read as one JSON event per line, the same way
    ``get_reports`` reads it; a line that cannot be interpreted as an event
    is printed unchanged so nothing is silently hidden from the reviewer.

    :param user_id: Pubkey of the author whose posts should be shown.
    """
    nostr_filter = json.dumps({"authors": [user_id], "limit": 20})
    try:
        result = subprocess.run(
            ["strfry", "scan", nostr_filter],
            capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print("Error Output:", e.stderr)
        return

    for line in result.stdout.splitlines():
        try:
            post = json.loads(line)
            if post.get("kind") == 1:
                print(post.get("content", "")[:240])
        except (ValueError, AttributeError, TypeError):
            # Not valid JSON, not a JSON object, or non-text content.
            print(line)
def delete_user_content(user_id):
    """Run ``strfry delete`` with a filter matching events authored by user_id.

    A non-zero exit status of the command is reported on stdout rather than
    raised.
    """
    author_filter = json.dumps({"authors": [user_id]})
    command = ["strfry", "delete", "--filter", author_filter]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as err:
        print("Error deleting content:", err)
def check_user(user_id: str, report_count):
    """Interactively review a reported user and optionally ban them.

    Shows the user's recent posts, then asks on stdin whether to ban the
    user. Only when the ban is confirmed is a second question asked about
    deleting the user's stored content. Any answer other than ``y`` (case
    and surrounding whitespace ignored) counts as "no".

    :param user_id: Pubkey of the reported user.
    :param report_count: Number of reports received; shown in the prompt.
    """
    # Print the header first so it precedes the posts it announces.
    print(f"Displaying posts for user: {user_id}")
    display_user_posts(user_id)

    # ask to ban the user
    answer = input(f"Do you want to ban this user that was reported {report_count} times? (y/N): ")
    if answer.strip().lower() != 'y':
        print(f"Skipping ban for user: {user_id}")
        return
    print(f"Banning user: {user_id}")
    ban_user(user_id)

    answer = input("Do you want to delete this user's content? (y/N): ")
    if answer.strip().lower() == 'y':
        print(f"Deleting content for user: {user_id}")
        delete_user_content(user_id)
    else:
        print(f"Skipping deletion for user: {user_id}")
def ban_user(user_id):
    """Record a ban by appending user_id as a new line to banned_users.txt."""
    with open("banned_users.txt", "a") as ban_file:
        # print() supplies the trailing newline that terminates the entry.
        print(user_id, file=ban_file)
def is_user_banned(user_id):
    """Check whether user_id appears as a line in banned_users.txt.

    :param user_id: Pubkey to look up.
    :return: True if a line of the file equals ``user_id`` exactly, else
        False. A missing file means nobody has been banned yet, so it also
        yields False instead of raising.
    """
    try:
        with open("banned_users.txt", "r") as f:
            banned_users = f.read().splitlines()
    except FileNotFoundError:
        # The file is only created by the first ban.
        return False
    return user_id in banned_users
def read_csv_to_list(file_path) -> list:
    """
    Reads a CSV file and returns a list of tuples.
    Each tuple contains the report count (first column, converted to int)
    and the reported public key (second column) of one row. Completely
    empty rows, such as a trailing blank line, are skipped.
    :param file_path: Path to the CSV file.
    :return: List of tuples (report_count, reported_pubkey).
    :raises ValueError: If a row's first column is not an integer.
    :raises IndexError: If a non-empty row has fewer than two columns.
    """
    data = []
    # newline='' leaves newline handling to the csv module, as its
    # documentation asks for file objects passed to csv.reader.
    with open(file_path, 'r', newline='') as file:
        for row in csv.reader(file):
            if not row:
                # csv.reader yields [] for blank lines.
                continue
            data.append((int(row[0]), row[1]))
    logging.info(f"{len(data)} npubs have been reported during this period.")
    return data
def get_timestamp(days_ago: int) -> int:
    """
    Compute the Unix timestamp of the moment ``days_ago`` days before now.
    The calculation starts from the current local wall-clock time.
    :param days_ago: Number of days to go back.
    :return: Unix timestamp in whole seconds.
    """
    offset = timedelta(days=days_ago)
    moment = datetime.now() - offset
    local_struct = moment.timetuple()
    seconds = time.mktime(local_struct)
    return int(seconds)
def main():
    """Walk through the most-reported users of the last 7 days.

    Fetches the reports, orders the reported pubkeys by report count
    (highest first) and, for every pubkey reported more than 5 times,
    prints the recorded event tuples and starts the interactive review.
    """
    # Guard against get_reports() handing back None instead of a dict.
    unsorted_reports = get_reports(7) or {}
    ranked = sorted(
        unsorted_reports.items(),
        key=lambda item: item[1]['count'],
        reverse=True,
    )
    for reported_npub, report in ranked:
        if report['count'] <= 5:
            # Sorted descending: no later entry can exceed the threshold.
            break
        for event in report['events']:
            print(event)
        check_user(reported_npub, report['count'])


if __name__ == "__main__":
    main()