196 lines
5.9 KiB
Python
196 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Command-line interface for Discord Data Collector.
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add this script's directory to sys.path so the `src` package can be imported
|
|
sys.path.append(str(Path(__file__).parent))
|
|
|
|
from src.config import Config
|
|
from src.database import MongoDatabase
|
|
from src.client import DiscordDataClient
|
|
|
|
|
|
async def export_data(format_type: str, output_path: str = None):
    """Export collected data to a file.

    Args:
        format_type: Export format. Only ``"csv"`` is handled; any other
            value prints an "Unsupported format" message and returns.
        output_path: Destination file path. When ``None``, a timestamped
            path of the form ``data/export_<YYYYmmdd_HHMMSS>.<format>`` is
            generated.
    """
    # Reject unknown formats up front so no database connection is opened
    # for a request that cannot be served.
    if format_type != "csv":
        print(f"Unsupported format: {format_type}")
        return

    # NOTE(review): the Config instance is not used in this function; the
    # call is kept in case construction loads/validates settings — confirm
    # and drop if it has no side effects.
    Config()

    if output_path is None:
        from datetime import datetime

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"data/export_{timestamp}.{format_type}"

    # Make sure the destination directory exists (mirrors what
    # backup_database does for its own output directory).
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    database = MongoDatabase()
    await database.initialize()
    try:
        await database.export_to_csv(output_path)
        print(f"Data exported to {output_path}")
    finally:
        # Close the database even when the export fails, consistent with
        # the other commands in this file that call database.close().
        await database.close()
|
|
|
|
|
|
async def show_stats():
    """Print database statistics to stdout.

    Reads the ``total_users``, ``total_servers``, ``database_size`` and
    ``most_active_servers`` entries from ``database.get_statistics()`` and
    prints them; at most the first five entries of ``most_active_servers``
    are listed.
    """
    # NOTE(review): the Config instance is not used in this function; the
    # call is kept in case construction loads/validates settings — confirm
    # and drop if it has no side effects.
    Config()

    database = MongoDatabase()
    await database.initialize()
    try:
        stats = await database.get_statistics()
    finally:
        # Close the database once the statistics are fetched (or fetching
        # failed), consistent with backup_database / cleanup_data.
        await database.close()

    print("\n=== Database Statistics ===")
    print(f"Total users: {stats['total_users']}")
    print(f"Total servers: {stats['total_servers']}")
    print(f"Database size: {stats['database_size']} bytes")

    most_active = stats['most_active_servers']
    if most_active:
        print("\nMost active servers:")
        # Each entry is unpacked as a (server_id, user_count) pair.
        for server_id, user_count in most_active[:5]:
            print(f" Server {server_id}: {user_count} users")
|
|
|
|
|
|
async def search_user(query: str):
    """Search stored users and print the matches.

    A user matches when ``query`` is a case-insensitive substring of the
    username or display name, or equals the string form of the user ID.
    Only the first 10 matches are printed.

    Args:
        query: Search text (username fragment, display-name fragment, or
            exact user ID).
    """
    max_shown = 10
    bio_preview_len = 100

    # NOTE(review): the Config instance is not used in this function; the
    # call is kept in case construction loads/validates settings — confirm
    # and drop if it has no side effects.
    Config()

    database = MongoDatabase()
    await database.initialize()
    try:
        all_users = await database.get_all_users()
    finally:
        # Close the database once the users are fetched (or fetching
        # failed), consistent with backup_database / cleanup_data.
        await database.close()

    # Lower-case the query once instead of once per user per field.
    needle = query.lower()
    results = [
        user
        for user in all_users
        if needle in (user.username or "").lower()
        or needle in (user.display_name or "").lower()
        or query == str(user.user_id)
    ]

    if not results:
        print("No users found matching the query.")
        return

    print(f"\n=== Found {len(results)} users ===")
    for user in results[:max_shown]:
        print(f"{user.username}#{user.discriminator} (ID: {user.user_id})")
        if user.display_name:
            print(f" Display name: {user.display_name}")
        if user.bio:
            # Only add the ellipsis when the bio was actually cut short.
            suffix = "..." if len(user.bio) > bio_preview_len else ""
            print(f" Bio: {user.bio[:bio_preview_len]}{suffix}")
        print(f" Servers: {len(user.servers)}")
        print(f" Last updated: {user.updated_at}")
        print()
|
|
|
|
|
|
async def backup_database():
    """Create a manual JSON backup of all stored users.

    Fetches every user via ``database.get_all_users()``, converts each with
    ``to_dict()``, and writes the list to
    ``data/backups/manual_backup_<YYYYmmdd_HHMMSS>.json`` (UTF-8, indented,
    non-ASCII characters preserved).
    """
    from datetime import datetime

    # NOTE(review): the Config instance is not used in this function; the
    # call is kept in case construction loads/validates settings — confirm
    # and drop if it has no side effects.
    Config()

    database = MongoDatabase()
    await database.initialize()
    try:
        users = await database.get_all_users()
        backup_data = [user.to_dict() for user in users]
    finally:
        # Close the database even if fetching or serializing fails; the
        # original only reached close() on the success path.
        await database.close()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"data/backups/manual_backup_{timestamp}.json"

    # Ensure the backup directory exists before writing.
    Path("data/backups").mkdir(parents=True, exist_ok=True)

    with open(backup_path, 'w', encoding='utf-8') as f:
        json.dump(backup_data, f, indent=2, ensure_ascii=False)

    print(f"Database backed up to {backup_path}")
|
|
|
|
|
|
async def cleanup_data():
    """Clean up old backups via ``database.cleanup_old_backups``.

    Calls ``cleanup_old_backups(max_backups=5)`` and prints a confirmation
    once it returns.
    """
    # NOTE(review): the Config instance is not used in this function; the
    # call is kept in case construction loads/validates settings — confirm
    # and drop if it has no side effects.
    Config()

    database = MongoDatabase()
    await database.initialize()
    try:
        await database.cleanup_old_backups(max_backups=5)
        print("Cleanup completed")
    finally:
        # Close the database even when cleanup raises; the original only
        # reached close() on the success path.
        await database.close()
|
|
|
|
|
|
async def test_connection():
    """Test the Discord connection by logging in with the configured token.

    On success, prints the name, discriminator and ID of ``client.user``.
    On any exception, prints a failure message and exits the process with
    status 1. The client and the database are closed in both cases.
    """
    client = None
    opened_database = None
    try:
        config = Config()
        database = MongoDatabase()
        await database.initialize()
        # Only mark the database for closing once initialize() succeeded.
        opened_database = database
        client = DiscordDataClient(config, database)

        print("Testing Discord connection...")

        # Log in only; this checks the connection without starting the
        # full bot.
        await client.login(config.discord_token)
        user_info = client.user

        print(f"✓ Successfully connected as {user_info.name}#{user_info.discriminator}")
        print(f"✓ User ID: {user_info.id}")
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        sys.exit(1)
    finally:
        # Release resources on both the success and the failure path; the
        # original closed the client only on success and never closed the
        # database. This also runs while SystemExit from the handler above
        # is propagating.
        if client is not None:
            await client.close()
        if opened_database is not None:
            await opened_database.close()
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text and returns when no sub-command is given;
    otherwise runs the matching coroutine with ``asyncio.run``.
    """
    cli = argparse.ArgumentParser(description="Discord Data Collector CLI")
    commands = cli.add_subparsers(dest="command", help="Available commands")

    # export <format> [-o OUTPUT]
    export_cmd = commands.add_parser("export", help="Export collected data")
    export_cmd.add_argument("format", choices=["csv"], help="Export format")
    export_cmd.add_argument("-o", "--output", help="Output file path")

    # stats
    commands.add_parser("stats", help="Show database statistics")

    # search <query>
    search_cmd = commands.add_parser("search", help="Search for users")
    search_cmd.add_argument("query", help="Search query (username or user ID)")

    # backup / cleanup / test take no arguments
    commands.add_parser("backup", help="Create manual database backup")
    commands.add_parser("cleanup", help="Clean up old data and backups")
    commands.add_parser("test", help="Test Discord connection")

    args = cli.parse_args()

    if not args.command:
        cli.print_help()
        return

    # Map each sub-command to a factory producing its coroutine; lambdas
    # defer attribute access on args until the command is actually chosen.
    coroutine_factories = {
        "export": lambda: export_data(args.format, args.output),
        "stats": lambda: show_stats(),
        "search": lambda: search_user(args.query),
        "backup": lambda: backup_database(),
        "cleanup": lambda: cleanup_data(),
        "test": lambda: test_connection(),
    }

    make_coroutine = coroutine_factories.get(args.command)
    if make_coroutine is not None:
        asyncio.run(make_coroutine())
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()