"""Discord client implementation for data collection."""

import asyncio
import logging
from datetime import datetime
from typing import Optional, Set

try:
    import discord
    from discord.ext import tasks
except ImportError as exc:
    # Chain the original error so the real import failure stays visible.
    raise ImportError(
        "discord.py-self is required. Install with: pip install discord.py-self"
    ) from exc

from .config import Config
from .database import UserData
from .rate_limiter import RateLimiter


class DiscordDataClient(discord.Client):
    """Custom Discord client for collecting user data.

    The client reacts to gateway events (messages, joins, member/user
    updates), builds a ``UserData`` record for each user it sees in a
    monitored server and hands it to the supplied ``database`` object.
    Two background loops run alongside: an hourly cleanup and a
    half-hourly statistics log.
    """

    # Maximum number of characters of a bio that is stored.
    _MAX_BIO_LENGTH = 500
    # An activity name must be longer than this to be used as a bio fallback.
    _MIN_ACTIVITY_BIO_LENGTH = 20
    # Maximum number of activities included in the activity summary.
    _MAX_ACTIVITIES = 3

    def __init__(self, config: Config, database):
        """Create the client and start its background tasks.

        Args:
            config: Project configuration; this class reads
                ``max_requests_per_minute``, ``request_delay``,
                ``monitor_all_servers``, ``collect_bio``,
                ``collect_status`` and ``get_target_servers()`` from it.
            database: Storage backend. Only the coroutine methods used in
                this class are required (``get_user``, ``save_user``,
                ``add_server_to_user``, ``cleanup_old_backups``,
                ``get_statistics``, ``export_to_csv``,
                ``get_users_by_server``).
        """
        super().__init__()
        self.config = config
        self.database = database
        self.rate_limiter = RateLimiter(
            requests_per_minute=config.max_requests_per_minute,
            delay_between_requests=config.request_delay,
        )
        self.logger = logging.getLogger(__name__)
        # IDs already saved since the last cleanup; cleared by cleanup_task.
        self.processed_users: Set[int] = set()
        self.target_servers = set(config.get_target_servers())

        # Start background tasks.
        # NOTE(review): tasks.loop normally needs a running event loop when
        # start() is called - confirm this class is constructed inside one.
        self.cleanup_task.start()
        self.stats_task.start()

    # ------------------------------------------------------------------
    # Gateway event handlers
    # ------------------------------------------------------------------

    async def on_ready(self):
        """Log the login details and run the initial member scan."""
        self.logger.info(f"Logged in as {self.user} (ID: {self.user.id})")
        self.logger.info(f"Connected to {len(self.guilds)} servers")

        # Initial scan of server members
        await self._scan_all_servers()

    async def on_message(self, message):
        """Process the author of every non-bot message in a monitored server."""
        # Skip messages from bots
        if message.author.bot:
            return

        # Direct/group messages have no guild; previously this raised
        # AttributeError on ``message.guild.id``.
        if message.guild is None:
            return

        # Check if we should monitor this server
        if not self._should_monitor_server(message.guild.id):
            return

        # Process the message author
        await self._process_user(message.author, message.guild.id)

    async def on_member_join(self, member):
        """Process a member who joined a monitored server."""
        if not self._should_monitor_server(member.guild.id):
            return

        await self._process_user(member, member.guild.id)

    async def on_member_update(self, before, after):
        """Re-process a member whose display name, avatar or status changed."""
        if not self._should_monitor_server(after.guild.id):
            return

        # Only process if relevant data changed
        relevant_change = (
            before.display_name != after.display_name
            or before.avatar != after.avatar
            or before.status != after.status
        )
        if relevant_change:
            await self._process_user(after, after.guild.id)

    async def on_user_update(self, before, after):
        """Re-process an updated user via the first monitored guild they share."""
        for guild in self.guilds:
            if not self._should_monitor_server(guild.id):
                continue
            member = guild.get_member(after.id)
            if member:
                await self._process_user(member, guild.id)
                # One monitored guild is enough to refresh the record.
                break

    # ------------------------------------------------------------------
    # Scanning and processing
    # ------------------------------------------------------------------

    def _should_monitor_server(self, server_id: int) -> bool:
        """Return True if ``server_id`` is monitored.

        Every server is monitored when ``config.monitor_all_servers`` is
        set; otherwise only the IDs in ``self.target_servers``.
        """
        if self.config.monitor_all_servers:
            return True
        return server_id in self.target_servers

    async def _fetch_guild_members(self, guild) -> list:
        """Return all members of ``guild`` as a list.

        ``guild.fetch_members()`` is a coroutine returning a list in
        discord.py-self but an async iterator in upstream discord.py.
        The original code awaited the call and then ran ``async for`` over
        the result, which fails on a plain list; both shapes are accepted
        here.
        """
        result = guild.fetch_members()
        if hasattr(result, "__await__"):
            result = await result
        if hasattr(result, "__aiter__"):
            return [member async for member in result]
        return list(result)

    async def _scan_all_servers(self):
        """Process every non-bot member of every monitored server.

        Errors while scanning one server are logged and do not stop the
        scan of the remaining servers.
        """
        self.logger.info("Starting initial server scan...")

        for guild in self.guilds:
            if not self._should_monitor_server(guild.id):
                continue

            self.logger.info(f"Scanning server: {guild.name} ({guild.id})")

            try:
                members = await self._fetch_guild_members(guild)

                for member in members:
                    if member.bot:
                        continue
                    await self._process_user(member, guild.id)

                    # Rate limiting
                    await self.rate_limiter.wait()

                self.logger.info(f"Processed {len(members)} members from {guild.name}")

            except Exception as e:
                # Best effort: keep scanning the other servers.
                self.logger.error(f"Error scanning server {guild.name}: {e}")

        self.logger.info("Initial server scan completed")

    async def _process_user(self, user, server_id: int):
        """Build a ``UserData`` record for ``user`` and save it.

        Users already present in ``self.processed_users`` only get
        ``server_id`` added to their stored record. Any exception is logged
        and swallowed so that one bad user does not break event handling.

        Args:
            user: The discord user/member object to record.
            server_id: ID of the server the user was seen in.
        """
        try:
            # Check if we've already processed this user recently
            if user.id in self.processed_users:
                # Just add server to existing user
                await self.database.add_server_to_user(user.id, server_id)
                return

            # Rate limiting
            await self.rate_limiter.wait()

            # Get existing user data
            existing_user = await self.database.get_user(user.id)

            if existing_user is None:
                servers = [server_id]
                created_at = None
            else:
                # Copy so the fetched record's list is not mutated in place.
                servers = list(existing_user.servers)
                created_at = existing_user.created_at

            # Add server to list if not already there
            if server_id not in servers:
                servers.append(server_id)

            banner = getattr(user, "banner", None)

            user_data = UserData(
                user_id=user.id,
                username=user.name,
                discriminator=user.discriminator,
                display_name=getattr(user, "display_name", None),
                avatar_url=str(user.avatar.url) if user.avatar else None,
                banner_url=str(banner.url) if banner else None,
                bio=await self._get_user_bio(user),
                status=self._get_user_status(user),
                activity=self._get_user_activity(user),
                servers=servers,
                created_at=created_at,
            )

            # Save user data
            await self.database.save_user(user_data)

            # Mark as processed
            self.processed_users.add(user.id)

            self.logger.debug(f"Processed user: {user.name}#{user.discriminator}")

        except Exception as e:
            self.logger.error(f"Error processing user {user.name}: {e}")

    # ------------------------------------------------------------------
    # Field extraction helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_bio_field(profile, field_names):
        """Return ``(name, value)`` of the first truthy attribute in ``field_names``.

        Returns ``(None, None)`` when none of the attributes is present
        and truthy on ``profile``.
        """
        for field_name in field_names:
            value = getattr(profile, field_name, None)
            if value:
                return field_name, value
        return None, None

    async def get_user_bio(self, user) -> Optional[str]:
        """Get user bio/about me section.

        Public entry point kept for backward compatibility; delegates to
        ``_get_user_bio``.
        """
        return await self._get_user_bio(user)

    async def _get_user_bio(self, user) -> Optional[str]:
        """Return the user's bio, truncated to ``_MAX_BIO_LENGTH`` characters.

        Sources are tried in order: ``user.bio``, the user profile
        endpoint, the guild member profile, and finally a sufficiently
        long activity name. Returns ``None`` when bio collection is
        disabled in the config, when the profile fetch is forbidden or not
        found, when nothing was found, or on an unexpected error.

        This was previously defined only as ``get_user_bio`` while
        ``_process_user`` called ``self._get_user_bio``, so every call
        raised ``AttributeError``.
        """
        if not self.config.collect_bio:
            return None

        try:
            # Method 1: Check if user object already has bio (for ClientUser)
            own_bio = getattr(user, "bio", None)
            if own_bio:
                self.logger.debug(f"Found bio via user.bio for {user.name}")
                return own_bio[:self._MAX_BIO_LENGTH]

            bio = None

            # Method 2: Try to fetch user profile specifically
            if hasattr(user, "id"):
                try:
                    # The profile endpoint (not fetch_user) carries the bio.
                    profile = await self.fetch_user_profile(user.id)
                    field, bio = self._first_bio_field(
                        profile, ("bio", "display_bio")
                    )
                    if bio:
                        self.logger.debug(
                            f"Found {field} via profile fetch for {user.name}"
                        )
                    else:
                        self.logger.debug(
                            f"No bio found in profile for {user.name}"
                        )
                except discord.Forbidden:
                    self.logger.debug(
                        f"Access denied to profile for {user.name} - "
                        f"user may have privacy settings enabled"
                    )
                    return None
                except discord.NotFound:
                    self.logger.debug(f"Profile not found for {user.name}")
                    return None
                except Exception as e:
                    # Fall through to the remaining methods.
                    self.logger.debug(
                        f"Profile fetch failed for {user.name}: {e}"
                    )

            # Method 3: Try member profile if in a guild context
            if not bio and getattr(user, "guild", None):
                try:
                    member_profile = await user.guild.fetch_member_profile(user.id)
                    field, bio = self._first_bio_field(
                        member_profile, ("bio", "display_bio", "guild_bio")
                    )
                    if bio:
                        self.logger.debug(
                            f"Found {field} via member profile for {user.name}"
                        )
                except discord.Forbidden:
                    self.logger.debug(
                        f"Access denied to member profile for {user.name}"
                    )
                except Exception as e:
                    self.logger.debug(
                        f"Member profile fetch failed for {user.name}: {e}"
                    )

            # Method 4: Fallback to activities (last resort)
            if not bio and hasattr(user, "activities"):
                for activity in user.activities:
                    name = getattr(activity, "name", None)
                    if name and len(name) > self._MIN_ACTIVITY_BIO_LENGTH:
                        bio = f"Activity: {name}"
                        self.logger.debug(
                            f"Using activity as bio for {user.name}: {name}"
                        )
                        break

            if not bio:
                self.logger.debug(f"No bio found for user {user.name}")
                return None

            return bio[:self._MAX_BIO_LENGTH]

        except Exception as e:
            self.logger.debug(f"Could not fetch bio for user {user.name}: {e}")
            return None

    def _get_user_status(self, user) -> Optional[str]:
        """Return a comma-separated status summary, or ``None``.

        The summary starts with ``user.status`` (when present and truthy)
        followed by ``desktop:``/``mobile:``/``web:`` entries for each
        per-platform status that is not offline. Returns ``None`` when
        status collection is disabled, nothing was found, or on error.
        """
        if not self.config.collect_status:
            return None

        try:
            status_info = []

            # Get basic status
            if hasattr(user, "status") and user.status:
                status_info.append(str(user.status))

            # Get desktop/mobile/web status
            for platform in ("desktop", "mobile", "web"):
                attr_name = f"{platform}_status"
                if not hasattr(user, attr_name):
                    continue
                platform_status = getattr(user, attr_name)
                if platform_status != discord.Status.offline:
                    status_info.append(f"{platform}:{platform_status}")

            return ", ".join(status_info) if status_info else None

        except Exception as e:
            self.logger.debug(f"Could not get status for user {user.name}: {e}")
            return None

    def _get_user_activity(self, user) -> Optional[str]:
        """Return a ``" | "``-joined activity summary, or ``None``.

        Uses ``str(user.activity)`` when a single activity is set;
        otherwise the first ``_MAX_ACTIVITIES`` entries of
        ``user.activities``, each rendered as ``"<type name>: <name>"``
        when the activity has a truthy ``type``. Returns ``None`` when
        there is nothing to report or on error.
        """
        try:
            activities = []

            # Check for single activity
            if hasattr(user, "activity") and user.activity:
                activities.append(str(user.activity))

            # Check for multiple activities
            elif hasattr(user, "activities") and user.activities:
                for activity in user.activities[:self._MAX_ACTIVITIES]:
                    if activity and hasattr(activity, "name"):
                        activity_str = activity.name
                        if hasattr(activity, "type") and activity.type:
                            activity_str = f"{activity.type.name}: {activity_str}"
                        activities.append(activity_str)

            return " | ".join(activities) if activities else None

        except Exception as e:
            self.logger.debug(f"Could not get activity for user {user.name}: {e}")
            return None

    # ------------------------------------------------------------------
    # Background tasks
    # ------------------------------------------------------------------

    @tasks.loop(hours=1)
    async def cleanup_task(self):
        """Hourly: clean up old backups and reset the processed-user cache."""
        try:
            # Clean up old backups
            await self.database.cleanup_old_backups()

            # Clear processed users set to allow re-processing
            self.processed_users.clear()

            self.logger.info("Cleanup task completed")

        except Exception as e:
            # Swallow so the loop keeps running on the next interval.
            self.logger.error(f"Error in cleanup task: {e}")

    @tasks.loop(minutes=30)
    async def stats_task(self):
        """Every 30 minutes: log the statistics reported by the database."""
        try:
            stats = await self.database.get_statistics()
            self.logger.info(
                f"Database stats: {stats['total_users']} users, "
                f"{stats['total_servers']} servers, "
                f"{stats['database_size']} bytes"
            )

        except Exception as e:
            self.logger.error(f"Error in stats task: {e}")

    # ------------------------------------------------------------------
    # Public helpers
    # ------------------------------------------------------------------

    async def export_data(
        self, format_type: str = "csv", output_path: Optional[str] = None
    ):
        """Export collected data.

        Args:
            format_type: Export format; only ``"csv"`` is supported.
            output_path: Destination path. Defaults to
                ``data/export_<YYYYmmdd_HHMMSS>.<format_type>``.

        Raises:
            ValueError: If ``format_type`` is not supported.
        """
        # Validate first so an unsupported format fails before any work.
        if format_type != "csv":
            raise ValueError(f"Unsupported export format: {format_type}")

        if output_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = f"data/export_{timestamp}.{format_type}"

        await self.database.export_to_csv(output_path)

        self.logger.info(f"Data exported to {output_path}")

    async def get_user_info(self, user_id: int) -> Optional[UserData]:
        """Return the stored record for ``user_id`` from the database."""
        return await self.database.get_user(user_id)

    async def get_server_users(self, server_id: int) -> list:
        """Return the database's users for ``server_id``."""
        return await self.database.get_users_by_server(server_id)

    async def close(self):
        """Cancel the background tasks, then close the underlying client."""
        # Cancel background tasks
        self.cleanup_task.cancel()
        self.stats_task.cancel()

        # Close parent client
        await super().close()

        self.logger.info("Discord client closed")