diff --git a/enterprise/server/saas_nested_conversation_manager.py b/enterprise/server/saas_nested_conversation_manager.py index e757570113..d4479da0b2 100644 --- a/enterprise/server/saas_nested_conversation_manager.py +++ b/enterprise/server/saas_nested_conversation_manager.py @@ -1139,6 +1139,71 @@ class SaasNestedConversationManager(ConversationManager): } update_conversation_metadata(conversation_id, metadata_content) + async def list_files(self, sid: str, path: str | None = None) -> list[str]: + """List files in the workspace for a conversation. + + Delegates to the nested container's list-files endpoint. + + Args: + sid: The session/conversation ID. + path: Optional path to list files from. If None, lists from workspace root. + + Returns: + A list of file paths. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + runtime = await self._get_runtime(sid) + if runtime is None or runtime.get('status') != 'running': + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url_for_runtime(runtime['runtime_id'], sid) + session_api_key = runtime.get('session_api_key') + + return await self._fetch_list_files_from_nested( + sid, nested_url, session_api_key, path + ) + + async def select_file(self, sid: str, file: str) -> tuple[str | None, str | None]: + """Read a file from the workspace via nested container. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + runtime = await self._get_runtime(sid) + if runtime is None or runtime.get('status') != 'running': + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url_for_runtime(runtime['runtime_id'], sid) + session_api_key = runtime.get('session_api_key') + + return await self._fetch_select_file_from_nested( + sid, nested_url, session_api_key, file + ) + + async def upload_files( + self, sid: str, files: list[tuple[str, bytes]] + ) -> tuple[list[str], list[dict[str, str]]]: + """Upload files to the workspace via nested container. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + runtime = await self._get_runtime(sid) + if runtime is None or runtime.get('status') != 'running': + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url_for_runtime(runtime['runtime_id'], sid) + session_api_key = runtime.get('session_api_key') + + return await self._fetch_upload_files_to_nested( + sid, nested_url, session_api_key, files + ) + def _last_updated_at_key(conversation: ConversationMetadata) -> float: last_updated_at = conversation.last_updated_at diff --git a/openhands/server/conversation_manager/conversation_manager.py b/openhands/server/conversation_manager/conversation_manager.py index 2f704916ce..7a63233100 100644 --- a/openhands/server/conversation_manager/conversation_manager.py +++ b/openhands/server/conversation_manager/conversation_manager.py @@ -10,10 +10,12 @@ from __future__ import annotations from abc import ABC, abstractmethod +import httpx import socketio from openhands.core.config import OpenHandsConfig from openhands.core.config.llm_config import LLMConfig +from openhands.core.logger import openhands_logger as logger from openhands.events.action import MessageAction from openhands.server.config.server_config import ServerConfig from openhands.server.data_models.agent_loop_info import AgentLoopInfo @@ -23,6 +25,7 @@ from openhands.server.session.conversation import ServerConversation from openhands.storage.conversation.conversation_store import ConversationStore from openhands.storage.data_models.settings import Settings from openhands.storage.files import FileStore +from openhands.utils.http_session import httpx_verify_option class ConversationManager(ABC): @@ -155,6 +158,208 @@ class ConversationManager(ABC): ) -> str: """Request extraneous llm completions for a conversation""" + @abstractmethod + async def list_files(self, sid: str, path: str | None = None) -> list[str]: + """List files in the workspace for a conversation. + + Args: + sid: The session/conversation ID. + path: Optional path to list files from. If None, lists from workspace root. + + Returns: + A list of file paths. + + Raises: + ValueError: If the conversation is not running (for nested managers). + """ + + @abstractmethod + async def select_file(self, sid: str, file: str) -> tuple[str | None, str | None]: + """Read a file from the workspace. + + Args: + sid: The session/conversation ID. + file: The file path relative to the workspace root. + + Returns: + A tuple of (content, error). If successful, content is the file content + and error is None. If failed, content is None and error is the error message. + + Raises: + ValueError: If the conversation is not running (for nested managers). + """ + + @abstractmethod + async def upload_files( + self, sid: str, files: list[tuple[str, bytes]] + ) -> tuple[list[str], list[dict[str, str]]]: + """Upload files to the workspace. + + Args: + sid: The session/conversation ID. + files: List of (filename, content) tuples to upload. + + Returns: + A tuple of (uploaded_files, skipped_files) where uploaded_files is a list + of successfully uploaded file paths and skipped_files is a list of dicts + with 'name' and 'reason' keys for files that failed to upload. + + Raises: + ValueError: If the conversation is not running (for nested managers). + """ + + async def _fetch_list_files_from_nested( + self, + sid: str, + nested_url: str, + session_api_key: str | None, + path: str | None = None, + ) -> list[str]: + """Fetch file list from a nested runtime container. + + This is a helper method used by nested conversation managers to make HTTP + requests to the nested runtime's list-files endpoint. + + Args: + sid: The session/conversation ID (for logging). + nested_url: The base URL of the nested runtime. + session_api_key: The session API key for authentication. + path: Optional path to list files from. + + Returns: + A list of file paths. + + Raises: + httpx.TimeoutException: If the request times out. + httpx.ConnectError: If unable to connect to the nested runtime. + httpx.HTTPStatusError: If the nested runtime returns an error status. + """ + async with httpx.AsyncClient( + verify=httpx_verify_option(), + headers={'X-Session-API-Key': session_api_key} if session_api_key else {}, + ) as client: + params = {'path': path} if path else {} + try: + response = await client.get(f'{nested_url}/list-files', params=params) + response.raise_for_status() + return response.json() + except httpx.TimeoutException: + logger.error( + 'Timeout fetching files from nested runtime', + extra={'session_id': sid}, + ) + raise + except httpx.ConnectError as e: + logger.error( + f'Failed to connect to nested runtime: {e}', + extra={'session_id': sid}, + ) + raise + except httpx.HTTPStatusError as e: + logger.error( + f'Nested runtime returned error: {e.response.status_code}', + extra={'session_id': sid}, + ) + raise + + async def _fetch_select_file_from_nested( + self, + sid: str, + nested_url: str, + session_api_key: str | None, + file: str, + ) -> tuple[str | None, str | None]: + """Fetch file content from a nested runtime container. + + Args: + sid: The session/conversation ID (for logging). + nested_url: The base URL of the nested runtime. + session_api_key: The session API key for authentication. + file: The file path to read. + + Returns: + A tuple of (content, error). + """ + async with httpx.AsyncClient( + verify=httpx_verify_option(), + headers={'X-Session-API-Key': session_api_key} if session_api_key else {}, + ) as client: + params = {'file': file} + try: + response = await client.get(f'{nested_url}/select-file', params=params) + response.raise_for_status() + data = response.json() + return data.get('code'), None + except httpx.HTTPStatusError as e: + if e.response.status_code == 415: + return None, f'BINARY_FILE:{file}' + error_data = e.response.json() if e.response.content else {} + return None, error_data.get('error', str(e)) + except httpx.TimeoutException: + logger.error( + 'Timeout fetching file from nested runtime', + extra={'session_id': sid}, + ) + raise + except httpx.ConnectError as e: + logger.error( + f'Failed to connect to nested runtime: {e}', + extra={'session_id': sid}, + ) + raise + + async def _fetch_upload_files_to_nested( + self, + sid: str, + nested_url: str, + session_api_key: str | None, + files: list[tuple[str, bytes]], + ) -> tuple[list[str], list[dict[str, str]]]: + """Upload files to a nested runtime container. + + Args: + sid: The session/conversation ID (for logging). + nested_url: The base URL of the nested runtime. + session_api_key: The session API key for authentication. + files: List of (filename, content) tuples to upload. + + Returns: + A tuple of (uploaded_files, skipped_files). + """ + async with httpx.AsyncClient( + verify=httpx_verify_option(), + headers={'X-Session-API-Key': session_api_key} if session_api_key else {}, + ) as client: + try: + # Build multipart form data + multipart_files = [ + ('files', (filename, content)) for filename, content in files + ] + response = await client.post( + f'{nested_url}/upload-files', files=multipart_files + ) + response.raise_for_status() + data = response.json() + return data.get('uploaded_files', []), data.get('skipped_files', []) + except httpx.TimeoutException: + logger.error( + 'Timeout uploading files to nested runtime', + extra={'session_id': sid}, + ) + raise + except httpx.ConnectError as e: + logger.error( + f'Failed to connect to nested runtime: {e}', + extra={'session_id': sid}, + ) + raise + except httpx.HTTPStatusError as e: + logger.error( + f'Nested runtime returned error: {e.response.status_code}', + extra={'session_id': sid}, + ) + raise + @classmethod @abstractmethod def get_instance( diff --git a/openhands/server/conversation_manager/docker_nested_conversation_manager.py b/openhands/server/conversation_manager/docker_nested_conversation_manager.py index 7ba91f17a9..425c9cb8a5 100644 --- a/openhands/server/conversation_manager/docker_nested_conversation_manager.py +++ b/openhands/server/conversation_manager/docker_nested_conversation_manager.py @@ -644,6 +644,68 @@ class DockerNestedConversationManager(ConversationManager): except docker.errors.NotFound: return False + async def list_files(self, sid: str, path: str | None = None) -> list[str]: + """List files in the workspace for a conversation. + + Delegates to the nested container's list-files endpoint. + + Args: + sid: The session/conversation ID. + path: Optional path to list files from. If None, lists from workspace root. + + Returns: + A list of file paths. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + if not await self.is_agent_loop_running(sid): + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url(sid) + session_api_key = self._get_session_api_key_for_conversation(sid) + + return await self._fetch_list_files_from_nested( + sid, nested_url, session_api_key, path + ) + + async def select_file(self, sid: str, file: str) -> tuple[str | None, str | None]: + """Read a file from the workspace via nested container. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + if not await self.is_agent_loop_running(sid): + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url(sid) + session_api_key = self._get_session_api_key_for_conversation(sid) + + return await self._fetch_select_file_from_nested( + sid, nested_url, session_api_key, file + ) + + async def upload_files( + self, sid: str, files: list[tuple[str, bytes]] + ) -> tuple[list[str], list[dict[str, str]]]: + """Upload files to the workspace via nested container. + + Raises: + ValueError: If the conversation is not running. + httpx.HTTPError: If there's an error communicating with the nested runtime. + """ + if not await self.is_agent_loop_running(sid): + raise ValueError(f'Conversation {sid} is not running') + + nested_url = self._get_nested_url(sid) + session_api_key = self._get_session_api_key_for_conversation(sid) + + return await self._fetch_upload_files_to_nested( + sid, nested_url, session_api_key, files + ) + def _last_updated_at_key(conversation: ConversationMetadata) -> float: last_updated_at = conversation.last_updated_at diff --git a/openhands/server/conversation_manager/standalone_conversation_manager.py b/openhands/server/conversation_manager/standalone_conversation_manager.py index c63d214280..5e804cdea3 100644 --- a/openhands/server/conversation_manager/standalone_conversation_manager.py +++ b/openhands/server/conversation_manager/standalone_conversation_manager.py @@ -7,12 +7,15 @@ # Tag: Legacy-V0 # This module belongs to the old V0 web server. The V1 application server lives under openhands/app_server/. import asyncio +import os import time from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable, Iterable import socketio +from pathspec import PathSpec +from pathspec.patterns import GitWildMatchPattern from openhands.core.config.llm_config import LLMConfig from openhands.core.config.openhands_config import OpenHandsConfig @@ -20,7 +23,7 @@ from openhands.core.exceptions import AgentRuntimeUnavailableError from openhands.core.logger import openhands_logger as logger from openhands.core.schema.agent import AgentState from openhands.core.schema.observation import ObservationType -from openhands.events.action import MessageAction +from openhands.events.action import FileReadAction, MessageAction from openhands.events.observation.commands import CmdOutputObservation from openhands.events.stream import EventStreamSubscriber, session_exists from openhands.llm.llm_registry import LLMRegistry @@ -40,6 +43,7 @@ from openhands.storage.files import FileStore from openhands.utils.async_utils import ( GENERAL_TIMEOUT, call_async_from_sync, + call_sync_from_async, run_in_loop, wait_all, ) @@ -434,6 +438,136 @@ class StandaloneConversationManager(ConversationManager): return session.agent_session return None + async def list_files(self, sid: str, path: str | None = None) -> list[str]: + """List files in the workspace for a conversation. + + Args: + sid: The session/conversation ID. + path: Optional path to list files from. If None, lists from workspace root. + + Returns: + A list of file paths, filtered by .gitignore rules if present. + + Raises: + ValueError: If the runtime is not available. + """ + agent_session = self.get_agent_session(sid) + if not agent_session or not agent_session.runtime: + raise ValueError(f'Runtime not available for conversation {sid}') + + runtime = agent_session.runtime + file_list = await call_sync_from_async(runtime.list_files, path) + + # runtime.list_files returns relative filenames within the specified directory, + # so we need to join with the path to get paths relative to workspace root + if path: + file_list = [os.path.join(path, f) for f in file_list] + + file_list = await self._filter_for_gitignore(runtime, file_list) + return file_list + + async def _filter_for_gitignore( + self, runtime: Any, file_list: list[str] + ) -> list[str]: + """Filter file list based on root-level .gitignore rules. + + Note: Only the .gitignore file at the workspace root is supported. + Nested .gitignore files in subdirectories are not processed. + + Args: + runtime: The runtime to read the .gitignore file from. + file_list: List of file paths to filter. + + Returns: + Filtered list of files excluding those matching .gitignore patterns. + """ + try: + read_action = FileReadAction('.gitignore') + observation = await call_sync_from_async(runtime.run_action, read_action) + spec = PathSpec.from_lines( + GitWildMatchPattern, observation.content.splitlines() + ) + file_list = [entry for entry in file_list if not spec.match_file(entry)] + except Exception as e: + logger.warning(f'Could not read .gitignore for filtering: {e}') + return file_list + + async def select_file(self, sid: str, file: str) -> tuple[str | None, str | None]: + """Read a file from the workspace. + + Args: + sid: The session/conversation ID. + file: The file path relative to the workspace root. + + Returns: + A tuple of (content, error). If successful, content is the file content + and error is None. If failed, content is None and error is the error message. + + Raises: + ValueError: If the runtime is not available. + """ + from openhands.events.observation import ErrorObservation, FileReadObservation + + agent_session = self.get_agent_session(sid) + if not agent_session or not agent_session.runtime: + raise ValueError(f'Runtime not available for conversation {sid}') + + runtime = agent_session.runtime + file_path = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file) + read_action = FileReadAction(file_path) + + observation = await call_sync_from_async(runtime.run_action, read_action) + + if isinstance(observation, FileReadObservation): + return observation.content, None + elif isinstance(observation, ErrorObservation): + if 'ERROR_BINARY_FILE' in observation.message: + return None, f'BINARY_FILE:{file}' + return None, str(observation) + else: + return None, f'Unexpected observation type: {type(observation)}' + + async def upload_files( + self, sid: str, files: list[tuple[str, bytes]] + ) -> tuple[list[str], list[dict[str, str]]]: + """Upload files to the workspace. + + Args: + sid: The session/conversation ID. + files: List of (filename, content) tuples to upload. + + Returns: + A tuple of (uploaded_files, skipped_files). + + Raises: + ValueError: If the runtime is not available. + """ + from openhands.events.action.files import FileWriteAction + + agent_session = self.get_agent_session(sid) + if not agent_session or not agent_session.runtime: + raise ValueError(f'Runtime not available for conversation {sid}') + + runtime = agent_session.runtime + uploaded_files: list[str] = [] + skipped_files: list[dict[str, str]] = [] + + for filename, content in files: + file_path = os.path.join( + runtime.config.workspace_mount_path_in_sandbox, filename + ) + try: + write_action = FileWriteAction( + path=file_path, + content=content.decode('utf-8', errors='replace'), + ) + await call_sync_from_async(runtime.run_action, write_action) + uploaded_files.append(file_path) + except Exception as e: + skipped_files.append({'name': filename, 'reason': str(e)}) + + return uploaded_files, skipped_files + async def _close_session(self, sid: str): logger.info(f'_close_session:{sid}', extra={'session_id': sid}) diff --git a/openhands/server/routes/files.py b/openhands/server/routes/files.py index e5285e50ab..403a65cfca 100644 --- a/openhands/server/routes/files.py +++ b/openhands/server/routes/files.py @@ -9,30 +9,27 @@ import os from typing import Any +import httpx from fastapi import APIRouter, Depends, HTTPException, UploadFile, status from fastapi.responses import FileResponse, JSONResponse -from pathspec import PathSpec -from pathspec.patterns import GitWildMatchPattern from starlette.background import BackgroundTask from openhands.core.exceptions import AgentRuntimeUnavailableError from openhands.core.logger import openhands_logger as logger -from openhands.events.action import ( - FileReadAction, -) -from openhands.events.action.files import FileWriteAction -from openhands.events.observation import ( - ErrorObservation, - FileReadObservation, -) from openhands.runtime.base import Runtime from openhands.server.dependencies import get_dependencies from openhands.server.file_config import FILES_TO_IGNORE from openhands.server.files import POSTUploadFilesModel from openhands.server.session.conversation import ServerConversation +from openhands.server.shared import conversation_manager from openhands.server.user_auth import get_user_id -from openhands.server.utils import get_conversation, get_conversation_store +from openhands.server.utils import ( + get_conversation, + get_conversation_metadata, + get_conversation_store, +) from openhands.storage.conversation.conversation_store import ConversationStore +from openhands.storage.data_models.conversation_metadata import ConversationMetadata from openhands.utils.async_utils import call_sync_from_async app = APIRouter( @@ -50,7 +47,7 @@ app = APIRouter( deprecated=True, ) async def list_files( - conversation: ServerConversation = Depends(get_conversation), + metadata: ConversationMetadata = Depends(get_conversation_metadata), path: str | None = None, ) -> list[str] | JSONResponse: """List files in the specified path. @@ -64,7 +61,7 @@ async def list_files( ``` Args: - request (Request): The incoming request object. + metadata: The conversation metadata (provides conversation_id and user access validation). path (str, optional): The path to list files from. Defaults to None. Returns: @@ -76,49 +73,50 @@ async def list_files( For V1 conversations, file operations are handled through the agent server. Use the sandbox's exposed agent server URL to access file operations. """ - if not conversation.runtime: + conversation_id = metadata.conversation_id + try: + file_list = await conversation_manager.list_files(conversation_id, path) + except ValueError as e: + logger.error(f'Error listing files: {e}') return JSONResponse( status_code=status.HTTP_404_NOT_FOUND, - content={'error': 'Runtime not yet initialized'}, + content={'error': str(e)}, ) - - runtime: Runtime = conversation.runtime - try: - file_list = await call_sync_from_async(runtime.list_files, path) except AgentRuntimeUnavailableError as e: logger.error(f'Error listing files: {e}') return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={'error': f'Error listing files: {e}'}, ) - if path: - file_list = [os.path.join(path, f) for f in file_list] - - file_list = [f for f in file_list if f not in FILES_TO_IGNORE] - - async def filter_for_gitignore(file_list: list[str], base_path: str) -> list[str]: - gitignore_path = os.path.join(base_path, '.gitignore') - try: - read_action = FileReadAction(gitignore_path) - observation = await call_sync_from_async(runtime.run_action, read_action) - spec = PathSpec.from_lines( - GitWildMatchPattern, observation.content.splitlines() - ) - except Exception as e: - logger.warning(e) - return file_list - file_list = [entry for entry in file_list if not spec.match_file(entry)] - return file_list - - try: - file_list = await filter_for_gitignore(file_list, '') - except AgentRuntimeUnavailableError as e: - logger.error(f'Error filtering files: {e}') + except httpx.TimeoutException: + logger.error(f'Timeout listing files for conversation {conversation_id}') + return JSONResponse( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + content={'error': 'Request to runtime timed out'}, + ) + except httpx.ConnectError: + logger.error( + f'Connection error listing files for conversation {conversation_id}' + ) + return JSONResponse( + status_code=status.HTTP_502_BAD_GATEWAY, + content={'error': 'Unable to connect to runtime'}, + ) + except httpx.HTTPStatusError as e: + logger.error(f'HTTP error listing files: {e.response.status_code}') + return JSONResponse( + status_code=e.response.status_code, + content={'error': f'Runtime returned error: {e.response.status_code}'}, + ) + except Exception as e: + logger.error(f'Error listing files: {e}') return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content={'error': f'Error filtering files: {e}'}, + content={'error': f'Error listing files: {e}'}, ) + file_list = [f for f in file_list if f not in FILES_TO_IGNORE] + return file_list @@ -138,19 +136,19 @@ async def list_files( deprecated=True, ) async def select_file( - file: str, conversation: ServerConversation = Depends(get_conversation) + file: str, + metadata: ConversationMetadata = Depends(get_conversation_metadata), ) -> FileResponse | JSONResponse: """Retrieve the content of a specified file. To select a file: ```sh - curl http://localhost:3000/api/conversations/{conversation_id}select-file?file= + curl http://localhost:3000/api/conversations/{conversation_id}/select-file?file= ``` Args: - file (str): The path of the file to be retrieved. - Expect path to be absolute inside the runtime. - request (Request): The incoming request object. + file (str): The path of the file to be retrieved (relative to workspace root). + metadata: The conversation metadata (provides conversation_id and user access validation). Returns: dict: A dictionary containing the file content. @@ -161,40 +159,53 @@ async def select_file( For V1 conversations, file operations are handled through the agent server. Use the sandbox's exposed agent server URL to access file operations. """ - runtime: Runtime = conversation.runtime - - file = os.path.join(runtime.config.workspace_mount_path_in_sandbox, file) - read_action = FileReadAction(file) + conversation_id = metadata.conversation_id try: - observation = await call_sync_from_async(runtime.run_action, read_action) + content, error = await conversation_manager.select_file(conversation_id, file) + except ValueError as e: + logger.error(f'Error opening file {file}: {e}') + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={'error': str(e)}, + ) except AgentRuntimeUnavailableError as e: logger.error(f'Error opening file {file}: {e}') return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={'error': f'Error opening file: {e}'}, ) - - if isinstance(observation, FileReadObservation): - content = observation.content - return JSONResponse(content={'code': content}) - elif isinstance(observation, ErrorObservation): - logger.error(f'Error opening file {file}: {observation}') - - if 'ERROR_BINARY_FILE' in observation.message: - return JSONResponse( - status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, - content={'error': f'Unable to open binary file: {file}'}, - ) - + except httpx.TimeoutException: + logger.error(f'Timeout reading file for conversation {conversation_id}') + return JSONResponse( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + content={'error': 'Request to runtime timed out'}, + ) + except httpx.ConnectError: + logger.error( + f'Connection error reading file for conversation {conversation_id}' + ) + return JSONResponse( + status_code=status.HTTP_502_BAD_GATEWAY, + content={'error': 'Unable to connect to runtime'}, + ) + except Exception as e: + logger.error(f'Error opening file {file}: {e}') return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content={'error': f'Error opening file: {observation}'}, + content={'error': f'Error opening file: {e}'}, + ) + + if content is not None: + return JSONResponse(content={'code': content}) + elif error and error.startswith('BINARY_FILE:'): + return JSONResponse( + status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, + content={'error': f'Unable to open binary file: {file}'}, ) else: - # Handle unexpected observation types return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content={'error': f'Unexpected observation type: {type(observation)}'}, + content={'error': f'Error opening file: {error}'}, ) @@ -321,33 +332,58 @@ async def git_diff( @app.post('/upload-files', response_model=POSTUploadFilesModel, deprecated=True) async def upload_files( files: list[UploadFile], - conversation: ServerConversation = Depends(get_conversation), + metadata: ConversationMetadata = Depends(get_conversation_metadata), ): """Upload files to the workspace. For V1 conversations, file operations are handled through the agent server. Use the sandbox's exposed agent server URL to access file operations. """ - uploaded_files = [] - skipped_files = [] - runtime: Runtime = conversation.runtime + conversation_id = metadata.conversation_id + # Read all file contents + file_data: list[tuple[str, bytes]] = [] for file in files: - file_path = os.path.join( - runtime.config.workspace_mount_path_in_sandbox, str(file.filename) + content = await file.read() + file_data.append((str(file.filename), content)) + + try: + uploaded_files, skipped_files = await conversation_manager.upload_files( + conversation_id, file_data ) - try: - file_content = await file.read() - write_action = FileWriteAction( - # TODO: DISCUSS UTF8 encoding here - path=file_path, - content=file_content.decode('utf-8', errors='replace'), - ) - # TODO: DISCUSS file name unique issues - await call_sync_from_async(runtime.run_action, write_action) - uploaded_files.append(file_path) - except Exception as e: - skipped_files.append({'name': file.filename, 'reason': str(e)}) + except ValueError as e: + logger.error(f'Error uploading files: {e}') + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={'error': str(e)}, + ) + except httpx.TimeoutException: + logger.error(f'Timeout uploading files for conversation {conversation_id}') + return JSONResponse( + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + content={'error': 'Request to runtime timed out'}, + ) + except httpx.ConnectError: + logger.error( + f'Connection error uploading files for conversation {conversation_id}' + ) + return JSONResponse( + status_code=status.HTTP_502_BAD_GATEWAY, + content={'error': 'Unable to connect to runtime'}, + ) + except httpx.HTTPStatusError as e: + logger.error(f'HTTP error uploading files: {e.response.status_code}') + return JSONResponse( + status_code=e.response.status_code, + content={'error': f'Runtime returned error: {e.response.status_code}'}, + ) + except Exception as e: + logger.error(f'Error uploading files: {e}') + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={'error': f'Error uploading files: {e}'}, + ) + return JSONResponse( status_code=status.HTTP_200_OK, content={ diff --git a/openhands/server/routes/trajectory.py b/openhands/server/routes/trajectory.py index ccd54a4f18..047a9e5dfd 100644 --- a/openhands/server/routes/trajectory.py +++ b/openhands/server/routes/trajectory.py @@ -12,10 +12,12 @@ from fastapi.responses import JSONResponse from openhands.core.logger import openhands_logger as logger from openhands.events.async_event_store_wrapper import AsyncEventStoreWrapper from openhands.events.event_filter import EventFilter +from openhands.events.event_store import EventStore from openhands.events.serialization import event_to_trajectory from openhands.server.dependencies import get_dependencies -from openhands.server.session.conversation import ServerConversation -from openhands.server.utils import get_conversation +from openhands.server.shared import file_store +from openhands.server.utils import get_conversation_metadata +from openhands.storage.data_models.conversation_metadata import ConversationMetadata app = APIRouter( prefix='/api/conversations/{conversation_id}', dependencies=get_dependencies() @@ -24,22 +26,29 @@ app = APIRouter( @app.get('/trajectory') async def get_trajectory( - conversation: ServerConversation = Depends(get_conversation), + metadata: ConversationMetadata = Depends(get_conversation_metadata), ) -> JSONResponse: """Get trajectory. This function retrieves the current trajectory and returns it. + Uses the local EventStore which reads events from the file store, + so it works with both standalone and nested conversation managers. Args: - request (Request): The incoming request object. + metadata: The conversation metadata (provides conversation_id and user access validation). Returns: JSONResponse: A JSON response containing the trajectory as a list of events. """ try: + event_store = EventStore( + sid=metadata.conversation_id, + file_store=file_store, + user_id=metadata.user_id, + ) async_store = AsyncEventStoreWrapper( - conversation.event_stream, filter=EventFilter(exclude_hidden=True) + event_store, filter=EventFilter(exclude_hidden=True) ) trajectory = [] async for event in async_store: