"""
Discord client implementation for data collection.
"""

import asyncio
import inspect
import logging
from datetime import datetime
from typing import Optional, Set

try:
    import discord
    from discord.ext import tasks
except ImportError:
    raise ImportError("discord.py-self is required. Install with: pip install discord.py-self")

from .config import Config
from .database import UserData
from .rate_limiter import RateLimiter


class DiscordDataClient(discord.Client):
    """Discord client that records profile data for users in monitored servers.

    Users are discovered by a one-off member scan after login and by message,
    member-join, member-update and user-update events. Each discovered user is
    written to ``database`` as a ``UserData`` record. Two background loops run
    once the client is ready: an hourly cleanup and a 30-minute statistics log.
    """

    # Stored bios are truncated to this many characters.
    _MAX_BIO_LENGTH = 500
    # An activity name must be longer than this to stand in for a missing bio.
    _MIN_ACTIVITY_BIO_LENGTH = 20
    # At most this many entries of ``user.activities`` are recorded.
    _MAX_ACTIVITIES = 3
    # Seconds between presence refreshes in ``_update_status``.
    _STATUS_UPDATE_INTERVAL = 30
    # Attribute names probed, in order, when looking for bio text.
    _PROFILE_BIO_ATTRS = ("bio", "display_bio", "about", "about_me", "description")
    _MEMBER_BIO_ATTRS = ("bio", "display_bio", "guild_bio", "about", "about_me")

    def __init__(self, config: Config, database):
        """Create the client; background loops are defined but not started.

        Args:
            config: Settings object. This class reads ``max_requests_per_minute``,
                ``request_delay``, ``monitor_all_servers``, ``collect_bio``,
                ``collect_status`` and calls ``get_target_servers()``.
            database: Storage backend; its coroutine methods are awaited for
                every read, write, export and cleanup.
        """
        super().__init__()

        self.config = config
        self.database = database
        self.rate_limiter = RateLimiter(
            requests_per_minute=config.max_requests_per_minute,
            delay_between_requests=config.request_delay,
        )

        self.logger = logging.getLogger(__name__)
        # IDs fully saved since the last hourly cleanup; see _process_user.
        self.processed_users: Set[int] = set()
        self.target_servers = set(config.get_target_servers())
        # on_ready may fire more than once; the member scan must not.
        self._initial_scan_done = False

        # Loops are only created here; on_ready starts them.
        self._setup_tasks()

    def _setup_tasks(self):
        """Build the background loops and attach them to the instance.

        The loop bodies are closures so they can reach ``self`` without
        ``tasks.loop`` having to bind a method.
        """

        @tasks.loop(hours=1)
        async def cleanup_task():
            """Purge old backups and reset the processed-user cache."""
            try:
                await self.database.cleanup_old_backups()
                # Emptying the set lets users be fully re-processed.
                self.processed_users.clear()
                self.logger.info("Cleanup task completed")
            except Exception as e:
                # Swallowed on purpose so one failure does not end the loop;
                # logger.exception keeps the traceback.
                self.logger.exception("Error in cleanup task: %s", e)

        @tasks.loop(minutes=30)
        async def stats_task():
            """Log user count, server count and database size."""
            try:
                stats = await self.database.get_statistics()
                self.logger.info(
                    "Database stats: %s users, %s servers, %s bytes",
                    stats["total_users"],
                    stats["total_servers"],
                    stats["database_size"],
                )
            except Exception as e:
                self.logger.exception("Error in stats task: %s", e)

        self.cleanup_task = cleanup_task
        self.stats_task = stats_task

    async def on_ready(self):
        """Start the background loops and run the member scan once."""
        self.logger.info(f"Logged in as {self.user} (ID: {self.user.id})")
        self.logger.info(f"Connected to {len(self.guilds)} servers")

        # Starting a loop that is already running raises, and this event can
        # be dispatched again after a reconnect.
        for loop in (self.cleanup_task, self.stats_task):
            if not loop.is_running():
                loop.start()

        if self._initial_scan_done:
            return
        # Set before awaiting so a second on_ready cannot start a parallel scan.
        self._initial_scan_done = True
        await self._scan_all_servers()

    async def on_message(self, message):
        """Record the author of a message sent in a monitored server."""
        if message.author.bot:
            return

        guild = message.guild
        if guild is None:
            # Direct messages have no guild to attribute the user to.
            return

        if not self._should_monitor_server(guild.id):
            return

        await self._process_user(message.author, guild.id)

    async def on_member_join(self, member):
        """Record a member who joined a monitored server."""
        if self._should_monitor_server(member.guild.id):
            await self._process_user(member, member.guild.id)

    async def on_member_update(self, before, after):
        """Re-process a member whose display name, avatar or status changed.

        For IDs already in ``processed_users``, ``_process_user`` only records
        the server membership; the full record is refreshed after the next
        hourly cache reset.
        """
        if not self._should_monitor_server(after.guild.id):
            return

        changed = (
            before.display_name != after.display_name
            or before.avatar != after.avatar
            or before.status != after.status
        )
        if changed:
            await self._process_user(after, after.guild.id)

    async def on_user_update(self, before, after):
        """Re-process a user via the first monitored server that contains them."""
        for guild in self.guilds:
            if not self._should_monitor_server(guild.id):
                continue
            member = guild.get_member(after.id)
            if member:
                await self._process_user(member, guild.id)
                # One monitored guild is enough; stop after the first hit.
                break

    def _should_monitor_server(self, server_id: int) -> bool:
        """Return True if ``server_id`` is monitored.

        Every server is monitored when ``config.monitor_all_servers`` is set;
        otherwise only those returned by ``config.get_target_servers()``.
        """
        if self.config.monitor_all_servers:
            return True
        return server_id in self.target_servers

    async def _update_status(self):
        """Refresh the presence with the current server count until closed.

        Nothing in this class schedules this coroutine; a caller has to
        create the task.
        """
        # Looping on is_closed() lets the task end after shutdown instead of
        # spinning forever.
        while not self.is_closed():
            server_count = len(self.guilds)
            await self.change_presence(
                status=discord.Status.online,
                activity=discord.CustomActivity(name=f"watching {server_count} servers"),
            )
            await asyncio.sleep(self._STATUS_UPDATE_INTERVAL)

    @staticmethod
    async def _fetch_guild_members(guild) -> list:
        """Return ``guild``'s members as a list.

        Accepts ``fetch_members()`` returning an awaitable, an async iterator,
        or a plain iterable.
        NOTE(review): discord.py-self documents a coroutine returning a list,
        upstream discord.py an async iterator - confirm against the installed
        version.
        """
        result = guild.fetch_members()
        if inspect.isawaitable(result):
            result = await result
        if hasattr(result, "__aiter__"):
            return [member async for member in result]
        return list(result)

    async def _scan_all_servers(self):
        """Process every non-bot member of every monitored server.

        A failure in one server is logged and the scan moves on to the next.
        """
        self.logger.info("Starting initial server scan...")

        for guild in self.guilds:
            if not self._should_monitor_server(guild.id):
                continue

            self.logger.info(f"Scanning server: {guild.name} ({guild.id})")

            try:
                members = await self._fetch_guild_members(guild)

                for member in members:
                    if member.bot:
                        continue
                    await self._process_user(member, guild.id)
                    # Throttle between members, in addition to the wait
                    # _process_user performs for users it saves in full.
                    await self.rate_limiter.wait()

                self.logger.info(f"Processed {len(members)} members from {guild.name}")

            except Exception as e:
                self.logger.error(f"Error scanning server {guild.name}: {e}")

        self.logger.info("Initial server scan completed")

    async def _process_user(self, user, server_id: int):
        """Save ``user``'s data, or only record ``server_id`` if already cached.

        Errors are logged rather than raised, so one bad user cannot break an
        event handler or the scan.

        Args:
            user: Discord user or member object.
            server_id: ID of the server the user was seen in.
        """
        try:
            if user.id in self.processed_users:
                # Already saved since the last cache reset: just link the server.
                await self.database.add_server_to_user(user.id, server_id)
                return

            await self.rate_limiter.wait()

            existing_user = await self.database.get_user(user.id)

            if existing_user is None:
                servers = [server_id]
                created_at = None
            else:
                # Copy so the stored record's list is not mutated in place.
                servers = list(existing_user.servers or [])
                if server_id not in servers:
                    servers.append(server_id)
                created_at = existing_user.created_at

            avatar = user.avatar
            banner = getattr(user, "banner", None)

            user_data = UserData(
                user_id=user.id,
                username=user.name,
                discriminator=user.discriminator,
                display_name=getattr(user, "display_name", None),
                avatar_url=str(avatar.url) if avatar else None,
                banner_url=str(banner.url) if banner else None,
                bio=await self._get_user_bio(user),
                status=self._get_user_status(user),
                activity=self._get_user_activity(user),
                servers=servers,
                created_at=created_at,
            )

            await self.database.save_user(user_data)
            self.processed_users.add(user.id)

            self.logger.debug("Processed user: %s#%s", user.name, user.discriminator)

        except Exception as e:
            self.logger.error(f"Error processing user {user.name}: {e}")

    def _log_bio_debug_context(self, user) -> None:
        """Dump introspection details about ``user`` at DEBUG level only."""
        # dir() over the user and the client is costly; skip it unless the
        # output will actually be emitted.
        if not self.logger.isEnabledFor(logging.DEBUG):
            return
        self.logger.debug("User object type: %s", type(user))
        self.logger.debug(
            "User attributes: %s",
            [attr for attr in dir(user) if not attr.startswith("_")],
        )
        self.logger.debug(
            "Client methods: %s",
            [method for method in dir(self) if "profile" in method.lower()],
        )

    def _extract_bio(self, source, attr_names, origin: str, user) -> Optional[str]:
        """Return the first truthy attribute of ``source`` among ``attr_names``."""
        for attr in attr_names:
            value = getattr(source, attr, None)
            if value:
                self.logger.debug("Found %s via %s for %s", attr, origin, user.name)
                return value
        return None

    async def _fetch_profile(self, user):
        """Fetch a profile object using whichever fetch method is available."""
        if hasattr(self, "fetch_user_profile"):
            return await self.fetch_user_profile(user.id)
        if hasattr(self, "fetch_profile"):
            return await self.fetch_profile(user.id)
        if hasattr(user, "fetch_profile"):
            return await user.fetch_profile()

        # Last resort: a plain user fetch, using its ``profile`` attribute
        # when it has one and the fetched user itself otherwise.
        fetched_user = await self.fetch_user(user.id)
        if hasattr(fetched_user, "profile"):
            return fetched_user.profile
        return fetched_user

    async def _fetch_member_profile(self, user):
        """Fetch a guild-specific profile, or return None if no method exists."""
        if hasattr(user.guild, "fetch_member_profile"):
            return await user.guild.fetch_member_profile(user.id)
        if hasattr(user, "fetch_member_profile"):
            return await user.fetch_member_profile()
        return None

    async def _get_user_bio(self, user) -> Optional[str]:
        """Return the user's bio truncated to ``_MAX_BIO_LENGTH``, or None.

        Sources are tried in order: ``user.bio``, a fetched profile, a fetched
        guild member profile, then a long activity name as a stand-in.
        Returns None when ``config.collect_bio`` is off, when the profile fetch
        is forbidden or not found, or when nothing is found.
        """
        if not self.config.collect_bio:
            return None

        try:
            self._log_bio_debug_context(user)

            # Method 1: the object already carries a bio.
            direct_bio = getattr(user, "bio", None)
            if direct_bio:
                self.logger.debug("Found bio via user.bio for %s", user.name)
                return direct_bio[: self._MAX_BIO_LENGTH]

            bio = None

            # Method 2: fetch the user's profile.
            if hasattr(user, "id"):
                try:
                    profile = await self._fetch_profile(user)
                    if profile:
                        bio = self._extract_bio(
                            profile, self._PROFILE_BIO_ATTRS, "profile fetch", user
                        )
                        if not bio:
                            self.logger.debug(
                                "Profile found but no bio attributes for %s", user.name
                            )
                    else:
                        self.logger.debug("No profile method available for %s", user.name)

                except discord.Forbidden:
                    self.logger.debug(
                        "Access denied to profile for %s - user may have privacy settings enabled",
                        user.name,
                    )
                    return None
                except discord.NotFound:
                    self.logger.debug("Profile not found for %s", user.name)
                    return None
                except Exception as e:
                    # Any other failure falls through to the next method.
                    self.logger.debug("Profile fetch failed for %s: %s", user.name, e)

            # Method 3: guild-specific member profile.
            if not bio and getattr(user, "guild", None):
                try:
                    member_profile = await self._fetch_member_profile(user)
                    if member_profile:
                        bio = self._extract_bio(
                            member_profile, self._MEMBER_BIO_ATTRS, "member profile", user
                        )
                except discord.Forbidden:
                    self.logger.debug("Access denied to member profile for %s", user.name)
                except Exception as e:
                    self.logger.debug("Member profile fetch failed for %s: %s", user.name, e)

            # Method 4: fall back to a sufficiently long activity name.
            if not bio and hasattr(user, "activities"):
                for activity in user.activities:
                    name = getattr(activity, "name", None)
                    if name and len(name) > self._MIN_ACTIVITY_BIO_LENGTH:
                        bio = f"Activity: {name}"
                        self.logger.debug("Using activity as bio for %s: %s", user.name, name)
                        break

            if not bio:
                self.logger.debug("No bio found for user %s", user.name)
                return None

            return bio[: self._MAX_BIO_LENGTH]

        except Exception as e:
            self.logger.debug("Could not fetch bio for user %s: %s", user.name, e)
            return None

    def _get_user_status(self, user) -> Optional[str]:
        """Return the overall status plus any non-offline per-platform statuses.

        The result looks like ``"online, desktop:online"``. Returns None when
        ``config.collect_status`` is off or no status is available.
        """
        if not self.config.collect_status:
            return None

        try:
            parts = []

            if getattr(user, "status", None):
                parts.append(str(user.status))

            for platform in ("desktop", "mobile", "web"):
                attr = f"{platform}_status"
                if not hasattr(user, attr):
                    continue
                platform_status = getattr(user, attr)
                if platform_status != discord.Status.offline:
                    parts.append(f"{platform}:{platform_status}")

            return ", ".join(parts) if parts else None

        except Exception as e:
            self.logger.debug("Could not get status for user %s: %s", user.name, e)
            return None

    def _get_user_activity(self, user) -> Optional[str]:
        """Describe what the user is doing, or return None.

        ``user.activity`` is used when truthy. Otherwise up to
        ``_MAX_ACTIVITIES`` entries of ``user.activities`` are joined with
        ``" | "``, each prefixed with its type name when it has one.
        """
        try:
            primary = getattr(user, "activity", None)
            if primary:
                return str(primary)

            descriptions = []
            candidates = getattr(user, "activities", None) or ()
            for activity in candidates[: self._MAX_ACTIVITIES]:
                name = getattr(activity, "name", None) if activity else None
                if not name:
                    # A nameless activity would break the join below and
                    # discard every other activity with it.
                    continue
                kind = getattr(activity, "type", None)
                descriptions.append(f"{kind.name}: {name}" if kind else name)

            return " | ".join(descriptions) if descriptions else None

        except Exception as e:
            self.logger.debug("Could not get activity for user %s: %s", user.name, e)
            return None

    async def export_data(self, format_type: str = "csv", output_path: Optional[str] = None):
        """Export the collected data to a file.

        Args:
            format_type: Export format; only ``"csv"`` is supported.
            output_path: Destination file. Defaults to
                ``data/export_<YYYYmmdd_HHMMSS>.<format_type>``.

        Raises:
            ValueError: If ``format_type`` is not supported.
        """
        # Reject bad formats before building a path.
        if format_type != "csv":
            raise ValueError(f"Unsupported export format: {format_type}")

        if output_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = f"data/export_{timestamp}.{format_type}"

        await self.database.export_to_csv(output_path)

        self.logger.info(f"Data exported to {output_path}")

    async def get_user_info(self, user_id: int) -> Optional[UserData]:
        """Return the stored record for ``user_id`` as given by the database."""
        return await self.database.get_user(user_id)

    async def get_server_users(self, server_id: int) -> list:
        """Return the stored users associated with ``server_id``."""
        return await self.database.get_users_by_server(server_id)

    async def close(self):
        """Cancel the background loops, then close the underlying client."""
        for loop_name in ("cleanup_task", "stats_task"):
            loop = getattr(self, loop_name, None)
            # is_running() is the Loop API for "currently active"; an
            # AttributeError here would stop super().close() from running.
            if loop is not None and loop.is_running():
                loop.cancel()

        await super().close()

        self.logger.info("Discord client closed")