#!/usr/bin/env python3
"""
Command-line interface for Discord Data Collector.

Each sub-command is implemented as a coroutine that opens its own database
connection, does its work, and closes the connection again; ``main`` parses
the command line and runs the matching coroutine with ``asyncio.run``.
"""

import argparse
import asyncio
import json
import sys
from collections import Counter
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional

# Add src to path (must happen before the project imports below).
sys.path.append(str(Path(__file__).parent))

from src.config import Config
from src.database import create_database
from src.client import DiscordDataClient


@asynccontextmanager
async def _open_database():
    """Yield a database built from ``Config()`` and always close it on exit."""
    config = Config()
    database = await create_database(mariadb_config=config.get_mariadb_config())
    try:
        yield database
    finally:
        await database.close()


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, with '...' appended if cut."""
    return f"{text[:limit]}{'...' if len(text) > limit else ''}"


async def export_data(format_type: str, output_path: Optional[str] = None):
    """Export collected data.

    Args:
        format_type: Export format; only ``"csv"`` is supported.
        output_path: Destination file. When omitted, a timestamped path of
            the form ``data/export_<YYYYmmdd_HHMMSS>.<format_type>`` is used.
    """
    if format_type != "csv":
        # Reject before connecting so no connection is opened needlessly.
        print(f"Unsupported format: {format_type}")
        return

    if output_path is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"data/export_{timestamp}.{format_type}"

    async with _open_database() as database:
        await database.export_to_csv(output_path)
    print(f"Data exported to {output_path}")


async def show_stats():
    """Show database statistics."""
    async with _open_database() as database:
        stats = await database.get_statistics()

    print("\n=== Database Statistics ===")
    print(f"Total users: {stats['total_users']}")
    print(f"Total servers: {stats['total_servers']}")
    print(f"Database size: {stats['database_size']} bytes")

    if stats['most_active_servers']:
        print("\nMost active servers:")
        for server_id, user_count in stats['most_active_servers'][:5]:
            print(f" Server {server_id}: {user_count} users")


async def search_user(query: str):
    """Search for users.

    A user matches when *query* is a case-insensitive substring of the
    username or display name, or is exactly equal to the user ID. At most
    the first 10 matches are printed.
    """
    needle = query.lower()

    async with _open_database() as database:
        all_users = await database.get_all_users()

        # Search by username or user ID
        results = [
            user
            for user in all_users
            if needle in user.username.lower()
            or needle in (user.display_name or "").lower()
            or query == str(user.user_id)
        ]

        if not results:
            print("No users found matching the query.")
            return

        print(f"\n=== Found {len(results)} users ===")
        for user in results[:10]:  # Show first 10 results
            print(f"{user.username}#{user.discriminator} (ID: {user.user_id})")
            if user.display_name:
                print(f" Display name: {user.display_name}")
            if user.bio:
                print(f" Bio: {_truncate(user.bio, 100)}")
            if user.status:
                print(f" Status: {user.status}")
            if user.activity:
                print(f" Activity: {_truncate(user.activity, 50)}")

            # Get server names for display
            if user.servers:
                # Only the first five servers are shown, so only those
                # names need to be looked up.
                shown_ids = user.servers[:5]
                server_names = await database.get_server_names(shown_ids)
                server_display = [
                    server_names.get(server_id, f"Unknown ({server_id})")
                    for server_id in shown_ids
                ]
                more_text = "..." if len(user.servers) > 5 else ""
                print(
                    f" Servers ({len(user.servers)}): "
                    f"{', '.join(server_display)}{more_text}"
                )
            else:
                print(" Servers: None")

            print(f" Last updated: {user.updated_at}")
            print()


async def list_servers():
    """List all servers with user counts, most populated first."""
    async with _open_database() as database:
        # Get all users
        users = await database.get_all_users()

        # Count users per server; a user without servers contributes nothing.
        server_counts = Counter(
            server_id for user in users for server_id in (user.servers or [])
        )

        if not server_counts:
            print("No servers found in database.")
            return

        # Sort by user count (descending); ties keep first-seen order.
        sorted_servers = server_counts.most_common()

        # Get server names
        server_ids = [server_id for server_id, _ in sorted_servers]
        server_names = await database.get_server_names(server_ids)

    print(f"\n=== Servers ({len(sorted_servers)} total) ===")
    for server_id, user_count in sorted_servers:
        server_name = server_names.get(server_id, f"Unknown Server ({server_id})")
        print(f"{server_name}: {user_count} users")


async def show_user_servers(user_id: str):
    """Show servers a user is in.

    Args:
        user_id: The user ID as typed on the command line; it must parse
            as an integer, otherwise an error message is printed.
    """
    # Validate before connecting, and guard only the conversion so that an
    # unrelated ValueError from the database is not misreported as bad input.
    try:
        user_id_int = int(user_id)
    except ValueError:
        print("Invalid user ID. Please provide a numeric user ID.")
        return

    async with _open_database() as database:
        user = await database.get_user(user_id_int)

    if not user:
        print(f"User {user_id} not found in database.")
        return

    print(f"\n=== User: {user.username}#{user.discriminator} ===")
    print(f"User ID: {user.user_id}")
    if user.display_name:
        print(f"Display Name: {user.display_name}")
    if user.bio:
        print(f"Bio: {_truncate(user.bio, 100)}")
    if user.status:
        print(f"Status: {user.status}")
    if user.activity:
        print(f"Activity: {user.activity}")

    servers = user.servers or []
    print(f"\nServers ({len(servers)}):")
    for server_id in servers:
        print(f" - {server_id}")


async def show_server_users(server_id: str):
    """Show users in a server, sorted by username (case-insensitive).

    Args:
        server_id: The server ID as typed on the command line; it must
            parse as an integer, otherwise an error message is printed.
    """
    # Validate before connecting; see show_user_servers for the rationale.
    try:
        server_id_int = int(server_id)
    except ValueError:
        print("Invalid server ID. Please provide a numeric server ID.")
        return

    async with _open_database() as database:
        users = await database.get_users_by_server(server_id_int)

    if not users:
        print(f"No users found for server {server_id}.")
        return

    print(f"\n=== Server {server_id} ({len(users)} users) ===")

    # Sort users by username
    for user in sorted(users, key=lambda u: u.username.lower()):
        status_info = ""
        if user.status:
            status_info = f" [{user.status}]"
        if user.activity:
            status_info += f" ({_truncate(user.activity, 30)})"

        print(f"{user.username}#{user.discriminator} (ID: {user.user_id}){status_info}")
        if user.display_name and user.display_name != user.username:
            print(f" Display: {user.display_name}")
        if user.bio:
            print(f" Bio: {_truncate(user.bio, 80)}")
        print()


async def backup_database():
    """Create a manual backup of the database.

    All users are serialized with ``to_dict()`` and written as JSON to
    ``data/backups/manual_backup_<YYYYmmdd_HHMMSS>.json``.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"data/backups/manual_backup_{timestamp}.json"

    # Export all data to JSON for backup
    async with _open_database() as database:
        users = await database.get_all_users()
    backup_data = [user.to_dict() for user in users]

    # Ensure backup directory exists
    Path(backup_path).parent.mkdir(parents=True, exist_ok=True)

    with open(backup_path, 'w', encoding='utf-8') as f:
        json.dump(backup_data, f, indent=2, ensure_ascii=False)

    print(f"Database backed up to {backup_path}")


async def cleanup_data():
    """Clean up old data and backups."""
    async with _open_database() as database:
        await database.cleanup_old_backups(max_backups=5)
    print("Cleanup completed")


async def test_connection():
    """Test Discord connection.

    Logs in with the configured token and prints the connected account.
    On any failure the error is printed and the process exits with status 1.
    """
    try:
        database = None
        client = None
        try:
            config = Config()
            database = await create_database(
                mariadb_config=config.get_mariadb_config()
            )
            client = DiscordDataClient(config, database)

            print("Testing Discord connection...")

            # This will test the connection without starting the full bot
            await client.login(config.discord_token)

            user_info = client.user
            print(f"✓ Successfully connected as {user_info.name}#{user_info.discriminator}")
            print(f"✓ User ID: {user_info.id}")
        finally:
            # Release whatever was opened, on success and on failure alike.
            # This runs inside the outer try so cleanup errors are reported
            # the same way as connection errors.
            if client is not None:
                await client.close()
            if database is not None:
                await database.close()
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        sys.exit(1)


def main():
    """Main CLI entry point: parse arguments and run the chosen command."""
    parser = argparse.ArgumentParser(description="Discord Data Collector CLI")
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Export command
    export_parser = subparsers.add_parser("export", help="Export collected data")
    export_parser.add_argument("format", choices=["csv"], help="Export format")
    export_parser.add_argument("-o", "--output", help="Output file path")

    # Stats command
    subparsers.add_parser("stats", help="Show database statistics")

    # Search command
    search_parser = subparsers.add_parser("search", help="Search for users")
    search_parser.add_argument("query", help="Search query (username or user ID)")

    # Server commands
    subparsers.add_parser("servers", help="List all servers with user counts")

    user_servers_parser = subparsers.add_parser("user-servers", help="Show servers a user is in")
    user_servers_parser.add_argument("user_id", help="User ID to lookup")

    server_users_parser = subparsers.add_parser("server-users", help="Show users in a server")
    server_users_parser.add_argument("server_id", help="Server ID to lookup")

    # Backup command
    subparsers.add_parser("backup", help="Create manual database backup")

    # Cleanup command
    subparsers.add_parser("cleanup", help="Clean up old data and backups")

    # Test command
    subparsers.add_parser("test", help="Test Discord connection")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    # Map each command to a factory for its coroutine. The lambdas defer
    # attribute access so only the selected command's arguments are read.
    handlers = {
        "export": lambda: export_data(args.format, args.output),
        "stats": lambda: show_stats(),
        "search": lambda: search_user(args.query),
        "servers": lambda: list_servers(),
        "user-servers": lambda: show_user_servers(args.user_id),
        "server-users": lambda: show_server_users(args.server_id),
        "backup": lambda: backup_database(),
        "cleanup": lambda: cleanup_data(),
        "test": lambda: test_connection(),
    }

    # Run the appropriate command
    asyncio.run(handlers[args.command]())


if __name__ == "__main__":
    main()