jank (misc fixes, switched to mariadb)

This commit is contained in:
Xargana 2025-07-14 01:23:39 +03:00
parent 7a16f33add
commit e4fdb14e6a
8 changed files with 375 additions and 235 deletions

View file

@ -3,7 +3,7 @@
## Commands
- **Setup**: `python setup.py` - Install dependencies and create config files
- **Run Main**: `python main.py` - Start Discord data collector
- **CLI Tools**: `python cli.py [command]` - Available: export, stats, search, backup, cleanup, test
- **CLI Tools**: `python cli.py [command]` - Available: export, stats, search, servers, user-servers, server-users, backup, cleanup, test
- **Test Imports**: `python test_imports.py` - Verify all dependencies work
- **Install Dependencies**: `pip install -r requirements.txt`
@ -11,11 +11,11 @@
- **Entry Points**: `main.py` (main app), `cli.py` (CLI tools), `setup.py` (installation)
- **Core Modules**:
- `src/client.py` - Discord client implementation with data collection
- `src/database.py` - MariaDB database manager with UserData dataclass
- `src/database.py` - Database manager with MariaDB->JSON fallback, UserData dataclass
- `src/config.py` - TOML/env configuration management
- `src/rate_limiter.py` - API rate limiting
- `src/logger.py` - Logging setup
- **Data Storage**: JSON files in `data/` directory, backups in `data/backups/`
- **Data Storage**: MariaDB (primary) or JSON files in `data/` directory, backups in `data/backups/`
- **Configuration**: `config.toml` for app settings, `.env` for Discord token
## Code Style

143
cli.py
View file

@ -13,15 +13,14 @@ from pathlib import Path
sys.path.append(str(Path(__file__).parent))
from src.config import Config
from src.database import MongoDatabase
from src.database import create_database
from src.client import DiscordDataClient
async def export_data(format_type: str, output_path: str = None):
"""Export collected data."""
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
if output_path is None:
from datetime import datetime
@ -38,8 +37,7 @@ async def export_data(format_type: str, output_path: str = None):
async def show_stats():
"""Show database statistics."""
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
stats = await database.get_statistics()
@ -57,8 +55,7 @@ async def show_stats():
async def search_user(query: str):
"""Search for users."""
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
all_users = await database.get_all_users()
@ -80,17 +77,120 @@ async def search_user(query: str):
if user.display_name:
print(f" Display name: {user.display_name}")
if user.bio:
print(f" Bio: {user.bio[:100]}...")
print(f" Servers: {len(user.servers)}")
print(f" Bio: {user.bio[:100]}{'...' if len(user.bio) > 100 else ''}")
if user.status:
print(f" Status: {user.status}")
if user.activity:
print(f" Activity: {user.activity[:50]}{'...' if len(user.activity) > 50 else ''}")
print(f" Servers ({len(user.servers)}): {', '.join(map(str, user.servers[:5]))}{'...' if len(user.servers) > 5 else ''}")
print(f" Last updated: {user.updated_at}")
print()
async def list_servers():
"""List all servers with user counts."""
config = Config()
database = await create_database(mariadb_config=config.get_mariadb_config())
# Get all users
users = await database.get_all_users()
# Count users per server
server_counts = {}
for user in users:
for server_id in user.servers:
server_counts[server_id] = server_counts.get(server_id, 0) + 1
if not server_counts:
print("No servers found in database.")
return
# Sort by user count (descending)
sorted_servers = sorted(server_counts.items(), key=lambda x: x[1], reverse=True)
print(f"\n=== Servers ({len(sorted_servers)} total) ===")
for server_id, user_count in sorted_servers:
print(f"Server {server_id}: {user_count} users")
await database.close()
async def show_user_servers(user_id: str):
"""Show servers a user is in."""
config = Config()
database = await create_database(mariadb_config=config.get_mariadb_config())
try:
user_id_int = int(user_id)
user = await database.get_user(user_id_int)
if not user:
print(f"User {user_id} not found in database.")
return
print(f"\n=== User: {user.username}#{user.discriminator} ===")
print(f"User ID: {user.user_id}")
if user.display_name:
print(f"Display Name: {user.display_name}")
if user.bio:
print(f"Bio: {user.bio[:100]}{'...' if len(user.bio) > 100 else ''}")
if user.status:
print(f"Status: {user.status}")
if user.activity:
print(f"Activity: {user.activity}")
print(f"\nServers ({len(user.servers)}):")
for server_id in user.servers:
print(f" - {server_id}")
except ValueError:
print("Invalid user ID. Please provide a numeric user ID.")
finally:
await database.close()
async def show_server_users(server_id: str):
"""Show users in a server."""
config = Config()
database = await create_database(mariadb_config=config.get_mariadb_config())
try:
server_id_int = int(server_id)
users = await database.get_users_by_server(server_id_int)
if not users:
print(f"No users found for server {server_id}.")
return
print(f"\n=== Server {server_id} ({len(users)} users) ===")
# Sort users by username
users.sort(key=lambda u: u.username.lower())
for user in users:
status_info = ""
if user.status:
status_info = f" [{user.status}]"
if user.activity:
status_info += f" ({user.activity[:30]}{'...' if len(user.activity) > 30 else ''})"
print(f"{user.username}#{user.discriminator} (ID: {user.user_id}){status_info}")
if user.display_name and user.display_name != user.username:
print(f" Display: {user.display_name}")
if user.bio:
print(f" Bio: {user.bio[:80]}{'...' if len(user.bio) > 80 else ''}")
print()
except ValueError:
print("Invalid server ID. Please provide a numeric server ID.")
finally:
await database.close()
async def backup_database():
"""Create a manual backup of the database."""
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
@ -113,8 +213,7 @@ async def backup_database():
async def cleanup_data():
"""Clean up old data and backups."""
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
await database.cleanup_old_backups(max_backups=5)
print("Cleanup completed")
@ -125,8 +224,7 @@ async def test_connection():
"""Test Discord connection."""
try:
config = Config()
database = MongoDatabase()
await database.initialize()
database = await create_database(mariadb_config=config.get_mariadb_config())
client = DiscordDataClient(config, database)
print("Testing Discord connection...")
@ -162,6 +260,15 @@ def main():
search_parser = subparsers.add_parser("search", help="Search for users")
search_parser.add_argument("query", help="Search query (username or user ID)")
# Server commands
servers_parser = subparsers.add_parser("servers", help="List all servers with user counts")
user_servers_parser = subparsers.add_parser("user-servers", help="Show servers a user is in")
user_servers_parser.add_argument("user_id", help="User ID to lookup")
server_users_parser = subparsers.add_parser("server-users", help="Show users in a server")
server_users_parser.add_argument("server_id", help="Server ID to lookup")
# Backup command
subparsers.add_parser("backup", help="Create manual database backup")
@ -184,6 +291,12 @@ def main():
asyncio.run(show_stats())
elif args.command == "search":
asyncio.run(search_user(args.query))
elif args.command == "servers":
asyncio.run(list_servers())
elif args.command == "user-servers":
asyncio.run(show_user_servers(args.user_id))
elif args.command == "server-users":
asyncio.run(show_server_users(args.server_id))
elif args.command == "backup":
asyncio.run(backup_database())
elif args.command == "cleanup":

27
main.py
View file

@ -20,7 +20,7 @@ sys.path.insert(0, str(Path(__file__).parent))
try:
from src.client import DiscordDataClient
from src.config import Config
from src.database import MongoDatabase
from src.database import create_database
from src.logger import setup_logger
except ImportError as e:
print(f"❌ Import error: {e}")
@ -41,9 +41,28 @@ async def main():
logger = setup_logger(config.log_level, config.log_file)
logger.info("Starting Discord Data Collector")
# Initialize database
database = MongoDatabase()
await database.initialize()
# Initialize database with MariaDB->JSON fallback
mariadb_config = config.get_mariadb_config()
if mariadb_config:
logger.info(f"Found MariaDB config for {mariadb_config['host']}:{mariadb_config['port']}")
else:
logger.info("No MariaDB credentials found in .env, will use JSON fallback")
database = await create_database(mariadb_config=mariadb_config)
# Test Discord connectivity first
logger.info("Testing Discord connectivity...")
try:
import aiohttp
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
async with session.get("https://discord.com/api/v10/gateway") as response:
if response.status != 200:
raise Exception(f"Discord API returned {response.status}")
logger.info("✅ Discord connectivity confirmed")
except Exception as e:
logger.error(f"❌ Discord connectivity failed: {e}")
logger.error("Please check network settings, firewall, or try a different network")
sys.exit(1)
# Initialize Discord client
client = DiscordDataClient(config, database)

View file

@ -7,8 +7,8 @@ discord.py-self>=2.0.0
python-dotenv>=1.0.0
toml>=0.10.2
# MongoDB integration
pymongo>=4.0.0
# Database integrations
asyncmy>=0.2.0
# Logging
colorlog>=6.0.0

View file

@ -14,14 +14,14 @@ except ImportError:
raise ImportError("discord.py-self is required. Install with: pip install discord.py-self")
from .config import Config
from .database import MongoDatabase, UserData
from .database import UserData
from .rate_limiter import RateLimiter
class DiscordDataClient(discord.Client):
"""Custom Discord client for collecting user data."""
def __init__(self, config: Config, database: MongoDatabase):
def __init__(self, config: Config, database):
super().__init__()
@ -148,8 +148,8 @@ class DiscordDataClient(discord.Client):
avatar_url=str(user.avatar.url) if user.avatar else None,
banner_url=str(user.banner.url) if hasattr(user, 'banner') and user.banner else None,
bio=await self._get_user_bio(user),
status=str(user.status) if hasattr(user, 'status') else None,
activity=str(user.activity) if hasattr(user, 'activity') and user.activity else None,
status=self._get_user_status(user),
activity=self._get_user_activity(user),
servers=[server_id] if existing_user is None else existing_user.servers,
created_at=existing_user.created_at if existing_user else None
)
@ -175,15 +175,86 @@ class DiscordDataClient(discord.Client):
return None
try:
# Try to get user profile
if hasattr(user, 'id'):
profile = await self.fetch_user(user.id)
return getattr(profile, 'bio', None)
# Multiple approaches to get bio data
bio = None
# Method 1: Check if user object already has bio
if hasattr(user, 'bio') and user.bio:
bio = user.bio
# Method 2: Try to get full user profile
elif hasattr(user, 'id'):
try:
profile = await self.fetch_user(user.id)
if hasattr(profile, 'bio') and profile.bio:
bio = profile.bio
except Exception:
pass
# Method 3: Check for activities that might contain bio-like info
if not bio and hasattr(user, 'activities'):
for activity in user.activities:
if hasattr(activity, 'name') and activity.name and len(activity.name) > 20:
bio = f"Activity: {activity.name}"
break
return bio[:500] if bio else None # Limit bio length
except Exception as e:
self.logger.debug(f"Could not fetch bio for user {user.name}: {e}")
return None
def _get_user_status(self, user) -> Optional[str]:
"""Get user status with better handling."""
if not self.config.collect_status:
return None
try:
status_info = []
# Get basic status
if hasattr(user, 'status') and user.status:
status_info.append(str(user.status))
# Get desktop/mobile/web status
if hasattr(user, 'desktop_status') and user.desktop_status != discord.Status.offline:
status_info.append(f"desktop:{user.desktop_status}")
if hasattr(user, 'mobile_status') and user.mobile_status != discord.Status.offline:
status_info.append(f"mobile:{user.mobile_status}")
if hasattr(user, 'web_status') and user.web_status != discord.Status.offline:
status_info.append(f"web:{user.web_status}")
return ", ".join(status_info) if status_info else None
except Exception as e:
self.logger.debug(f"Could not get status for user {user.name}: {e}")
return None
def _get_user_activity(self, user) -> Optional[str]:
"""Get user activity with better handling."""
try:
activities = []
# Check for single activity
if hasattr(user, 'activity') and user.activity:
activities.append(str(user.activity))
# Check for multiple activities
elif hasattr(user, 'activities') and user.activities:
for activity in user.activities[:3]: # Limit to first 3 activities
if activity and hasattr(activity, 'name'):
activity_str = activity.name
if hasattr(activity, 'type') and activity.type:
activity_str = f"{activity.type.name}: {activity_str}"
activities.append(activity_str)
return " | ".join(activities) if activities else None
except Exception as e:
self.logger.debug(f"Could not get activity for user {user.name}: {e}")
return None
@tasks.loop(hours=1)
async def cleanup_task(self):
"""Periodic cleanup task."""

View file

@ -31,6 +31,13 @@ class Config:
self.database_path = self.config_data.get("database", {}).get("path", "data/users.json")
self.backup_interval = self.config_data.get("database", {}).get("backup_interval", 3600)
# MariaDB settings from environment
self.db_host = os.getenv("DB_HOST")
self.db_port = int(os.getenv("DB_PORT", "3306"))
self.db_user = os.getenv("DB_USER")
self.db_password = os.getenv("DB_PASSWORD")
self.db_name = os.getenv("DB_NAME")
# Collection settings
collection_config = self.config_data.get("collection", {})
self.collect_profile_pics = collection_config.get("profile_pictures", True)
@ -118,4 +125,16 @@ class Config:
"""Get list of target server IDs."""
if self.monitor_all_servers:
return []
return [int(server_id) for server_id in self.target_servers]
return [int(server_id) for server_id in self.target_servers]
def get_mariadb_config(self) -> Optional[dict]:
"""Get MariaDB configuration if all required fields are present."""
if all([self.db_host, self.db_user, self.db_password, self.db_name]):
return {
'host': self.db_host,
'port': self.db_port,
'user': self.db_user,
'password': self.db_password,
'database': self.db_name
}
return None

View file

@ -11,17 +11,12 @@ from dataclasses import dataclass, asdict
from pathlib import Path
import logging
# MongoDB support
try:
from pymongo import MongoClient
from pymongo.errors import PyMongoError
MONGODB_AVAILABLE = True
except ImportError:
MONGODB_AVAILABLE = False
# MongoDB support removed for simplicity
# Optional MariaDB support
try:
from asyncmy import connect, Connection
from asyncmy import connect
from asyncmy.cursors import DictCursor
from asyncmy.errors import MySQLError
MARIADB_AVAILABLE = True
except ImportError:
@ -73,186 +68,41 @@ class UserData:
return cls(**data)
class MongoDatabase:
"""MongoDB-based database for storing Discord user data."""
# Database factory function
async def create_database(
mariadb_config: Dict[str, Any] = None,
json_fallback: bool = True
):
"""Create appropriate database instance with MariaDB->JSON fallback."""
logger = logging.getLogger(__name__)
def __init__(self, connection_string: str = "mongodb://localhost:27017", database_name: str = "discorddb"):
"""Initialize the MongoDB connection."""
if not MONGODB_AVAILABLE:
raise ImportError("pymongo is required for MongoDB support. Install with: pip install pymongo")
self.connection_string = connection_string
self.database_name = database_name
self.logger = logging.getLogger(__name__)
self.client = None
self.db = None
self.users_collection = None
async def initialize(self):
"""Initialize database connection and ensure collections exist."""
# Try MariaDB first if available and configured
if MARIADB_AVAILABLE and mariadb_config:
try:
self.client = MongoClient(self.connection_string)
self.db = self.client[self.database_name]
self.users_collection = self.db.users
# Create indexes for better performance
self.users_collection.create_index("user_id", unique=True)
self.users_collection.create_index("username")
self.users_collection.create_index("servers")
# Test connection
self.client.admin.command('ping')
self.logger.info(f"MongoDB connection established to {self.database_name}")
logger.info("Attempting MariaDB connection...")
db = MariaDBDatabase(**mariadb_config)
await db.initialize()
logger.info("✅ Using MariaDB database")
return db
except Exception as e:
self.logger.error(f"MongoDB connection failed: {e}")
raise
async def get_user(self, user_id: int) -> Optional[UserData]:
"""Get user data by ID."""
try:
doc = self.users_collection.find_one({"user_id": user_id})
if doc:
# Remove MongoDB's _id field
doc.pop('_id', None)
return UserData.from_dict(doc)
return None
except Exception as e:
self.logger.error(f"Error getting user {user_id}: {e}")
return None
async def save_user(self, user_data: UserData):
"""Save or update user data."""
try:
doc = user_data.to_dict()
self.users_collection.replace_one(
{"user_id": user_data.user_id},
doc,
upsert=True
)
self.logger.debug(f"Saved user {user_data.username}#{user_data.discriminator}")
except Exception as e:
self.logger.error(f"Error saving user {user_data.user_id}: {e}")
raise
async def add_server_to_user(self, user_id: int, server_id: int):
"""Add a server to user's server list."""
try:
self.users_collection.update_one(
{"user_id": user_id},
{"$addToSet": {"servers": server_id}}
)
except Exception as e:
self.logger.error(f"Error adding server {server_id} to user {user_id}: {e}")
async def get_all_users(self) -> List[UserData]:
"""Get all users from the database."""
try:
users = []
for doc in self.users_collection.find():
doc.pop('_id', None)
users.append(UserData.from_dict(doc))
return users
except Exception as e:
self.logger.error(f"Error getting all users: {e}")
return []
async def get_users_by_server(self, server_id: int) -> List[UserData]:
"""Get all users that are members of a specific server."""
try:
users = []
for doc in self.users_collection.find({"servers": server_id}):
doc.pop('_id', None)
users.append(UserData.from_dict(doc))
return users
except Exception as e:
self.logger.error(f"Error getting users by server {server_id}: {e}")
return []
async def get_user_count(self) -> int:
"""Get total number of users in database."""
try:
return self.users_collection.count_documents({})
except Exception as e:
self.logger.error(f"Error getting user count: {e}")
return 0
async def get_server_count(self) -> int:
"""Get total number of unique servers."""
try:
pipeline = [
{"$unwind": "$servers"},
{"$group": {"_id": "$servers"}},
{"$count": "total"}
]
result = list(self.users_collection.aggregate(pipeline))
return result[0]["total"] if result else 0
except Exception as e:
self.logger.error(f"Error getting server count: {e}")
return 0
async def get_statistics(self) -> Dict[str, Any]:
"""Get database statistics."""
try:
total_users = await self.get_user_count()
total_servers = await self.get_server_count()
# Get most active servers
pipeline = [
{"$unwind": "$servers"},
{"$group": {"_id": "$servers", "user_count": {"$sum": 1}}},
{"$sort": {"user_count": -1}},
{"$limit": 10}
]
most_active = []
for doc in self.users_collection.aggregate(pipeline):
most_active.append((doc["_id"], doc["user_count"]))
# Get database size (estimate)
try:
stats = self.db.command("collStats", "users")
database_size = stats.get("size", 0)
except:
database_size = 0
return {
'total_users': total_users,
'total_servers': total_servers,
'most_active_servers': most_active,
'database_size': database_size
}
except Exception as e:
self.logger.error(f"Error getting statistics: {e}")
return {'total_users': 0, 'total_servers': 0, 'most_active_servers': [], 'database_size': 0}
async def export_to_csv(self, output_path: str):
"""Export data to CSV format."""
import csv
users = await self.get_all_users()
with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['user_id', 'username', 'discriminator', 'display_name',
'avatar_url', 'banner_url', 'bio', 'status', 'activity',
'servers', 'created_at', 'updated_at']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for user in users:
row = user.to_dict()
row['servers'] = ','.join(map(str, user.servers))
writer.writerow(row)
async def cleanup_old_backups(self, max_backups: int = 5):
"""Clean up old backup documents (placeholder for MongoDB)."""
# In MongoDB, we might implement this as removing old backup collections
# or documents with old timestamps
self.logger.info("MongoDB cleanup completed")
async def close(self):
"""Close database connection."""
if self.client:
self.client.close()
self.logger.info("MongoDB connection closed")
logger.warning(f"MariaDB failed: {e}, falling back to JSON...")
elif mariadb_config:
logger.warning("MariaDB not available, falling back to JSON")
else:
logger.info("MariaDB not configured, using JSON database")
# Fallback to JSON
if json_fallback:
logger.info("Using JSON database")
db = JSONDatabase("data/users.json")
await db.initialize()
logger.info("✅ Using JSON database")
return db
raise RuntimeError("No database backend available")
# Keep the original JSONDatabase class for fallback
@ -392,18 +242,7 @@ class JSONDatabase:
self.logger.info("JSON database closed")
# Database factory function
def create_database(use_mongodb: bool = True, **kwargs):
"""Create appropriate database instance based on availability and preference."""
if use_mongodb and MONGODB_AVAILABLE:
try:
return MongoDatabase(**kwargs)
except Exception as e:
logging.getLogger(__name__).warning(f"MongoDB initialization failed: {e}, falling back to JSON")
# Fallback to JSON database
database_path = kwargs.get('database_path', 'data/users.json')
return JSONDatabase(database_path)
class MariaDBDatabase:
@ -435,6 +274,8 @@ class MariaDBDatabase:
async def initialize(self):
"""Initialize database connection and ensure tables exist."""
try:
# Add DictCursor to config for dictionary results
self.db_config['cursor_cls'] = DictCursor
self.pool = await connect(**self.db_config)
await self._create_tables()
self.logger.info("Database connection established")
@ -615,16 +456,16 @@ class MariaDBDatabase:
async def get_user_count(self) -> int:
"""Get total number of users in database."""
async with self.pool.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM users")
await cursor.execute("SELECT COUNT(*) as user_count FROM users")
result = await cursor.fetchone()
return result['COUNT(*)']
return result['user_count'] if result else 0
async def get_server_count(self) -> int:
"""Get total number of unique servers."""
async with self.pool.cursor() as cursor:
await cursor.execute("SELECT COUNT(DISTINCT server_id) FROM user_servers")
await cursor.execute("SELECT COUNT(DISTINCT server_id) as server_count FROM user_servers")
result = await cursor.fetchone()
return result['COUNT(DISTINCT server_id)']
return result['server_count'] if result else 0
async def get_statistics(self) -> Dict[str, Any]:
"""Get database statistics using optimized queries."""
@ -641,7 +482,19 @@ class MariaDBDatabase:
ORDER BY user_count DESC
LIMIT 10
""")
stats['most_active_servers'] = await cursor.fetchall()
most_active = await cursor.fetchall()
# Convert to list of tuples for consistency with JSON version
stats['most_active_servers'] = [(row['server_id'], row['user_count']) for row in most_active]
# Get database size
await cursor.execute("""
SELECT
ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS database_size_mb
FROM information_schema.tables
WHERE table_schema = DATABASE()
""")
size_result = await cursor.fetchone()
stats['database_size'] = int((size_result['database_size_mb'] or 0) * 1024 * 1024) # Convert to bytes
return stats

View file

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Test Discord connectivity without the full application
"""
import asyncio
import aiohttp
import socket
async def test_discord_connectivity():
"""Test various Discord endpoints to diagnose connectivity issues."""
endpoints = [
"https://discord.com",
"https://discordapp.com",
"https://gateway.discord.gg",
"https://cdn.discordapp.com"
]
# Test basic socket connection first
print("🔍 Testing socket connection to discord.com:443...")
try:
sock = socket.create_connection(("discord.com", 443), timeout=10)
sock.close()
print("✅ Socket connection successful")
except Exception as e:
print(f"❌ Socket connection failed: {e}")
return False
# Test HTTP connections
print("\n🔍 Testing HTTPS endpoints...")
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
for endpoint in endpoints:
try:
async with session.get(endpoint) as response:
print(f"{endpoint}: {response.status}")
except Exception as e:
print(f"{endpoint}: {e}")
print("\n🔍 Testing Discord API directly...")
try:
async with aiohttp.ClientSession() as session:
async with session.get("https://discord.com/api/v10/gateway") as response:
if response.status == 200:
data = await response.json()
print(f"✅ Discord API: {data.get('url', 'No gateway URL')}")
return True
else:
print(f"❌ Discord API: HTTP {response.status}")
return False
except Exception as e:
print(f"❌ Discord API: {e}")
return False
if __name__ == "__main__":
result = asyncio.run(test_discord_connectivity())
if result:
print("\n🎉 Discord connectivity looks good!")
else:
print("\n💡 Possible solutions:")
print("1. Check firewall settings")
print("2. Try different DNS (1.1.1.1 or 8.8.8.8)")
print("3. Disable VPN if active")
print("4. Check corporate proxy settings")
print("5. Try from a different network")