From 7a16f33addf02e0246a4e58148eb31ad649d8adc Mon Sep 17 00:00:00 2001
From: Xargana
Date: Mon, 14 Jul 2025 00:28:45 +0300
Subject: [PATCH] she mongo my db till i 429 Too many requests

---
 cli.py           |  34 +++--
 data/users.json  |   1 +
 main.py          |   6 +-
 requirements.txt |   2 +-
 src/client.py    |   4 +-
 src/database.py  | 380 ++++++++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 407 insertions(+), 20 deletions(-)
 create mode 100644 data/users.json

diff --git a/cli.py b/cli.py
index 1b1bf46..ff1c2db 100644
--- a/cli.py
+++ b/cli.py
@@ -13,14 +13,15 @@ from pathlib import Path
 sys.path.append(str(Path(__file__).parent))
 
 from src.config import Config
-from src.database import JSONDatabase
+from src.database import MongoDatabase
 from src.client import DiscordDataClient
 
 
 async def export_data(format_type: str, output_path: str = None):
     """Export collected data."""
     config = Config()
-    database = JSONDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     if output_path is None:
         from datetime import datetime
@@ -37,7 +38,8 @@ async def export_data(format_type: str, output_path: str = None):
 async def show_stats():
     """Show database statistics."""
     config = Config()
-    database = JSONDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     stats = await database.get_statistics()
 
@@ -55,7 +57,8 @@ async def show_stats():
 async def search_user(query: str):
     """Search for users."""
     config = Config()
-    database = JSONDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     all_users = await database.get_all_users()
 
@@ -86,33 +89,44 @@ async def backup_database():
     """Create a manual backup of the database."""
     config = Config()
-    database = JSONDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     from datetime import datetime
     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
     backup_path = f"data/backups/manual_backup_{timestamp}.json"
 
-    # Copy current database
-    import shutil
-    shutil.copy2(database.database_path, backup_path)
+    # Export all data to JSON for backup
+    users = await database.get_all_users()
+    backup_data = [user.to_dict() for user in users]
+
+    # Ensure backup directory exists
+    Path("data/backups").mkdir(parents=True, exist_ok=True)
+
+    with open(backup_path, 'w', encoding='utf-8') as f:
+        json.dump(backup_data, f, indent=2, ensure_ascii=False)
 
     print(f"Database backed up to {backup_path}")
+    await database.close()
 
 
 async def cleanup_data():
     """Clean up old data and backups."""
     config = Config()
-    database = JSONDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     await database.cleanup_old_backups(max_backups=5)
     print("Cleanup completed")
+    await database.close()
 
 
 async def test_connection():
     """Test Discord connection."""
     try:
         config = Config()
-        database = JSONDatabase(config.database_path)
+        database = MongoDatabase()
+        await database.initialize()
         client = DiscordDataClient(config, database)
 
         print("Testing Discord connection...")
diff --git a/data/users.json b/data/users.json
new file mode 100644
index 0000000..9e26dfe
--- /dev/null
+++ b/data/users.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/main.py b/main.py
index f7959c8..88602bb 100644
--- a/main.py
+++ b/main.py
@@ -20,7 +20,7 @@ sys.path.insert(0, str(Path(__file__).parent))
 try:
     from src.client import DiscordDataClient
     from src.config import Config
-    from src.database import JSONDatabase
+    from src.database import MongoDatabase
     from src.logger import setup_logger
 except ImportError as e:
     print(f"❌ Import error: {e}")
@@ -42,8 +42,8 @@ async def main():
     logger.info("Starting Discord Data Collector")
 
     # Initialize database
-    raise NotImplementedError("put shit in here i cba to install mariadb locally")
-    database = MariaDBDatabase(config.database_path)
+    database = MongoDatabase()
+    await database.initialize()
 
     # Initialize Discord client
     client = DiscordDataClient(config, database)
diff --git a/requirements.txt b/requirements.txt
index 308085c..ab41cd5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,7 @@ discord.py-self>=2.0.0
 python-dotenv>=1.0.0
 toml>=0.10.2
 
-# For future MongoDB integration
+# MongoDB integration
 pymongo>=4.0.0
 
 # Logging
diff --git a/src/client.py b/src/client.py
index c73352a..f57f4dc 100644
--- a/src/client.py
+++ b/src/client.py
@@ -14,14 +14,14 @@ except ImportError:
     raise ImportError("discord.py-self is required. Install with: pip install discord.py-self")
 
 from .config import Config
-from .database import JSONDatabase, UserData
+from .database import MongoDatabase, UserData
 from .rate_limiter import RateLimiter
 
 
 class DiscordDataClient(discord.Client):
     """Custom Discord client for collecting user data."""
 
-    def __init__(self, config: Config, database: JSONDatabase):
+    def __init__(self, config: Config, database: MongoDatabase):
         super().__init__()
diff --git a/src/database.py b/src/database.py
index a86d1c6..5255352 100644
--- a/src/database.py
+++ b/src/database.py
@@ -1,14 +1,31 @@
 """
-MariaDB database manager for Discord user data storage.
+Database managers for Discord user data storage.
 """
 
 import asyncio
+import json
+import shutil
 from datetime import datetime
 from typing import Dict, List, Optional, Any
-from dataclasses import dataclass
+from dataclasses import dataclass, asdict
+from pathlib import Path
 import logging
 
-from asyncmy import connect, Connection, Cursor
-from asyncmy.errors import MySQLError
+
+# MongoDB support
+try:
+    from pymongo import MongoClient
+    from pymongo.errors import PyMongoError
+    MONGODB_AVAILABLE = True
+except ImportError:
+    MONGODB_AVAILABLE = False
+
+# Optional MariaDB support
+try:
+    from asyncmy import connect, Connection
+    from asyncmy.errors import MySQLError
+    MARIADB_AVAILABLE = True
+except ImportError:
+    MARIADB_AVAILABLE = False
 
 @dataclass
@@ -35,6 +52,359 @@ class UserData:
         self.created_at = current_time
         self.updated_at = current_time
 
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to dictionary for JSON serialization."""
+        data = asdict(self)
+        # Convert datetime objects to ISO strings
+        if self.created_at:
+            data['created_at'] = self.created_at.isoformat()
+        if self.updated_at:
+            data['updated_at'] = self.updated_at.isoformat()
+        return data
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> 'UserData':
+        """Create UserData from dictionary."""
+        # Convert ISO strings back to datetime objects
+        if 'created_at' in data and isinstance(data['created_at'], str):
+            data['created_at'] = datetime.fromisoformat(data['created_at'])
+        if 'updated_at' in data and isinstance(data['updated_at'], str):
+            data['updated_at'] = datetime.fromisoformat(data['updated_at'])
+        return cls(**data)
+
+
+class MongoDatabase:
+    """MongoDB-based database for storing Discord user data."""
+
+    def __init__(self, connection_string: str = "mongodb://localhost:27017", database_name: str = "discorddb"):
+        """Initialize the MongoDB connection."""
+        if not MONGODB_AVAILABLE:
+            raise ImportError("pymongo is required for MongoDB support. Install with: pip install pymongo")
+
+        self.connection_string = connection_string
+        self.database_name = database_name
+        self.logger = logging.getLogger(__name__)
+        self.client = None
+        self.db = None
+        self.users_collection = None
+
+    async def initialize(self):
+        """Initialize database connection and ensure collections exist."""
+        try:
+            self.client = MongoClient(self.connection_string)
+            self.db = self.client[self.database_name]
+            self.users_collection = self.db.users
+
+            # Create indexes for better performance
+            self.users_collection.create_index("user_id", unique=True)
+            self.users_collection.create_index("username")
+            self.users_collection.create_index("servers")
+
+            # Test connection
+            self.client.admin.command('ping')
+            self.logger.info(f"MongoDB connection established to {self.database_name}")
+        except Exception as e:
+            self.logger.error(f"MongoDB connection failed: {e}")
+            raise
+
+    async def get_user(self, user_id: int) -> Optional[UserData]:
+        """Get user data by ID."""
+        try:
+            doc = self.users_collection.find_one({"user_id": user_id})
+            if doc:
+                # Remove MongoDB's _id field
+                doc.pop('_id', None)
+                return UserData.from_dict(doc)
+            return None
+        except Exception as e:
+            self.logger.error(f"Error getting user {user_id}: {e}")
+            return None
+
+    async def save_user(self, user_data: UserData):
+        """Save or update user data."""
+        try:
+            doc = user_data.to_dict()
+            self.users_collection.replace_one(
+                {"user_id": user_data.user_id},
+                doc,
+                upsert=True
+            )
+            self.logger.debug(f"Saved user {user_data.username}#{user_data.discriminator}")
+        except Exception as e:
+            self.logger.error(f"Error saving user {user_data.user_id}: {e}")
+            raise
+
+    async def add_server_to_user(self, user_id: int, server_id: int):
+        """Add a server to user's server list."""
+        try:
+            self.users_collection.update_one(
+                {"user_id": user_id},
+                {"$addToSet": {"servers": server_id}}
+            )
+        except Exception as e:
+            self.logger.error(f"Error adding server {server_id} to user {user_id}: {e}")
+
+    async def get_all_users(self) -> List[UserData]:
+        """Get all users from the database."""
+        try:
+            users = []
+            for doc in self.users_collection.find():
+                doc.pop('_id', None)
+                users.append(UserData.from_dict(doc))
+            return users
+        except Exception as e:
+            self.logger.error(f"Error getting all users: {e}")
+            return []
+
+    async def get_users_by_server(self, server_id: int) -> List[UserData]:
+        """Get all users that are members of a specific server."""
+        try:
+            users = []
+            for doc in self.users_collection.find({"servers": server_id}):
+                doc.pop('_id', None)
+                users.append(UserData.from_dict(doc))
+            return users
+        except Exception as e:
+            self.logger.error(f"Error getting users by server {server_id}: {e}")
+            return []
+
+    async def get_user_count(self) -> int:
+        """Get total number of users in database."""
+        try:
+            return self.users_collection.count_documents({})
+        except Exception as e:
+            self.logger.error(f"Error getting user count: {e}")
+            return 0
+
+    async def get_server_count(self) -> int:
+        """Get total number of unique servers."""
+        try:
+            pipeline = [
+                {"$unwind": "$servers"},
+                {"$group": {"_id": "$servers"}},
+                {"$count": "total"}
+            ]
+            result = list(self.users_collection.aggregate(pipeline))
+            return result[0]["total"] if result else 0
+        except Exception as e:
+            self.logger.error(f"Error getting server count: {e}")
+            return 0
+
+    async def get_statistics(self) -> Dict[str, Any]:
+        """Get database statistics."""
+        try:
+            total_users = await self.get_user_count()
+            total_servers = await self.get_server_count()
+
+            # Get most active servers
+            pipeline = [
+                {"$unwind": "$servers"},
+                {"$group": {"_id": "$servers", "user_count": {"$sum": 1}}},
+                {"$sort": {"user_count": -1}},
+                {"$limit": 10}
+            ]
+
+            most_active = []
+            for doc in self.users_collection.aggregate(pipeline):
+                most_active.append((doc["_id"], doc["user_count"]))
+
+            # Get database size (estimate)
+            try:
+                stats = self.db.command("collStats", "users")
+                database_size = stats.get("size", 0)
+            except Exception:
+                database_size = 0
+
+            return {
+                'total_users': total_users,
+                'total_servers': total_servers,
+                'most_active_servers': most_active,
+                'database_size': database_size
+            }
+        except Exception as e:
+            self.logger.error(f"Error getting statistics: {e}")
+            return {'total_users': 0, 'total_servers': 0, 'most_active_servers': [], 'database_size': 0}
+
+    async def export_to_csv(self, output_path: str):
+        """Export data to CSV format."""
+        import csv
+        users = await self.get_all_users()
+
+        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
+            fieldnames = ['user_id', 'username', 'discriminator', 'display_name',
+                          'avatar_url', 'banner_url', 'bio', 'status', 'activity',
+                          'servers', 'created_at', 'updated_at']
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+
+            writer.writeheader()
+            for user in users:
+                row = user.to_dict()
+                row['servers'] = ','.join(map(str, user.servers))
+                writer.writerow(row)
+
+    async def cleanup_old_backups(self, max_backups: int = 5):
+        """Clean up old backup documents (placeholder for MongoDB)."""
+        # In MongoDB, we might implement this as removing old backup collections
+        # or documents with old timestamps
+        self.logger.info("MongoDB cleanup completed")
+
+    async def close(self):
+        """Close database connection."""
+        if self.client:
+            self.client.close()
+            self.logger.info("MongoDB connection closed")
+
+
+# Keep the original JSONDatabase class for fallback
+class JSONDatabase:
+    """JSON file-based database for storing Discord user data."""
+
+    def __init__(self, database_path: str = "data/users.json"):
+        """Initialize the JSON database."""
+        self.database_path = Path(database_path)
+        self.backup_dir = self.database_path.parent / "backups"
+        self.logger = logging.getLogger(__name__)
+        self._lock = asyncio.Lock()
+
+        # Ensure directories exist
+        self.database_path.parent.mkdir(parents=True, exist_ok=True)
+        self.backup_dir.mkdir(parents=True, exist_ok=True)
+
+        # Initialize empty database if it doesn't exist
+        if not self.database_path.exists():
+            self._save_data({})
+
+    async def initialize(self):
+        """Initialize database (no-op for JSON database)."""
+        self.logger.info("JSON database ready")
+
+    def _load_data(self) -> Dict[str, Any]:
+        """Load data from JSON file."""
+        try:
+            with open(self.database_path, 'r', encoding='utf-8') as f:
+                return json.load(f)
+        except (FileNotFoundError, json.JSONDecodeError) as e:
+            self.logger.warning(f"Could not load database: {e}, creating new one")
+            return {}
+
+    def _save_data(self, data: Dict[str, Any]):
+        """Save data to JSON file."""
+        with open(self.database_path, 'w', encoding='utf-8') as f:
+            json.dump(data, f, indent=2, ensure_ascii=False)
+
+    async def get_user(self, user_id: int) -> Optional[UserData]:
+        """Get user data by ID."""
+        async with self._lock:
+            data = self._load_data()
+            if str(user_id) in data:
+                return UserData.from_dict(data[str(user_id)])
+            return None
+
+    async def save_user(self, user_data: UserData):
+        """Save or update user data."""
+        async with self._lock:
+            data = self._load_data()
+            data[str(user_data.user_id)] = user_data.to_dict()
+            self._save_data(data)
+            self.logger.debug(f"Saved user {user_data.username}#{user_data.discriminator}")
+
+    async def add_server_to_user(self, user_id: int, server_id: int):
+        """Add a server to user's server list."""
+        user = await self.get_user(user_id)
+        if user and server_id not in user.servers:
+            user.servers.append(server_id)
+            await self.save_user(user)
+
+    async def get_all_users(self) -> List[UserData]:
+        """Get all users from the database."""
+        async with self._lock:
+            data = self._load_data()
+            return [UserData.from_dict(user_data) for user_data in data.values()]
+
+    async def get_users_by_server(self, server_id: int) -> List[UserData]:
+        """Get all users that are members of a specific server."""
+        users = await self.get_all_users()
+        return [user for user in users if server_id in user.servers]
+
+    async def get_user_count(self) -> int:
+        """Get total number of users in database."""
+        data = self._load_data()
+        return len(data)
+
+    async def get_server_count(self) -> int:
+        """Get total number of unique servers."""
+        users = await self.get_all_users()
+        servers = set()
+        for user in users:
+            servers.update(user.servers)
+        return len(servers)
+
+    async def get_statistics(self) -> Dict[str, Any]:
+        """Get database statistics."""
+        users = await self.get_all_users()
+        server_counts = {}
+
+        for user in users:
+            for server_id in user.servers:
+                server_counts[server_id] = server_counts.get(server_id, 0) + 1
+
+        most_active_servers = sorted(
+            server_counts.items(),
+            key=lambda x: x[1],
+            reverse=True
+        )[:10]
+
+        return {
+            'total_users': len(users),
+            'total_servers': len(server_counts),
+            'most_active_servers': most_active_servers,
+            'database_size': self.database_path.stat().st_size if self.database_path.exists() else 0
+        }
+
+    async def export_to_csv(self, output_path: str):
+        """Export data to CSV format."""
+        import csv
+        users = await self.get_all_users()
+
+        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
+            fieldnames = ['user_id', 'username', 'discriminator', 'display_name',
+                          'avatar_url', 'banner_url', 'bio', 'status', 'activity',
+                          'servers', 'created_at', 'updated_at']
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+
+            writer.writeheader()
+            for user in users:
+                row = user.to_dict()
+                row['servers'] = ','.join(map(str, user.servers))
+                writer.writerow(row)
+
+    async def cleanup_old_backups(self, max_backups: int = 5):
+        """Clean up old backup files."""
+        backups = list(self.backup_dir.glob("*.json"))
+        backups.sort(key=lambda x: x.stat().st_mtime, reverse=True)
+
+        for backup in backups[max_backups:]:
+            backup.unlink()
+            self.logger.info(f"Removed old backup: {backup}")
+
+    async def close(self):
+        """Close database (no-op for JSON database)."""
+        self.logger.info("JSON database closed")
+
+
+# Database factory function
+def create_database(use_mongodb: bool = True, **kwargs):
+    """Create appropriate database instance based on availability and preference."""
+    if use_mongodb and MONGODB_AVAILABLE:
+        try:
+            return MongoDatabase(**kwargs)
+        except Exception as e:
+            logging.getLogger(__name__).warning(f"MongoDB initialization failed: {e}, falling back to JSON")
+
+    # Fallback to JSON database
+    database_path = kwargs.get('database_path', 'data/users.json')
+    return JSONDatabase(database_path)
+
 
 class MariaDBDatabase:
     """MariaDB-based database for storing Discord user data."""
@@ -46,6 +416,8 @@ class MariaDBDatabase:
                  database: str,
                  port: int = 3306):
         """Initialize the MariaDB connection."""
+        if not MARIADB_AVAILABLE:
+            raise ImportError("asyncmy is required for MariaDB support. Install with: pip install asyncmy")
         self.db_config = {
             'host': host,
             'port': port,
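
Usage sketch (illustrative only, not part of the patch): a minimal script exercising the create_database() factory added to src/database.py. It assumes the script is run from the repository root so that the src package is importable (the same layout main.py and cli.py rely on), and that a MongoDB server is reachable at the default mongodb://localhost:27017; if pymongo is not installed, the factory falls back to the JSON file database at data/users.json.

    # usage_sketch.py - hypothetical helper, names taken from the patch above
    import asyncio

    from src.database import create_database


    async def demo():
        # Returns MongoDatabase when pymongo is installed,
        # otherwise falls back to JSONDatabase.
        database = create_database(use_mongodb=True)
        await database.initialize()

        stats = await database.get_statistics()
        print(f"Users: {stats['total_users']}, servers: {stats['total_servers']}")

        await database.close()


    if __name__ == "__main__":
        asyncio.run(demo())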