Initial commit v3
This commit is contained in:
parent
8edda894db
commit
a9bcce85d6
24
main.py
24
main.py
|
@ -9,10 +9,26 @@ import logging
|
|||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from src.client import DiscordDataClient
|
||||
from src.config import Config
|
||||
from src.database import JSONDatabase
|
||||
from src.logger import setup_logger
|
||||
# Check if we're in the right directory
|
||||
if not Path("src").exists():
|
||||
print("❌ Error: 'src' directory not found. Please run from the project root directory.")
|
||||
sys.exit(1)
|
||||
|
||||
# Add src to Python path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
try:
|
||||
from src.client import DiscordDataClient
|
||||
from src.config import Config
|
||||
from src.database import JSONDatabase
|
||||
from src.logger import setup_logger
|
||||
except ImportError as e:
|
||||
print(f"❌ Import error: {e}")
|
||||
print("\n🔧 To fix this, try:")
|
||||
print("1. Run: python setup.py")
|
||||
print("2. Or run: python test_imports.py")
|
||||
print("3. Or install dependencies: pip install discord.py-self python-dotenv toml colorlog")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
async def main():
|
||||
|
|
|
@ -7,14 +7,14 @@ discord.py-self>=2.0.0
|
|||
python-dotenv>=1.0.0
|
||||
toml>=0.10.2
|
||||
|
||||
# Database (for future MongoDB integration)
|
||||
# For future MongoDB integration
|
||||
pymongo>=4.0.0
|
||||
|
||||
# Async utilities
|
||||
asyncio-throttle>=1.0.0
|
||||
|
||||
# Data processing
|
||||
pandas>=1.5.0
|
||||
|
||||
# Logging
|
||||
colorlog>=6.0.0
|
||||
colorlog>=6.0.0
|
||||
|
||||
# Standard library backports (if needed)
|
||||
typing-extensions>=4.0.0
|
||||
|
||||
# Optional: For better async performance
|
||||
uvloop>=0.17.0; sys_platform != "win32"
|
165
setup.py
Normal file
165
setup.py
Normal file
|
@ -0,0 +1,165 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Setup script for Discord Data Collector
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def check_python_version():
|
||||
"""Check if Python version is compatible."""
|
||||
if sys.version_info < (3, 8):
|
||||
print("❌ Python 3.8 or higher is required")
|
||||
sys.exit(1)
|
||||
print(f"✅ Python {sys.version_info.major}.{sys.version_info.minor} detected")
|
||||
|
||||
|
||||
def install_dependencies():
|
||||
"""Install required dependencies."""
|
||||
print("📦 Installing dependencies...")
|
||||
|
||||
dependencies = [
|
||||
"discord.py-self>=2.0.0",
|
||||
"python-dotenv>=1.0.0",
|
||||
"toml>=0.10.2",
|
||||
"colorlog>=6.0.0"
|
||||
]
|
||||
|
||||
for dep in dependencies:
|
||||
try:
|
||||
print(f"Installing {dep}...")
|
||||
subprocess.check_call([sys.executable, "-m", "pip", "install", dep])
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"❌ Failed to install {dep}: {e}")
|
||||
return False
|
||||
|
||||
print("✅ All dependencies installed successfully")
|
||||
return True
|
||||
|
||||
|
||||
def create_directories():
|
||||
"""Create necessary directories."""
|
||||
directories = [
|
||||
"data",
|
||||
"data/backups",
|
||||
"logs",
|
||||
"src"
|
||||
]
|
||||
|
||||
for directory in directories:
|
||||
Path(directory).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print("✅ Directories created")
|
||||
|
||||
|
||||
def create_config_files():
|
||||
"""Create configuration files if they don't exist."""
|
||||
|
||||
# Create .env file
|
||||
env_file = Path(".env")
|
||||
if not env_file.exists():
|
||||
env_content = """# Discord Data Collector Environment Variables
|
||||
# Add your Discord user token here
|
||||
DISCORD_TOKEN=your_discord_user_token_here
|
||||
"""
|
||||
with open(env_file, "w") as f:
|
||||
f.write(env_content)
|
||||
print("✅ Created .env file")
|
||||
|
||||
# Create config.toml file
|
||||
config_file = Path("config.toml")
|
||||
if not config_file.exists():
|
||||
config_content = """# Discord Data Collector Configuration
|
||||
|
||||
[database]
|
||||
path = "data/users.json"
|
||||
backup_interval = 3600
|
||||
|
||||
[collection]
|
||||
profile_pictures = true
|
||||
bio = true
|
||||
status = true
|
||||
server_membership = true
|
||||
|
||||
[rate_limiting]
|
||||
request_delay = 1.0
|
||||
max_requests_per_minute = 30
|
||||
|
||||
[monitoring]
|
||||
target_servers = []
|
||||
monitor_all_servers = true
|
||||
|
||||
[logging]
|
||||
level = "INFO"
|
||||
file = "logs/collector.log"
|
||||
"""
|
||||
with open(config_file, "w") as f:
|
||||
f.write(config_content)
|
||||
print("✅ Created config.toml file")
|
||||
|
||||
|
||||
def test_imports():
|
||||
"""Test if all imports work correctly."""
|
||||
print("🧪 Testing imports...")
|
||||
|
||||
try:
|
||||
import discord
|
||||
print("✅ discord.py-self imported successfully")
|
||||
except ImportError as e:
|
||||
print(f"❌ Failed to import discord.py-self: {e}")
|
||||
return False
|
||||
|
||||
try:
|
||||
import toml
|
||||
print("✅ toml imported successfully")
|
||||
except ImportError as e:
|
||||
print(f"❌ Failed to import toml: {e}")
|
||||
return False
|
||||
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
print("✅ python-dotenv imported successfully")
|
||||
except ImportError as e:
|
||||
print(f"❌ Failed to import python-dotenv: {e}")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
"""Main setup function."""
|
||||
print("🚀 Discord Data Collector Setup")
|
||||
print("=" * 40)
|
||||
|
||||
# Check Python version
|
||||
check_python_version()
|
||||
|
||||
# Create directories
|
||||
create_directories()
|
||||
|
||||
# Install dependencies
|
||||
if not install_dependencies():
|
||||
print("❌ Setup failed during dependency installation")
|
||||
sys.exit(1)
|
||||
|
||||
# Test imports
|
||||
if not test_imports():
|
||||
print("❌ Setup failed during import testing")
|
||||
sys.exit(1)
|
||||
|
||||
# Create config files
|
||||
create_config_files()
|
||||
|
||||
print("\n✅ Setup completed successfully!")
|
||||
print("\n📝 Next steps:")
|
||||
print("1. Edit .env file and add your Discord token")
|
||||
print("2. Optionally modify config.toml settings")
|
||||
print("3. Run: python main.py")
|
||||
print("\n⚠️ Remember: This tool is for educational/research purposes only")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
399
src/client.py
399
src/client.py
|
@ -1,205 +1,244 @@
|
|||
"""
|
||||
JSON database manager for Discord user data storage.
|
||||
Discord client implementation for data collection.
|
||||
"""
|
||||
|
||||
import json
|
||||
import asyncio
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass, asdict
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, Set
|
||||
|
||||
try:
|
||||
import discord
|
||||
from discord.ext import tasks
|
||||
except ImportError:
|
||||
raise ImportError("discord.py-self is required. Install with: pip install discord.py-self")
|
||||
|
||||
from .config import Config
|
||||
from .database import JSONDatabase, UserData
|
||||
from .rate_limiter import RateLimiter
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserData:
|
||||
"""Data structure for storing user information."""
|
||||
user_id: int
|
||||
username: str
|
||||
discriminator: str
|
||||
display_name: Optional[str] = None
|
||||
avatar_url: Optional[str] = None
|
||||
banner_url: Optional[str] = None
|
||||
bio: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
activity: Optional[str] = None
|
||||
servers: List[int] = None
|
||||
created_at: str = None
|
||||
updated_at: str = None
|
||||
class DiscordDataClient(discord.Client):
|
||||
"""Custom Discord client for collecting user data."""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.servers is None:
|
||||
self.servers = []
|
||||
def __init__(self, config: Config, database: JSONDatabase):
|
||||
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.config = config
|
||||
self.database = database
|
||||
self.rate_limiter = RateLimiter(
|
||||
requests_per_minute=config.max_requests_per_minute,
|
||||
delay_between_requests=config.request_delay
|
||||
)
|
||||
|
||||
current_time = datetime.utcnow().isoformat()
|
||||
if self.created_at is None:
|
||||
self.created_at = current_time
|
||||
self.updated_at = current_time
|
||||
|
||||
|
||||
class JSONDatabase:
|
||||
"""JSON-based database for storing Discord user data."""
|
||||
|
||||
def __init__(self, database_path: str):
|
||||
"""Initialize the JSON database."""
|
||||
self.database_path = Path(database_path)
|
||||
self.backup_path = Path("data/backups")
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self._lock = asyncio.Lock()
|
||||
self._data: Dict[str, Dict] = {}
|
||||
self.processed_users: Set[int] = set()
|
||||
self.target_servers = set(config.get_target_servers())
|
||||
|
||||
# Ensure database directory exists
|
||||
self.database_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.backup_path.mkdir(parents=True, exist_ok=True)
|
||||
# Start background tasks
|
||||
self.cleanup_task.start()
|
||||
self.stats_task.start()
|
||||
|
||||
async def on_ready(self):
|
||||
"""Called when the client is ready."""
|
||||
self.logger.info(f"Logged in as {self.user} (ID: {self.user.id})")
|
||||
self.logger.info(f"Connected to {len(self.guilds)} servers")
|
||||
|
||||
# Load existing data
|
||||
self._load_data()
|
||||
# Initial scan of server members
|
||||
await self._scan_all_servers()
|
||||
|
||||
def _load_data(self):
|
||||
"""Load data from JSON file."""
|
||||
if self.database_path.exists():
|
||||
try:
|
||||
with open(self.database_path, 'r', encoding='utf-8') as f:
|
||||
self._data = json.load(f)
|
||||
self.logger.info(f"Loaded {len(self._data)} users from database")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error loading database: {e}")
|
||||
self._data = {}
|
||||
else:
|
||||
self._data = {}
|
||||
self.logger.info("Created new database")
|
||||
|
||||
async def _save_data(self):
|
||||
"""Save data to JSON file."""
|
||||
async with self._lock:
|
||||
try:
|
||||
# Create backup before saving
|
||||
if self.database_path.exists():
|
||||
backup_filename = f"users_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
backup_path = self.backup_path / backup_filename
|
||||
shutil.copy2(self.database_path, backup_path)
|
||||
|
||||
# Save data
|
||||
with open(self.database_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self._data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
self.logger.debug(f"Saved {len(self._data)} users to database")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error saving database: {e}")
|
||||
|
||||
async def get_user(self, user_id: int) -> Optional[UserData]:
|
||||
"""Get user data by ID."""
|
||||
user_key = str(user_id)
|
||||
if user_key in self._data:
|
||||
user_dict = self._data[user_key]
|
||||
return UserData(**user_dict)
|
||||
return None
|
||||
|
||||
async def save_user(self, user_data: UserData):
|
||||
"""Save or update user data."""
|
||||
user_key = str(user_data.user_id)
|
||||
async def on_message(self, message):
|
||||
"""Handle incoming messages."""
|
||||
# Skip messages from bots
|
||||
if message.author.bot:
|
||||
return
|
||||
|
||||
# If user exists, preserve created_at timestamp
|
||||
if user_key in self._data:
|
||||
user_data.created_at = self._data[user_key]['created_at']
|
||||
# Check if we should monitor this server
|
||||
if not self._should_monitor_server(message.guild.id):
|
||||
return
|
||||
|
||||
# Update timestamp
|
||||
user_data.updated_at = datetime.utcnow().isoformat()
|
||||
# Process the message author
|
||||
await self._process_user(message.author, message.guild.id)
|
||||
|
||||
async def on_member_join(self, member):
|
||||
"""Handle member join events."""
|
||||
if not self._should_monitor_server(member.guild.id):
|
||||
return
|
||||
|
||||
# Save to memory
|
||||
self._data[user_key] = asdict(user_data)
|
||||
await self._process_user(member, member.guild.id)
|
||||
|
||||
async def on_member_update(self, before, after):
|
||||
"""Handle member update events."""
|
||||
if not self._should_monitor_server(after.guild.id):
|
||||
return
|
||||
|
||||
# Save to disk
|
||||
await self._save_data()
|
||||
# Only process if relevant data changed
|
||||
if (before.display_name != after.display_name or
|
||||
before.avatar != after.avatar or
|
||||
before.status != after.status):
|
||||
await self._process_user(after, after.guild.id)
|
||||
|
||||
async def on_user_update(self, before, after):
|
||||
"""Handle user update events."""
|
||||
# Process user if they're in any monitored servers
|
||||
for guild in self.guilds:
|
||||
if self._should_monitor_server(guild.id):
|
||||
member = guild.get_member(after.id)
|
||||
if member:
|
||||
await self._process_user(member, guild.id)
|
||||
break
|
||||
|
||||
def _should_monitor_server(self, server_id: int) -> bool:
|
||||
"""Check if we should monitor this server."""
|
||||
if self.config.monitor_all_servers:
|
||||
return True
|
||||
return server_id in self.target_servers
|
||||
|
||||
async def _scan_all_servers(self):
|
||||
"""Scan all server members initially."""
|
||||
self.logger.info("Starting initial server scan...")
|
||||
|
||||
self.logger.debug(f"Saved user {user_data.username}#{user_data.discriminator} ({user_data.user_id})")
|
||||
|
||||
async def add_server_to_user(self, user_id: int, server_id: int):
|
||||
"""Add a server to user's server list."""
|
||||
user_key = str(user_id)
|
||||
if user_key in self._data:
|
||||
if server_id not in self._data[user_key]['servers']:
|
||||
self._data[user_key]['servers'].append(server_id)
|
||||
self._data[user_key]['updated_at'] = datetime.utcnow().isoformat()
|
||||
await self._save_data()
|
||||
|
||||
async def get_all_users(self) -> List[UserData]:
|
||||
"""Get all users from the database."""
|
||||
return [UserData(**user_dict) for user_dict in self._data.values()]
|
||||
|
||||
async def get_users_by_server(self, server_id: int) -> List[UserData]:
|
||||
"""Get all users that are members of a specific server."""
|
||||
users = []
|
||||
for user_dict in self._data.values():
|
||||
if server_id in user_dict.get('servers', []):
|
||||
users.append(UserData(**user_dict))
|
||||
return users
|
||||
|
||||
async def get_user_count(self) -> int:
|
||||
"""Get total number of users in database."""
|
||||
return len(self._data)
|
||||
|
||||
async def get_server_count(self) -> int:
|
||||
"""Get total number of unique servers."""
|
||||
servers = set()
|
||||
for user_dict in self._data.values():
|
||||
servers.update(user_dict.get('servers', []))
|
||||
return len(servers)
|
||||
|
||||
async def cleanup_old_backups(self, max_backups: int = 10):
|
||||
"""Clean up old backup files, keeping only the most recent ones."""
|
||||
backup_files = sorted(self.backup_path.glob("users_backup_*.json"))
|
||||
|
||||
if len(backup_files) > max_backups:
|
||||
files_to_remove = backup_files[:-max_backups]
|
||||
for file_path in files_to_remove:
|
||||
try:
|
||||
file_path.unlink()
|
||||
self.logger.info(f"Removed old backup: {file_path.name}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error removing backup {file_path.name}: {e}")
|
||||
|
||||
async def export_to_csv(self, output_path: str):
|
||||
"""Export user data to CSV format."""
|
||||
import csv
|
||||
|
||||
output_path = Path(output_path)
|
||||
|
||||
try:
|
||||
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
|
||||
fieldnames = ['user_id', 'username', 'discriminator', 'display_name',
|
||||
'avatar_url', 'bio', 'status', 'servers', 'created_at', 'updated_at']
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
|
||||
writer.writeheader()
|
||||
for user_dict in self._data.values():
|
||||
# Convert servers list to string
|
||||
user_dict_copy = user_dict.copy()
|
||||
user_dict_copy['servers'] = ','.join(map(str, user_dict.get('servers', [])))
|
||||
writer.writerow(user_dict_copy)
|
||||
for guild in self.guilds:
|
||||
if not self._should_monitor_server(guild.id):
|
||||
continue
|
||||
|
||||
self.logger.info(f"Exported {len(self._data)} users to {output_path}")
|
||||
self.logger.info(f"Scanning server: {guild.name} ({guild.id})")
|
||||
|
||||
try:
|
||||
# Get all members
|
||||
members = [member async for member in guild.fetch_members(limit=None)]
|
||||
|
||||
for member in members:
|
||||
if not member.bot:
|
||||
await self._process_user(member, guild.id)
|
||||
|
||||
# Rate limiting
|
||||
await self.rate_limiter.wait()
|
||||
|
||||
self.logger.info(f"Processed {len(members)} members from {guild.name}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error scanning server {guild.name}: {e}")
|
||||
|
||||
self.logger.info("Initial server scan completed")
|
||||
|
||||
async def _process_user(self, user, server_id: int):
|
||||
"""Process a user and save their data."""
|
||||
try:
|
||||
# Check if we've already processed this user recently
|
||||
if user.id in self.processed_users:
|
||||
# Just add server to existing user
|
||||
await self.database.add_server_to_user(user.id, server_id)
|
||||
return
|
||||
|
||||
# Rate limiting
|
||||
await self.rate_limiter.wait()
|
||||
|
||||
# Get existing user data
|
||||
existing_user = await self.database.get_user(user.id)
|
||||
|
||||
# Create user data
|
||||
user_data = UserData(
|
||||
user_id=user.id,
|
||||
username=user.name,
|
||||
discriminator=user.discriminator,
|
||||
display_name=getattr(user, 'display_name', None),
|
||||
avatar_url=str(user.avatar.url) if user.avatar else None,
|
||||
banner_url=str(user.banner.url) if hasattr(user, 'banner') and user.banner else None,
|
||||
bio=await self._get_user_bio(user),
|
||||
status=str(user.status) if hasattr(user, 'status') else None,
|
||||
activity=str(user.activity) if hasattr(user, 'activity') and user.activity else None,
|
||||
servers=[server_id] if existing_user is None else existing_user.servers,
|
||||
created_at=existing_user.created_at if existing_user else None
|
||||
)
|
||||
|
||||
# Add server to list if not already there
|
||||
if server_id not in user_data.servers:
|
||||
user_data.servers.append(server_id)
|
||||
|
||||
# Save user data
|
||||
await self.database.save_user(user_data)
|
||||
|
||||
# Mark as processed
|
||||
self.processed_users.add(user.id)
|
||||
|
||||
self.logger.debug(f"Processed user: {user.name}#{user.discriminator}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error exporting to CSV: {e}")
|
||||
self.logger.error(f"Error processing user {user.name}: {e}")
|
||||
|
||||
async def get_statistics(self) -> Dict[str, Any]:
|
||||
"""Get database statistics."""
|
||||
stats = {
|
||||
'total_users': await self.get_user_count(),
|
||||
'total_servers': await self.get_server_count(),
|
||||
'database_size': self.database_path.stat().st_size if self.database_path.exists() else 0
|
||||
}
|
||||
async def _get_user_bio(self, user) -> Optional[str]:
|
||||
"""Get user bio/about me section."""
|
||||
if not self.config.collect_bio:
|
||||
return None
|
||||
|
||||
# Most active servers
|
||||
server_counts = {}
|
||||
for user_dict in self._data.values():
|
||||
for server_id in user_dict.get('servers', []):
|
||||
server_counts[server_id] = server_counts.get(server_id, 0) + 1
|
||||
try:
|
||||
# Try to get user profile
|
||||
if hasattr(user, 'id'):
|
||||
profile = await self.fetch_user(user.id)
|
||||
return getattr(profile, 'bio', None)
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Could not fetch bio for user {user.name}: {e}")
|
||||
|
||||
stats['most_active_servers'] = sorted(server_counts.items(),
|
||||
key=lambda x: x[1], reverse=True)[:10]
|
||||
return None
|
||||
|
||||
@tasks.loop(hours=1)
|
||||
async def cleanup_task(self):
|
||||
"""Periodic cleanup task."""
|
||||
try:
|
||||
# Clean up old backups
|
||||
await self.database.cleanup_old_backups()
|
||||
|
||||
# Clear processed users set to allow re-processing
|
||||
self.processed_users.clear()
|
||||
|
||||
self.logger.info("Cleanup task completed")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in cleanup task: {e}")
|
||||
|
||||
@tasks.loop(minutes=30)
|
||||
async def stats_task(self):
|
||||
"""Periodic statistics logging."""
|
||||
try:
|
||||
stats = await self.database.get_statistics()
|
||||
self.logger.info(f"Database stats: {stats['total_users']} users, "
|
||||
f"{stats['total_servers']} servers, "
|
||||
f"{stats['database_size']} bytes")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error in stats task: {e}")
|
||||
|
||||
async def export_data(self, format_type: str = "csv", output_path: str = None):
|
||||
"""Export collected data."""
|
||||
if output_path is None:
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
output_path = f"data/export_{timestamp}.{format_type}"
|
||||
|
||||
return stats
|
||||
if format_type == "csv":
|
||||
await self.database.export_to_csv(output_path)
|
||||
else:
|
||||
raise ValueError(f"Unsupported export format: {format_type}")
|
||||
|
||||
self.logger.info(f"Data exported to {output_path}")
|
||||
|
||||
async def get_user_info(self, user_id: int) -> Optional[UserData]:
|
||||
"""Get information about a specific user."""
|
||||
return await self.database.get_user(user_id)
|
||||
|
||||
async def get_server_users(self, server_id: int) -> list:
|
||||
"""Get all users from a specific server."""
|
||||
return await self.database.get_users_by_server(server_id)
|
||||
|
||||
async def close(self):
|
||||
"""Clean shutdown."""
|
||||
# Cancel background tasks
|
||||
self.cleanup_task.cancel()
|
||||
self.stats_task.cancel()
|
||||
|
||||
# Close parent client
|
||||
await super().close()
|
||||
|
||||
self.logger.info("Discord client closed")
|
94
test_imports.py
Normal file
94
test_imports.py
Normal file
|
@ -0,0 +1,94 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify all imports work correctly
|
||||
"""
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
def test_import(module_name, import_statement):
|
||||
"""Test a specific import."""
|
||||
try:
|
||||
exec(import_statement)
|
||||
print(f"✅ {module_name}: OK")
|
||||
return True
|
||||
except ImportError as e:
|
||||
print(f"❌ {module_name}: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"❌ {module_name}: Unexpected error - {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""Test all required imports."""
|
||||
print("🧪 Testing Discord Data Collector Imports")
|
||||
print("=" * 50)
|
||||
|
||||
tests = [
|
||||
("discord.py-self", "import discord"),
|
||||
("toml", "import toml"),
|
||||
("python-dotenv", "from dotenv import load_dotenv"),
|
||||
("pathlib", "from pathlib import Path"),
|
||||
("asyncio", "import asyncio"),
|
||||
("logging", "import logging"),
|
||||
("datetime", "from datetime import datetime"),
|
||||
("json", "import json"),
|
||||
("dataclasses", "from dataclasses import dataclass, asdict"),
|
||||
("collections", "from collections import deque"),
|
||||
("time", "import time"),
|
||||
("typing", "from typing import Optional, Set, Dict, List, Any"),
|
||||
]
|
||||
|
||||
failed = 0
|
||||
for module_name, import_statement in tests:
|
||||
if not test_import(module_name, import_statement):
|
||||
failed += 1
|
||||
|
||||
print(f"\n📊 Results: {len(tests) - failed}/{len(tests)} imports successful")
|
||||
|
||||
if failed == 0:
|
||||
print("✅ All imports successful! Testing local modules...")
|
||||
|
||||
# Test local modules
|
||||
try:
|
||||
# Add current directory to path
|
||||
sys.path.insert(0, '.')
|
||||
|
||||
# Test config
|
||||
from src.config import Config
|
||||
print("✅ src.config: OK")
|
||||
|
||||
# Test database
|
||||
from src.database import JSONDatabase, UserData
|
||||
print("✅ src.database: OK")
|
||||
|
||||
# Test rate limiter
|
||||
from src.rate_limiter import RateLimiter
|
||||
print("✅ src.rate_limiter: OK")
|
||||
|
||||
# Test logger
|
||||
from src.logger import setup_logger
|
||||
print("✅ src.logger: OK")
|
||||
|
||||
# Test client
|
||||
from src.client import DiscordDataClient
|
||||
print("✅ src.client: OK")
|
||||
|
||||
print("\n🎉 All tests passed! The application should work correctly.")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Local module test failed: {e}")
|
||||
print("\nDetailed error:")
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
else:
|
||||
print(f"\n❌ {failed} import(s) failed. Please install missing dependencies:")
|
||||
print("pip install discord.py-self python-dotenv toml colorlog")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = main()
|
||||
sys.exit(0 if success else 1)
|
Loading…
Reference in a new issue