From 10b871f4ab51321243d145a78379ecf492fbbaca Mon Sep 17 00:00:00 2001 From: Eliot Jones Date: Mon, 15 Sep 2025 09:57:03 -0400 Subject: [PATCH] feat: Add Cygnal integration (#10898) --- openhands/runtime/base.py | 1 + openhands/security/README.md | 28 ++++ openhands/security/analyzer.py | 8 + openhands/security/grayswan/__init__.py | 3 + openhands/security/grayswan/analyzer.py | 204 ++++++++++++++++++++++++ openhands/security/grayswan/utils.py | 145 +++++++++++++++++ openhands/security/options.py | 2 + 7 files changed, 391 insertions(+) create mode 100644 openhands/security/grayswan/__init__.py create mode 100644 openhands/security/grayswan/analyzer.py create mode 100644 openhands/security/grayswan/utils.py diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 3e5018aa39..48ff2882dd 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -199,6 +199,7 @@ class Runtime(FileEditRuntimeMixin): self.config.security.security_analyzer, SecurityAnalyzer ) self.security_analyzer = analyzer_cls() + self.security_analyzer.set_event_stream(self.event_stream) logger.debug( f'Security analyzer {analyzer_cls.__name__} initialized for runtime {self.sid}' ) diff --git a/openhands/security/README.md b/openhands/security/README.md index 3465d75e29..684c535c9a 100644 --- a/openhands/security/README.md +++ b/openhands/security/README.md @@ -99,3 +99,31 @@ Browsing Agent Safety: * If the guardrail evaluates either of the 2 conditions to be true, it emits a change_agent_state action and transforms the AgentState to ERROR. This stops the agent from proceeding further. * To enable this feature: In the InvariantAnalyzer object, set the check_browsing_alignment attribute to True and initialize the guardrail_llm attribute with an LLM object. + +### Gray Swan + +The Gray Swan Security Analyzer integrates with [Gray Swan AI's Cygnal API](https://docs.grayswan.ai/monitor-requests/monitor) to provide advanced AI safety monitoring for OpenHands agents. + +#### Getting Started +To get started with the Gray Swan security analyzer (powered by Cygnal): + +1. Navigate to [the Gray Swan platform](https://platform.grayswan.ai) and create an account if you don't already have one +2. Create a Gray Swan API key. +3. If you just want to use Cygnal's default protections, you can move to the next section. +4. If you want **even more** custom protection, you can create your own policy [here](https://platform.grayswan.ai/policies). Policies are composed of rules, which require a short title, e.g. "Git Operations", and then the rule itself, e.g. "The agent should never push code directly to the main branch". + +#### OpenHands Configuration: + +To use the GraySwan analyzer, set the following environment variables: + +* `GRAYSWAN_API_KEY`: Your GraySwan API key (required) +* `GRAYSWAN_POLICY_ID`: Your GraySwan policy ID (optional) + +Then configure OpenHands to use the GraySwan analyzer: + +```toml +[security] +security_analyzer = "grayswan" +``` + +or select "grayswan" from the dropdown in settings! diff --git a/openhands/security/analyzer.py b/openhands/security/analyzer.py index 8650b5463f..cb79f799ce 100644 --- a/openhands/security/analyzer.py +++ b/openhands/security/analyzer.py @@ -24,6 +24,14 @@ class SecurityAnalyzer: 'Need to implement security_risk method in SecurityAnalyzer subclass' ) + def set_event_stream(self, event_stream) -> None: + """Set the event stream for accessing conversation history. + + Args: + event_stream: EventStream instance for accessing events + """ + pass + async def close(self) -> None: """Cleanup resources allocated by the SecurityAnalyzer.""" pass diff --git a/openhands/security/grayswan/__init__.py b/openhands/security/grayswan/__init__.py new file mode 100644 index 0000000000..df4e26e613 --- /dev/null +++ b/openhands/security/grayswan/__init__.py @@ -0,0 +1,3 @@ +from openhands.security.grayswan.analyzer import GraySwanAnalyzer + +__all__ = ['GraySwanAnalyzer'] diff --git a/openhands/security/grayswan/analyzer.py b/openhands/security/grayswan/analyzer.py new file mode 100644 index 0000000000..cdc3b8b7fd --- /dev/null +++ b/openhands/security/grayswan/analyzer.py @@ -0,0 +1,204 @@ +"""GraySwan security analyzer for OpenHands.""" + +import asyncio +import os +from typing import Any + +import aiohttp +from fastapi import Request + +from openhands.core.logger import openhands_logger as logger +from openhands.events.action.action import Action, ActionSecurityRisk +from openhands.events.event_store_abc import EventStoreABC +from openhands.memory.view import View +from openhands.security.analyzer import SecurityAnalyzer +from openhands.security.grayswan.utils import convert_events_to_openai_messages + + +class GraySwanAnalyzer(SecurityAnalyzer): + """Security analyzer using GraySwan's Cygnal API for AI safety monitoring.""" + + def __init__( + self, + history_limit: int = 20, + max_message_chars: int = 30000, + timeout: int = 30, + low_threshold: float = 0.3, + medium_threshold: float = 0.7, + high_threshold: float = 1.0, + session: aiohttp.ClientSession | None = None, + ) -> None: + """Initialize GraySwan analyzer. + + Args: + history_limit: Number of recent events to include as context + max_message_chars: Max characters for conversation processing + timeout: Request timeout in seconds + low_threshold: Risk threshold for LOW classification (default: 0.3) + medium_threshold: Risk threshold for MEDIUM classification (default: 0.7) + high_threshold: Risk threshold for HIGH classification (default: 1.0) + session: Optional pre-configured session (mainly for testing) + + Environment Variables: + GRAYSWAN_API_KEY: Required API key for GraySwan authentication + GRAYSWAN_POLICY_ID: Optional policy ID for custom GraySwan policy + """ + super().__init__() + + self.api_key = os.getenv('GRAYSWAN_API_KEY') + if not self.api_key: + logger.error( + 'GRAYSWAN_API_KEY environment variable is required for GraySwanAnalyzer' + ) + raise ValueError('GRAYSWAN_API_KEY environment variable is required') + + self.policy_id = os.getenv('GRAYSWAN_POLICY_ID') + if not self.policy_id: + self.policy_id = ( + '689ca4885af3538a39b2ba04' # GraySwan default coding agent policy + ) + logger.info(f'Using default GraySwan policy ID: {self.policy_id}') + else: + logger.info(f'Using GraySwan policy ID from environment: {self.policy_id}') + + self.event_stream: EventStoreABC | None = None + self.history_limit = history_limit + self.max_message_chars = max_message_chars + self.timeout = timeout + + self.violation_thresholds = { + 'low': low_threshold, + 'medium': medium_threshold, + 'high': high_threshold, + } + + self.api_url = 'https://api.grayswan.ai/cygnal/monitor' + self.session: aiohttp.ClientSession | None = session + + logger.info( + f'GraySwanAnalyzer initialized with history_limit={history_limit}, timeout={timeout}s' + ) + + def set_event_stream(self, event_stream: EventStoreABC) -> None: + """Set the event stream for accessing conversation history.""" + self.event_stream = event_stream + logger.debug('Event stream set for GraySwanAnalyzer') + + async def _get_session(self) -> aiohttp.ClientSession: + """Get or create HTTP session.""" + if self.session is None: + self.session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self.timeout), + headers={ + 'Authorization': f'Bearer {self.api_key}', + 'x-grayswan-api-key': self.api_key, + 'Content-Type': 'application/json', + }, + ) + elif hasattr(self.session, 'closed') and self.session.closed: + self.session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self.timeout), + headers={ + 'Authorization': f'Bearer {self.api_key}', + 'x-grayswan-api-key': self.api_key, + 'Content-Type': 'application/json', + }, + ) + return self.session + + def _map_violation_to_risk(self, violation_score: float) -> ActionSecurityRisk: + """Map GraySwan violation score to OpenHands ActionSecurityRisk.""" + if violation_score <= self.violation_thresholds['low']: + return ActionSecurityRisk.LOW + elif violation_score <= self.violation_thresholds['medium']: + return ActionSecurityRisk.MEDIUM + else: + return ActionSecurityRisk.HIGH + + async def _call_grayswan_api( + self, messages: list[dict[str, Any]] + ) -> ActionSecurityRisk: + """Call GraySwan API with formatted messages.""" + try: + session = await self._get_session() + + payload = {'messages': messages, 'policy_id': self.policy_id} + + logger.info( + f'Sending request to GraySwan API with {len(messages)} messages and policy_id: {self.policy_id}' + ) + logger.info(f'Payload: {payload}') + + response = await session.post(self.api_url, json=payload) + + async with response as resp: + if resp.status == 200: + result = await resp.json() + violation_score = ( + result.get('output', {}).get('data', {}).get('violation', 0.0) + ) + risk_level = self._map_violation_to_risk(violation_score) + if 'ipi' in result and result['ipi']: + risk_level = ( + ActionSecurityRisk.HIGH + ) # indirect prompt injection is auto escalated to HIGH + logger.info( + f'GraySwan risk assessment: {risk_level.name} (violation_score: {violation_score:.2f})' + ) + return risk_level + else: + error_text = await resp.text() + logger.error(f'GraySwan API error {resp.status}: {error_text}') + return ActionSecurityRisk.UNKNOWN + + except asyncio.TimeoutError: + logger.error('GraySwan API request timed out') + return ActionSecurityRisk.UNKNOWN + except Exception as e: + logger.error(f'GraySwan security analysis failed: {e}') + return ActionSecurityRisk.UNKNOWN + + async def handle_api_request(self, request: Request) -> Any: + """Handle incoming API requests for configuration or webhooks.""" + return {'status': 'ok', 'analyzer': 'grayswan'} + + async def security_risk(self, action: Action) -> ActionSecurityRisk: + """Analyze action for security risks using GraySwan API.""" + logger.debug( + f'Calling security_risk on GraySwanAnalyzer for action: {type(action).__name__}' + ) + + if not self.event_stream: + logger.warning('No event stream available for GraySwan analysis') + return ActionSecurityRisk.UNKNOWN + + try: + # Use View to get closer to what the agent's LLM actually sees + # This applies context management (trimming, summaries, masking) + view = View.from_events(list(self.event_stream.get_events())) + recent_events = ( + list(view)[-self.history_limit :] + if len(view) > self.history_limit + else list(view) + ) + + events_to_process = recent_events + [action] + openai_messages = convert_events_to_openai_messages(events_to_process) + + if not openai_messages: + logger.warning('No valid messages to analyze') + return ActionSecurityRisk.UNKNOWN + + logger.debug( + f'Converted {len(events_to_process)} events into {len(openai_messages)} OpenAI messages for GraySwan analysis' + ) + return await self._call_grayswan_api(openai_messages) + + except Exception as e: + logger.error(f'GraySwan security analysis failed: {e}') + return ActionSecurityRisk.UNKNOWN + + async def close(self) -> None: + """Clean up resources.""" + if self.session and not self.session.closed: + await self.session.close() diff --git a/openhands/security/grayswan/utils.py b/openhands/security/grayswan/utils.py new file mode 100644 index 0000000000..22820a69dc --- /dev/null +++ b/openhands/security/grayswan/utils.py @@ -0,0 +1,145 @@ +"""Utility for converting OpenHands events to OpenAI message format.""" + +from typing import Any + +from openhands.core.logger import openhands_logger as logger +from openhands.events.action.message import MessageAction, SystemMessageAction +from openhands.events.event import EventSource +from openhands.events.observation.browse import BrowserOutputObservation +from openhands.events.observation.commands import ( + CmdOutputObservation, + IPythonRunCellObservation, +) +from openhands.events.observation.file_download import FileDownloadObservation +from openhands.events.observation.files import ( + FileEditObservation, + FileReadObservation, + FileWriteObservation, +) +from openhands.events.observation.mcp import MCPObservation +from openhands.events.observation.observation import Observation + + +def convert_events_to_openai_messages(events: list[Any]) -> list[dict[str, Any]]: + """Convert OpenHands events to OpenAI message format for LLM APIs.""" + openai_messages = [] + + logger.info(f'Converting {len(events)} events to OpenAI messages') + + for i, event in enumerate(events): + event_type = type(event).__name__ + + # Skip agent_state_changed events and internal system actions + if event_type in [ + 'AgentStateChangedObservation', + 'ChangeAgentStateAction', + 'RecallAction', + 'RecallObservation', + 'TaskTrackingAction', + ]: + continue + + # Handle system messages + if isinstance(event, SystemMessageAction): + msg = {'role': 'system', 'content': event.content} + openai_messages.append(msg) + # Handle content messages + elif isinstance(event, MessageAction): + source = getattr(event, '_source', getattr(event, 'source', None)) + if source == EventSource.USER: + msg = {'role': 'user', 'content': event.content} + (msg['role'], msg['content']) + openai_messages.append(msg) + + elif source == EventSource.AGENT: + msg = {'role': 'assistant', 'content': event.content} + (msg['role'], msg['content']) + openai_messages.append(msg) + + # Handle tool calls + elif ( + not isinstance(event, Observation) + and hasattr(event, 'tool_call_metadata') + and event.tool_call_metadata + and getattr(event, '_source', getattr(event, 'source', None)) + == EventSource.AGENT + ): + tool_metadata = event.tool_call_metadata + model_response = getattr(tool_metadata, 'model_response', {}) or {} + choices = model_response.get('choices', []) + + if choices: + choice = choices[0] + message_data = choice.get('message', {}) + + tool_calls = message_data.get('tool_calls') + if tool_calls: + serializable_tool_calls = [] + for tc in tool_calls: + if hasattr(tc, 'id'): + tc_dict = { + 'id': tc.id, + 'type': getattr(tc, 'type', 'function'), + 'function': { + 'name': tc.function.name, + 'arguments': tc.function.arguments, + }, + } + # Remove security_risk from arguments to avoid biasing the analysis + try: + import json + + args = json.loads(tc.function.arguments) + if 'security_risk' in args: + del args['security_risk'] + tc_dict['function']['arguments'] = json.dumps(args) + except (json.JSONDecodeError, KeyError): + pass + serializable_tool_calls.append(tc_dict) + else: + serializable_tool_calls.append(tc) + + assistant_msg = { + 'role': 'assistant', + 'content': message_data.get('content', ''), + 'tool_calls': serializable_tool_calls, + } + + openai_messages.append(assistant_msg) + + # Handle tool responses + elif isinstance( + event, + ( + FileReadObservation, + FileWriteObservation, + FileEditObservation, + CmdOutputObservation, + IPythonRunCellObservation, + BrowserOutputObservation, + MCPObservation, + FileDownloadObservation, + ), + ): + # Skip observations from ENVIRONMENT source + source = getattr(event, '_source', getattr(event, 'source', None)) + if source == EventSource.ENVIRONMENT: + continue + + tool_call_id = None + if hasattr(event, 'tool_call_metadata') and event.tool_call_metadata: + tool_call_id = getattr(event.tool_call_metadata, 'tool_call_id', None) + + if tool_call_id: + content = ( + str(event.content) if hasattr(event, 'content') else str(event) + ) + msg = {'role': 'tool', 'content': content, 'tool_call_id': tool_call_id} + + openai_messages.append(msg) + else: + logger.warning( + f'Could not find tool_call_id for observation {event_type}' + ) + + return openai_messages diff --git a/openhands/security/options.py b/openhands/security/options.py index cec0726dee..d0b1691f5c 100644 --- a/openhands/security/options.py +++ b/openhands/security/options.py @@ -1,8 +1,10 @@ from openhands.security.analyzer import SecurityAnalyzer +from openhands.security.grayswan.analyzer import GraySwanAnalyzer from openhands.security.invariant.analyzer import InvariantAnalyzer from openhands.security.llm.analyzer import LLMRiskAnalyzer SecurityAnalyzers: dict[str, type[SecurityAnalyzer]] = { 'invariant': InvariantAnalyzer, 'llm': LLMRiskAnalyzer, + 'grayswan': GraySwanAnalyzer, }