mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 05:37:20 +08:00
Remove Common Room sync scripts (#13347)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
@@ -1,562 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Common Room Sync
|
|
||||||
|
|
||||||
This script queries the database to count conversations created by each user,
|
|
||||||
then creates or updates a signal in Common Room for each user with their
|
|
||||||
conversation count.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from typing import Any, Dict, List, Optional, Set
|
|
||||||
|
|
||||||
import requests
|
|
||||||
from sqlalchemy import text
|
|
||||||
|
|
||||||
# Add the parent directory to the path so we can import from storage
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
from server.auth.token_manager import get_keycloak_admin
|
|
||||||
from storage.database import get_engine
|
|
||||||
|
|
||||||
# Configure logging
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
||||||
)
|
|
||||||
logger = logging.getLogger('common_room_sync')
|
|
||||||
|
|
||||||
# Common Room API configuration
|
|
||||||
COMMON_ROOM_API_KEY = os.environ.get('COMMON_ROOM_API_KEY')
|
|
||||||
COMMON_ROOM_DESTINATION_SOURCE_ID = os.environ.get('COMMON_ROOM_DESTINATION_SOURCE_ID')
|
|
||||||
COMMON_ROOM_API_BASE_URL = 'https://api.commonroom.io/community/v1'
|
|
||||||
|
|
||||||
# Sync configuration
|
|
||||||
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', '100'))
|
|
||||||
KEYCLOAK_BATCH_SIZE = int(os.environ.get('KEYCLOAK_BATCH_SIZE', '20'))
|
|
||||||
MAX_RETRIES = int(os.environ.get('MAX_RETRIES', '3'))
|
|
||||||
INITIAL_BACKOFF_SECONDS = float(os.environ.get('INITIAL_BACKOFF_SECONDS', '1'))
|
|
||||||
MAX_BACKOFF_SECONDS = float(os.environ.get('MAX_BACKOFF_SECONDS', '60'))
|
|
||||||
BACKOFF_FACTOR = float(os.environ.get('BACKOFF_FACTOR', '2'))
|
|
||||||
RATE_LIMIT = float(os.environ.get('RATE_LIMIT', '2')) # Requests per second
|
|
||||||
|
|
||||||
|
|
||||||
class CommonRoomSyncError(Exception):
|
|
||||||
"""Base exception for Common Room sync errors."""
|
|
||||||
|
|
||||||
|
|
||||||
class DatabaseError(CommonRoomSyncError):
|
|
||||||
"""Exception for database errors."""
|
|
||||||
|
|
||||||
|
|
||||||
class CommonRoomAPIError(CommonRoomSyncError):
|
|
||||||
"""Exception for Common Room API errors."""
|
|
||||||
|
|
||||||
|
|
||||||
class KeycloakClientError(CommonRoomSyncError):
|
|
||||||
"""Exception for Keycloak client errors."""
|
|
||||||
|
|
||||||
|
|
||||||
def get_recent_conversations(minutes: int = 60) -> List[Dict[str, Any]]:
|
|
||||||
"""Get conversations created in the past N minutes.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
minutes: Number of minutes to look back for new conversations.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A list of dictionaries, each containing conversation details.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
DatabaseError: If the database query fails.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Use a different syntax for the interval that works with pg8000
|
|
||||||
query = text("""
|
|
||||||
SELECT
|
|
||||||
conversation_id, user_id, title, created_at
|
|
||||||
FROM
|
|
||||||
conversation_metadata
|
|
||||||
WHERE
|
|
||||||
created_at >= NOW() - (INTERVAL '1 minute' * :minutes)
|
|
||||||
ORDER BY
|
|
||||||
created_at DESC
|
|
||||||
""")
|
|
||||||
|
|
||||||
with get_engine().connect() as connection:
|
|
||||||
result = connection.execute(query, {'minutes': minutes})
|
|
||||||
conversations = [
|
|
||||||
{
|
|
||||||
'conversation_id': row[0],
|
|
||||||
'user_id': row[1],
|
|
||||||
'title': row[2],
|
|
||||||
'created_at': row[3].isoformat() if row[3] else None,
|
|
||||||
}
|
|
||||||
for row in result
|
|
||||||
]
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f'Retrieved {len(conversations)} conversations created in the past {minutes} minutes'
|
|
||||||
)
|
|
||||||
return conversations
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(f'Error querying recent conversations: {e}')
|
|
||||||
raise DatabaseError(f'Failed to query recent conversations: {e}')
|
|
||||||
|
|
||||||
|
|
||||||
async def get_users_from_keycloak(user_ids: Set[str]) -> Dict[str, Dict[str, Any]]:
|
|
||||||
"""Get user information from Keycloak for a set of user IDs.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_ids: A set of user IDs to look up.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A dictionary mapping user IDs to user information dictionaries.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
KeycloakClientError: If the Keycloak API call fails.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Get Keycloak admin client
|
|
||||||
keycloak_admin = get_keycloak_admin()
|
|
||||||
|
|
||||||
# Create a dictionary to store user information
|
|
||||||
user_info_dict = {}
|
|
||||||
|
|
||||||
# Convert set to list for easier batching
|
|
||||||
user_id_list = list(user_ids)
|
|
||||||
|
|
||||||
# Process user IDs in batches
|
|
||||||
for i in range(0, len(user_id_list), KEYCLOAK_BATCH_SIZE):
|
|
||||||
batch = user_id_list[i : i + KEYCLOAK_BATCH_SIZE]
|
|
||||||
batch_tasks = []
|
|
||||||
|
|
||||||
# Create tasks for each user ID in the batch
|
|
||||||
for user_id in batch:
|
|
||||||
# Use the Keycloak admin client to get user by ID
|
|
||||||
batch_tasks.append(get_user_by_id(keycloak_admin, user_id))
|
|
||||||
|
|
||||||
# Run the batch of tasks concurrently
|
|
||||||
batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
|
|
||||||
|
|
||||||
# Process the results
|
|
||||||
for user_id, result in zip(batch, batch_results):
|
|
||||||
if isinstance(result, Exception):
|
|
||||||
logger.warning(f'Error getting user {user_id}: {result}')
|
|
||||||
continue
|
|
||||||
|
|
||||||
if result and isinstance(result, dict):
|
|
||||||
user_info_dict[user_id] = {
|
|
||||||
'username': result.get('username'),
|
|
||||||
'email': result.get('email'),
|
|
||||||
'id': result.get('id'),
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f'Retrieved information for {len(user_info_dict)} users from Keycloak'
|
|
||||||
)
|
|
||||||
return user_info_dict
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
error_msg = f'Error getting users from Keycloak: {e}'
|
|
||||||
logger.exception(error_msg)
|
|
||||||
raise KeycloakClientError(error_msg)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_user_by_id(keycloak_admin, user_id: str) -> Optional[Dict[str, Any]]:
|
|
||||||
"""Get a user from Keycloak by ID.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
keycloak_admin: The Keycloak admin client.
|
|
||||||
user_id: The user ID to look up.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A dictionary with the user's information, or None if not found.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Use the Keycloak admin client to get user by ID
|
|
||||||
user = keycloak_admin.get_user(user_id)
|
|
||||||
if user:
|
|
||||||
logger.debug(
|
|
||||||
f"Found user in Keycloak: {user.get('username')}, {user.get('email')}"
|
|
||||||
)
|
|
||||||
return user
|
|
||||||
else:
|
|
||||||
logger.warning(f'User {user_id} not found in Keycloak')
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f'Error getting user {user_id} from Keycloak: {e}')
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def get_user_info(
|
|
||||||
user_id: str, user_info_cache: Dict[str, Dict[str, Any]]
|
|
||||||
) -> Optional[Dict[str, str]]:
|
|
||||||
"""Get the email address and GitHub username for a user from the cache.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id: The user ID to look up.
|
|
||||||
user_info_cache: A dictionary mapping user IDs to user information.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A dictionary with the user's email and username, or None if not found.
|
|
||||||
"""
|
|
||||||
# Check if the user is in the cache
|
|
||||||
if user_id in user_info_cache:
|
|
||||||
user_info = user_info_cache[user_id]
|
|
||||||
logger.debug(
|
|
||||||
f"Found user info in cache: {user_info.get('username')}, {user_info.get('email')}"
|
|
||||||
)
|
|
||||||
return user_info
|
|
||||||
else:
|
|
||||||
logger.warning(f'User {user_id} not found in user info cache')
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def register_user_in_common_room(
|
|
||||||
user_id: str, email: str, github_username: str
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""Create or update a user in Common Room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id: The user ID.
|
|
||||||
email: The user's email address.
|
|
||||||
github_username: The user's GitHub username.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The API response from Common Room.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
CommonRoomAPIError: If the Common Room API request fails.
|
|
||||||
"""
|
|
||||||
if not COMMON_ROOM_API_KEY:
|
|
||||||
raise CommonRoomAPIError('COMMON_ROOM_API_KEY environment variable not set')
|
|
||||||
|
|
||||||
if not COMMON_ROOM_DESTINATION_SOURCE_ID:
|
|
||||||
raise CommonRoomAPIError(
|
|
||||||
'COMMON_ROOM_DESTINATION_SOURCE_ID environment variable not set'
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
headers = {
|
|
||||||
'Authorization': f'Bearer {COMMON_ROOM_API_KEY}',
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
}
|
|
||||||
|
|
||||||
# Create or update user in Common Room
|
|
||||||
user_data = {
|
|
||||||
'id': user_id,
|
|
||||||
'email': email,
|
|
||||||
'username': github_username,
|
|
||||||
'github': {'type': 'handle', 'value': github_username},
|
|
||||||
}
|
|
||||||
|
|
||||||
user_url = f'{COMMON_ROOM_API_BASE_URL}/source/{COMMON_ROOM_DESTINATION_SOURCE_ID}/user'
|
|
||||||
user_response = requests.post(user_url, headers=headers, json=user_data)
|
|
||||||
|
|
||||||
if user_response.status_code not in (200, 202):
|
|
||||||
logger.error(
|
|
||||||
f'Failed to create/update user in Common Room: {user_response.text}'
|
|
||||||
)
|
|
||||||
logger.error(f'Response status code: {user_response.status_code}')
|
|
||||||
raise CommonRoomAPIError(
|
|
||||||
f'Failed to create/update user: {user_response.text}'
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f'Registered/updated user {user_id} (GitHub: {github_username}) in Common Room'
|
|
||||||
)
|
|
||||||
return user_response.json()
|
|
||||||
except requests.RequestException as e:
|
|
||||||
logger.exception(f'Error communicating with Common Room API: {e}')
|
|
||||||
raise CommonRoomAPIError(f'Failed to communicate with Common Room API: {e}')
|
|
||||||
|
|
||||||
|
|
||||||
def register_conversation_activity(
|
|
||||||
user_id: str,
|
|
||||||
conversation_id: str,
|
|
||||||
conversation_title: str,
|
|
||||||
created_at: datetime,
|
|
||||||
email: str,
|
|
||||||
github_username: str,
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""Create an activity in Common Room for a new conversation.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id: The user ID who created the conversation.
|
|
||||||
conversation_id: The ID of the conversation.
|
|
||||||
conversation_title: The title of the conversation.
|
|
||||||
created_at: The datetime object when the conversation was created.
|
|
||||||
email: The user's email address.
|
|
||||||
github_username: The user's GitHub username.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The API response from Common Room.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
CommonRoomAPIError: If the Common Room API request fails.
|
|
||||||
"""
|
|
||||||
if not COMMON_ROOM_API_KEY:
|
|
||||||
raise CommonRoomAPIError('COMMON_ROOM_API_KEY environment variable not set')
|
|
||||||
|
|
||||||
if not COMMON_ROOM_DESTINATION_SOURCE_ID:
|
|
||||||
raise CommonRoomAPIError(
|
|
||||||
'COMMON_ROOM_DESTINATION_SOURCE_ID environment variable not set'
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
headers = {
|
|
||||||
'Authorization': f'Bearer {COMMON_ROOM_API_KEY}',
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
}
|
|
||||||
|
|
||||||
# Format the datetime object to the expected ISO format
|
|
||||||
formatted_timestamp = (
|
|
||||||
created_at.strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
||||||
if created_at
|
|
||||||
else time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create activity for the conversation
|
|
||||||
activity_data = {
|
|
||||||
'id': f'conversation_{conversation_id}', # Use conversation ID to ensure uniqueness
|
|
||||||
'activityType': 'started_session',
|
|
||||||
'user': {
|
|
||||||
'id': user_id,
|
|
||||||
'email': email,
|
|
||||||
'github': {'type': 'handle', 'value': github_username},
|
|
||||||
'username': github_username,
|
|
||||||
},
|
|
||||||
'activityTitle': {
|
|
||||||
'type': 'text',
|
|
||||||
'value': conversation_title or 'New Conversation',
|
|
||||||
},
|
|
||||||
'content': {
|
|
||||||
'type': 'text',
|
|
||||||
'value': f'Started a new conversation: {conversation_title or "Untitled"}',
|
|
||||||
},
|
|
||||||
'timestamp': formatted_timestamp,
|
|
||||||
'url': f'https://app.all-hands.dev/conversations/{conversation_id}',
|
|
||||||
}
|
|
||||||
|
|
||||||
# Log the activity data for debugging
|
|
||||||
logger.info(f'Activity data payload: {activity_data}')
|
|
||||||
|
|
||||||
activity_url = f'{COMMON_ROOM_API_BASE_URL}/source/{COMMON_ROOM_DESTINATION_SOURCE_ID}/activity'
|
|
||||||
activity_response = requests.post(
|
|
||||||
activity_url, headers=headers, json=activity_data
|
|
||||||
)
|
|
||||||
|
|
||||||
if activity_response.status_code not in (200, 202):
|
|
||||||
logger.error(
|
|
||||||
f'Failed to create activity in Common Room: {activity_response.text}'
|
|
||||||
)
|
|
||||||
logger.error(f'Response status code: {activity_response.status_code}')
|
|
||||||
raise CommonRoomAPIError(
|
|
||||||
f'Failed to create activity: {activity_response.text}'
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f'Registered conversation activity for user {user_id}, conversation {conversation_id}'
|
|
||||||
)
|
|
||||||
return activity_response.json()
|
|
||||||
except requests.RequestException as e:
|
|
||||||
logger.exception(f'Error communicating with Common Room API: {e}')
|
|
||||||
raise CommonRoomAPIError(f'Failed to communicate with Common Room API: {e}')
|
|
||||||
|
|
||||||
|
|
||||||
def retry_with_backoff(func, *args, **kwargs):
|
|
||||||
"""Retry a function with exponential backoff.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
func: The function to retry.
|
|
||||||
*args: Positional arguments to pass to the function.
|
|
||||||
**kwargs: Keyword arguments to pass to the function.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The result of the function call.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
The last exception raised by the function.
|
|
||||||
"""
|
|
||||||
backoff = INITIAL_BACKOFF_SECONDS
|
|
||||||
last_exception = None
|
|
||||||
|
|
||||||
for attempt in range(MAX_RETRIES):
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
last_exception = e
|
|
||||||
logger.warning(f'Attempt {attempt + 1}/{MAX_RETRIES} failed: {e}')
|
|
||||||
|
|
||||||
if attempt < MAX_RETRIES - 1:
|
|
||||||
sleep_time = min(backoff, MAX_BACKOFF_SECONDS)
|
|
||||||
logger.info(f'Retrying in {sleep_time:.2f} seconds...')
|
|
||||||
time.sleep(sleep_time)
|
|
||||||
backoff *= BACKOFF_FACTOR
|
|
||||||
else:
|
|
||||||
logger.exception(f'All {MAX_RETRIES} attempts failed')
|
|
||||||
raise last_exception
|
|
||||||
|
|
||||||
|
|
||||||
async def retry_with_backoff_async(func, *args, **kwargs):
|
|
||||||
"""Retry an async function with exponential backoff.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
func: The async function to retry.
|
|
||||||
*args: Positional arguments to pass to the function.
|
|
||||||
**kwargs: Keyword arguments to pass to the function.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The result of the function call.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
The last exception raised by the function.
|
|
||||||
"""
|
|
||||||
backoff = INITIAL_BACKOFF_SECONDS
|
|
||||||
last_exception = None
|
|
||||||
|
|
||||||
for attempt in range(MAX_RETRIES):
|
|
||||||
try:
|
|
||||||
return await func(*args, **kwargs)
|
|
||||||
except Exception as e:
|
|
||||||
last_exception = e
|
|
||||||
logger.warning(f'Attempt {attempt + 1}/{MAX_RETRIES} failed: {e}')
|
|
||||||
|
|
||||||
if attempt < MAX_RETRIES - 1:
|
|
||||||
sleep_time = min(backoff, MAX_BACKOFF_SECONDS)
|
|
||||||
logger.info(f'Retrying in {sleep_time:.2f} seconds...')
|
|
||||||
await asyncio.sleep(sleep_time)
|
|
||||||
backoff *= BACKOFF_FACTOR
|
|
||||||
else:
|
|
||||||
logger.exception(f'All {MAX_RETRIES} attempts failed')
|
|
||||||
raise last_exception
|
|
||||||
|
|
||||||
|
|
||||||
async def async_sync_recent_conversations_to_common_room(minutes: int = 60):
|
|
||||||
"""Async main function to sync recent conversations to Common Room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
minutes: Number of minutes to look back for new conversations.
|
|
||||||
"""
|
|
||||||
logger.info(
|
|
||||||
f'Starting Common Room recent conversations sync (past {minutes} minutes)'
|
|
||||||
)
|
|
||||||
|
|
||||||
stats = {
|
|
||||||
'total_conversations': 0,
|
|
||||||
'registered_users': 0,
|
|
||||||
'registered_activities': 0,
|
|
||||||
'errors': 0,
|
|
||||||
'missing_user_info': 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Get conversations created in the past N minutes
|
|
||||||
recent_conversations = retry_with_backoff(get_recent_conversations, minutes)
|
|
||||||
stats['total_conversations'] = len(recent_conversations)
|
|
||||||
|
|
||||||
logger.info(f'Processing {len(recent_conversations)} recent conversations')
|
|
||||||
|
|
||||||
if not recent_conversations:
|
|
||||||
logger.info('No recent conversations found, exiting')
|
|
||||||
return
|
|
||||||
|
|
||||||
# Extract all unique user IDs
|
|
||||||
user_ids = {conv['user_id'] for conv in recent_conversations if conv['user_id']}
|
|
||||||
|
|
||||||
# Get user information for all users in batches
|
|
||||||
user_info_cache = await retry_with_backoff_async(
|
|
||||||
get_users_from_keycloak, user_ids
|
|
||||||
)
|
|
||||||
|
|
||||||
# Track registered users to avoid duplicate registrations
|
|
||||||
registered_users = set()
|
|
||||||
|
|
||||||
# Process each conversation
|
|
||||||
for conversation in recent_conversations:
|
|
||||||
conversation_id = conversation['conversation_id']
|
|
||||||
user_id = conversation['user_id']
|
|
||||||
title = conversation['title']
|
|
||||||
created_at = conversation[
|
|
||||||
'created_at'
|
|
||||||
] # This might be a string or datetime object
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Get user info from cache
|
|
||||||
user_info = get_user_info(user_id, user_info_cache)
|
|
||||||
if not user_info:
|
|
||||||
logger.warning(
|
|
||||||
f'Could not find user info for user {user_id}, skipping conversation {conversation_id}'
|
|
||||||
)
|
|
||||||
stats['missing_user_info'] += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
email = user_info['email']
|
|
||||||
github_username = user_info['username']
|
|
||||||
|
|
||||||
if not email:
|
|
||||||
logger.warning(
|
|
||||||
f'User {user_id} has no email, skipping conversation {conversation_id}'
|
|
||||||
)
|
|
||||||
stats['errors'] += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Register user in Common Room if not already registered in this run
|
|
||||||
if user_id not in registered_users:
|
|
||||||
register_user_in_common_room(user_id, email, github_username)
|
|
||||||
registered_users.add(user_id)
|
|
||||||
stats['registered_users'] += 1
|
|
||||||
|
|
||||||
# If created_at is a string, parse it to a datetime object
|
|
||||||
# If it's already a datetime object, use it as is
|
|
||||||
# If it's None, use current time
|
|
||||||
created_at_datetime = (
|
|
||||||
created_at
|
|
||||||
if isinstance(created_at, datetime)
|
|
||||||
else datetime.fromisoformat(created_at.replace('Z', '+00:00'))
|
|
||||||
if created_at
|
|
||||||
else datetime.now(UTC)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Register conversation activity with email and github username
|
|
||||||
register_conversation_activity(
|
|
||||||
user_id,
|
|
||||||
conversation_id,
|
|
||||||
title,
|
|
||||||
created_at_datetime,
|
|
||||||
email,
|
|
||||||
github_username,
|
|
||||||
)
|
|
||||||
stats['registered_activities'] += 1
|
|
||||||
|
|
||||||
# Sleep to respect rate limit
|
|
||||||
await asyncio.sleep(1 / RATE_LIMIT)
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(
|
|
||||||
f'Error processing conversation {conversation_id} for user {user_id}: {e}'
|
|
||||||
)
|
|
||||||
stats['errors'] += 1
|
|
||||||
except Exception as e:
|
|
||||||
logger.exception(f'Sync failed: {e}')
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
logger.info(f'Sync completed. Stats: {stats}')
|
|
||||||
|
|
||||||
|
|
||||||
def sync_recent_conversations_to_common_room(minutes: int = 60):
|
|
||||||
"""Main function to sync recent conversations to Common Room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
minutes: Number of minutes to look back for new conversations.
|
|
||||||
"""
|
|
||||||
# Run the async function in the event loop
|
|
||||||
asyncio.run(async_sync_recent_conversations_to_common_room(minutes))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
# Default to looking back 60 minutes for new conversations
|
|
||||||
minutes = int(os.environ.get('SYNC_MINUTES', '60'))
|
|
||||||
sync_recent_conversations_to_common_room(minutes)
|
|
||||||
@@ -1,51 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Test script for Common Room conversation count sync.
|
|
||||||
|
|
||||||
This script tests the functionality of the Common Room sync script
|
|
||||||
without making any API calls to Common Room or database connections.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
from sync.common_room_sync import (
|
|
||||||
retry_with_backoff,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestCommonRoomSync(unittest.TestCase):
|
|
||||||
"""Test cases for Common Room sync functionality."""
|
|
||||||
|
|
||||||
def test_retry_with_backoff(self):
|
|
||||||
"""Test the retry_with_backoff function."""
|
|
||||||
# Mock function that succeeds on the second attempt
|
|
||||||
mock_func = MagicMock(
|
|
||||||
side_effect=[Exception('First attempt failed'), 'success']
|
|
||||||
)
|
|
||||||
|
|
||||||
# Set environment variables for testing
|
|
||||||
with patch.dict(
|
|
||||||
os.environ,
|
|
||||||
{
|
|
||||||
'MAX_RETRIES': '3',
|
|
||||||
'INITIAL_BACKOFF_SECONDS': '0.01',
|
|
||||||
'BACKOFF_FACTOR': '2',
|
|
||||||
'MAX_BACKOFF_SECONDS': '1',
|
|
||||||
},
|
|
||||||
):
|
|
||||||
result = retry_with_backoff(mock_func, 'arg1', 'arg2', kwarg1='kwarg1')
|
|
||||||
|
|
||||||
# Check that the function was called twice
|
|
||||||
self.assertEqual(mock_func.call_count, 2)
|
|
||||||
# Check that the function was called with the correct arguments
|
|
||||||
mock_func.assert_called_with('arg1', 'arg2', kwarg1='kwarg1')
|
|
||||||
# Check that the function returned the expected result
|
|
||||||
self.assertEqual(result, 'success')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
||||||
@@ -1,83 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""Test script to verify the conversation count query.
|
|
||||||
|
|
||||||
This script tests the database query to count conversations by user,
|
|
||||||
without making any API calls to Common Room.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from sqlalchemy import text
|
|
||||||
|
|
||||||
# Add the parent directory to the path so we can import from storage
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
|
|
||||||
from storage.database import get_engine
|
|
||||||
|
|
||||||
|
|
||||||
def test_conversation_count_query():
|
|
||||||
"""Test the query to count conversations by user."""
|
|
||||||
try:
|
|
||||||
# Query to count conversations by user
|
|
||||||
count_query = text("""
|
|
||||||
SELECT
|
|
||||||
user_id, COUNT(*) as conversation_count
|
|
||||||
FROM
|
|
||||||
conversation_metadata
|
|
||||||
GROUP BY
|
|
||||||
user_id
|
|
||||||
""")
|
|
||||||
|
|
||||||
engine = get_engine()
|
|
||||||
|
|
||||||
with engine.connect() as connection:
|
|
||||||
count_result = connection.execute(count_query)
|
|
||||||
user_counts = [
|
|
||||||
{'user_id': row[0], 'conversation_count': row[1]}
|
|
||||||
for row in count_result
|
|
||||||
]
|
|
||||||
|
|
||||||
print(f'Found {len(user_counts)} users with conversations')
|
|
||||||
|
|
||||||
# Print the first 5 results
|
|
||||||
for i, user_data in enumerate(user_counts[:5]):
|
|
||||||
print(
|
|
||||||
f"User {i+1}: {user_data['user_id']} - {user_data['conversation_count']} conversations"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Test the user_entity query for the first user (if any)
|
|
||||||
if user_counts:
|
|
||||||
first_user_id = user_counts[0]['user_id']
|
|
||||||
|
|
||||||
user_query = text("""
|
|
||||||
SELECT username, email, id
|
|
||||||
FROM user_entity
|
|
||||||
WHERE id = :user_id
|
|
||||||
""")
|
|
||||||
|
|
||||||
with engine.connect() as connection:
|
|
||||||
user_result = connection.execute(user_query, {'user_id': first_user_id})
|
|
||||||
user_row = user_result.fetchone()
|
|
||||||
|
|
||||||
if user_row:
|
|
||||||
print(f'\nUser details for {first_user_id}:')
|
|
||||||
print(f' GitHub Username: {user_row[0]}')
|
|
||||||
print(f' Email: {user_row[1]}')
|
|
||||||
print(f' ID: {user_row[2]}')
|
|
||||||
else:
|
|
||||||
print(
|
|
||||||
f'\nNo user details found for {first_user_id} in user_entity table'
|
|
||||||
)
|
|
||||||
|
|
||||||
print('\nTest completed successfully')
|
|
||||||
except Exception as e:
|
|
||||||
print(f'Error: {str(e)}')
|
|
||||||
import traceback
|
|
||||||
|
|
||||||
traceback.print_exc()
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
test_conversation_count_query()
|
|
||||||
Reference in New Issue
Block a user