mirror of
https://github.com/camel-ai/owl.git
synced 2026-03-22 05:57:17 +08:00
275 lines
11 KiB
Python
275 lines
11 KiB
Python
import os
|
|
import logging
|
|
import time
|
|
import functools
|
|
import inspect
|
|
import re
|
|
from typing import Dict, Any, List, Tuple, Callable, Optional
|
|
import queue
|
|
|
|
# Create a singleton log queue that can be shared between modules
|
|
class LogQueueSingleton:
|
|
_instance = None
|
|
|
|
@classmethod
|
|
def get_instance(cls):
|
|
if cls._instance is None:
|
|
cls._instance = queue.Queue()
|
|
return cls._instance
|
|
|
|
# Custom logging wrapper for tools
|
|
def log_tool_usage(func):
|
|
"""
|
|
Decorator to log when a tool is being used.
|
|
"""
|
|
@functools.wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
tool_name = func.__name__
|
|
logging.info(f"🔧 TOOL TRIGGERED: {tool_name}")
|
|
try:
|
|
# Sanitize arguments to avoid logging sensitive info
|
|
safe_args = sanitize_args(args)
|
|
safe_kwargs = {k: sanitize_value(v) for k, v in kwargs.items()}
|
|
logging.info(f"🔍 TOOL ARGS: {tool_name} called with {len(safe_kwargs)} parameters")
|
|
|
|
result = await func(*args, **kwargs)
|
|
|
|
# Log completion but not the actual result content (might be large or sensitive)
|
|
logging.info(f"✅ TOOL COMPLETED: {tool_name}")
|
|
return result
|
|
except Exception as e:
|
|
logging.error(f"❌ TOOL ERROR: {tool_name} - {str(e)}")
|
|
raise
|
|
return wrapper
|
|
|
|
# Non-async version for synchronous functions
|
|
def log_tool_usage_sync(func):
|
|
"""
|
|
Decorator to log when a synchronous tool is being used.
|
|
"""
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
tool_name = func.__name__
|
|
logging.info(f"🔧 TOOL TRIGGERED: {tool_name}")
|
|
try:
|
|
# Sanitize arguments to avoid logging sensitive info
|
|
safe_args = sanitize_args(args)
|
|
safe_kwargs = {k: sanitize_value(v) for k, v in kwargs.items()}
|
|
logging.info(f"🔍 TOOL ARGS: {tool_name} called with {len(safe_kwargs)} parameters")
|
|
|
|
result = func(*args, **kwargs)
|
|
|
|
# Log completion but not the actual result content (might be large or sensitive)
|
|
logging.info(f"✅ TOOL COMPLETED: {tool_name}")
|
|
return result
|
|
except Exception as e:
|
|
logging.error(f"❌ TOOL ERROR: {tool_name} - {str(e)}")
|
|
raise
|
|
return wrapper
|
|
|
|
def sanitize_args(args):
|
|
"""Sanitize arguments for logging to avoid sensitive data."""
|
|
safe_args = []
|
|
for arg in args:
|
|
safe_args.append(sanitize_value(arg))
|
|
return safe_args
|
|
|
|
def sanitize_value(value):
|
|
"""Sanitize a value for logging."""
|
|
if isinstance(value, str):
|
|
if len(value) > 50:
|
|
return value[:47] + "..."
|
|
return value
|
|
elif isinstance(value, (list, tuple)):
|
|
return f"{type(value).__name__} with {len(value)} items"
|
|
elif isinstance(value, dict):
|
|
return f"dict with {len(value)} items"
|
|
else:
|
|
return f"{type(value).__name__}"
|
|
|
|
class LoggingToolkitWrapper:
|
|
"""
|
|
Wrapper class to add logging to toolkit methods.
|
|
"""
|
|
def __init__(self, toolkit):
|
|
self.toolkit = toolkit
|
|
self.toolkit_name = toolkit.__class__.__name__
|
|
logging.info(f"📦 TOOLKIT INITIALIZED: {self.toolkit_name}")
|
|
|
|
def __getattr__(self, name):
|
|
attr = getattr(self.toolkit, name)
|
|
|
|
if callable(attr) and not name.startswith('_'):
|
|
if inspect.iscoroutinefunction(attr):
|
|
# It's an async function, wrap it with our async decorator
|
|
return log_tool_usage(attr)
|
|
else:
|
|
# For non-async functions
|
|
@functools.wraps(attr)
|
|
def wrapper(*args, **kwargs):
|
|
logging.info(f"🔧 TOOL TRIGGERED: {self.toolkit_name}.{name}")
|
|
try:
|
|
# Sanitize arguments to avoid logging sensitive info
|
|
safe_args = sanitize_args(args)
|
|
safe_kwargs = {k: sanitize_value(v) for k, v in kwargs.items()}
|
|
logging.info(f"🔍 TOOL ARGS: {name} called with {len(safe_kwargs)} parameters")
|
|
|
|
result = attr(*args, **kwargs)
|
|
|
|
logging.info(f"✅ TOOL COMPLETED: {self.toolkit_name}.{name}")
|
|
return result
|
|
except Exception as e:
|
|
logging.error(f"❌ TOOL ERROR: {self.toolkit_name}.{name} - {str(e)}")
|
|
raise
|
|
return wrapper
|
|
|
|
return attr
|
|
|
|
def wrap_toolkits(toolkits_list):
|
|
"""
|
|
Wrap a list of toolkits with logging functionality.
|
|
"""
|
|
wrapped_toolkits = []
|
|
for toolkit in toolkits_list:
|
|
wrapped_toolkits.append(LoggingToolkitWrapper(toolkit))
|
|
return wrapped_toolkits
|
|
# Find this function in logging_utils.py and replace it with this corrected version
|
|
|
|
# Enhanced run_society function with logging
|
|
def enhanced_run_society(society, verbose=True):
|
|
"""
|
|
Enhanced wrapper around the OWL run_society function with detailed logging.
|
|
"""
|
|
from owl.utils import run_society as original_run_society
|
|
|
|
# Log the society setup
|
|
user_role = getattr(society, 'user_role_name', 'User')
|
|
assistant_role = getattr(society, 'assistant_role_name', 'Assistant')
|
|
|
|
logging.info(f"🚀 STARTING AGENT SOCIETY: {user_role} & {assistant_role}")
|
|
logging.info(f"📝 TASK: {society.task_prompt[:100]}...")
|
|
|
|
# Log agent initialization
|
|
logging.info(f"🤖 INITIALIZING AGENT: {assistant_role}")
|
|
|
|
# Add hooks to log message exchanges if possible
|
|
original_send_message = None
|
|
if hasattr(society, 'assistant_agent') and hasattr(society.assistant_agent, 'send_message'):
|
|
original_send_message = society.assistant_agent.send_message
|
|
|
|
@functools.wraps(original_send_message)
|
|
def logged_send_message(*args, **kwargs):
|
|
logging.info(f"💬 AGENT MESSAGE: {assistant_role} is processing...")
|
|
result = original_send_message(*args, **kwargs)
|
|
logging.info(f"📨 AGENT RESPONSE RECEIVED from {assistant_role}")
|
|
return result
|
|
|
|
society.assistant_agent.send_message = logged_send_message
|
|
|
|
# Try to log tool usage if possible
|
|
if hasattr(society, 'assistant_agent') and hasattr(society.assistant_agent, 'tools'):
|
|
tools = getattr(society.assistant_agent, 'tools', [])
|
|
logging.info(f"🧰 AGENT HAS {len(tools)} TOOLS AVAILABLE")
|
|
|
|
# Attempt to wrap each tool with logging
|
|
for i, tool in enumerate(tools):
|
|
if callable(tool):
|
|
tool_name = getattr(tool, '__name__', f"tool_{i}")
|
|
logging.info(f"🔧 TOOL AVAILABLE: {tool_name}")
|
|
|
|
# Run the original function
|
|
start_time = time.time()
|
|
try:
|
|
logging.info(f"⏳ RUNNING SOCIETY...")
|
|
# Remove the verbose parameter from the call to original_run_society
|
|
answer, chat_history, token_count = original_run_society(society)
|
|
end_time = time.time()
|
|
duration = end_time - start_time
|
|
|
|
# Log prompt and completion tokens separately if available
|
|
if isinstance(token_count, dict):
|
|
prompt_tokens = token_count.get('prompt_token_count', 0)
|
|
completion_tokens = token_count.get('completion_token_count', 0)
|
|
logging.info(f"💰 TOKEN USAGE: Prompt={prompt_tokens}, Completion={completion_tokens}, Total={prompt_tokens + completion_tokens}")
|
|
else:
|
|
logging.info(f"💰 TOKEN USAGE: {token_count}")
|
|
|
|
logging.info(f"✅ AGENT SOCIETY COMPLETED: Duration {duration:.2f}s")
|
|
|
|
return answer, chat_history, token_count
|
|
except Exception as e:
|
|
logging.error(f"❌ AGENT SOCIETY ERROR: {str(e)}")
|
|
raise
|
|
finally:
|
|
# Restore original method if we modified it
|
|
if original_send_message and hasattr(society, 'assistant_agent'):
|
|
society.assistant_agent.send_message = original_send_message
|
|
|
|
|
|
|
|
# Function to sanitize logs to avoid exposing sensitive information
|
|
def sanitize_log(log_message):
|
|
"""
|
|
Sanitize log messages to avoid exposing sensitive information like IPs.
|
|
"""
|
|
# Simple IP address pattern matching
|
|
ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
|
|
sanitized = re.sub(ip_pattern, '[REDACTED_IP]', log_message)
|
|
|
|
# Redact API keys (common patterns)
|
|
api_key_pattern = r'(api[_-]?key|apikey|key|token)["\']?\s*[:=]\s*["\']?([a-zA-Z0-9]{20,})["\']?'
|
|
sanitized = re.sub(api_key_pattern, r'\1: [REDACTED_API_KEY]', sanitized, flags=re.IGNORECASE)
|
|
|
|
# Redact URLs with authentication information
|
|
url_auth_pattern = r'(https?://)([^:@/]+:[^@/]+@)([^\s/]+)'
|
|
sanitized = re.sub(url_auth_pattern, r'\1[REDACTED_AUTH]@\3', sanitized)
|
|
|
|
return sanitized
|
|
|
|
# Enhanced StreamlitLogHandler that sanitizes logs
|
|
class EnhancedStreamlitLogHandler(logging.Handler):
|
|
def __init__(self, log_queue):
|
|
super().__init__()
|
|
self.log_queue = log_queue
|
|
self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
|
|
|
def emit(self, record):
|
|
log_entry = self.format(record)
|
|
# Sanitize the log to remove sensitive information
|
|
sanitized_log = sanitize_log(log_entry)
|
|
self.log_queue.put(sanitized_log)
|
|
|
|
# Add logging to specific OWL functions if possible
|
|
# Add this updated function to logging_utils.py
|
|
|
|
# Add logging to specific OWL functions if possible
|
|
def patch_owl_logging():
|
|
"""Try to patch specific OWL functions to add logging."""
|
|
try:
|
|
from owl import utils
|
|
|
|
# If run_society exists in utils, patch it to log
|
|
if hasattr(utils, 'run_society'):
|
|
original_run = utils.run_society
|
|
|
|
def logged_run_society(*args, **kwargs):
|
|
logging.info("🦉 OWL run_society called")
|
|
try:
|
|
result = original_run(*args, **kwargs)
|
|
logging.info("🦉 OWL run_society completed")
|
|
return result
|
|
except Exception as e:
|
|
logging.error(f"🦉 OWL run_society error: {str(e)}")
|
|
raise
|
|
|
|
# Replace the original function
|
|
utils.run_society = logged_run_society
|
|
logging.info("🦉 OWL run_society patched with logging")
|
|
|
|
return True
|
|
except ImportError:
|
|
logging.warning("⚠️ Could not patch OWL logging - module not found")
|
|
return False
|
|
except Exception as e:
|
|
logging.warning(f"⚠️ Error patching OWL logging: {str(e)}")
|
|
return False |