chore - Add pydantic lib to type checking (#9086)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Ray Myers 2025-06-26 13:31:41 -05:00 committed by GitHub
parent 612bc3fa60
commit 94fe052561
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 94 additions and 80 deletions

View File

@ -40,7 +40,7 @@ repos:
hooks:
- id: mypy
additional_dependencies:
[types-requests, types-setuptools, types-pyyaml, types-toml, types-docker, lxml]
[types-requests, types-setuptools, types-pyyaml, types-toml, types-docker, pydantic, lxml]
# To see gaps add `--html-report mypy-report/`
entry: mypy --config-file dev_config/python/mypy.ini openhands/
always_run: true

View File

@ -7,5 +7,9 @@ warn_unreachable = True
warn_redundant_casts = True
no_implicit_optional = True
strict_optional = True
# Exclude third-party runtime directory from type checking
exclude = third_party/
[mypy-openhands.memory.condenser.impl.*]
disable_error_code = override

View File

@ -70,7 +70,7 @@ from openhands.events.observation import (
)
from openhands.events.serialization.event import truncate_content
from openhands.llm.llm import LLM
from openhands.llm.metrics import Metrics, TokenUsage
from openhands.llm.metrics import Metrics
from openhands.memory.view import View
from openhands.storage.files import FileStore
@ -1152,7 +1152,7 @@ class AgentController:
agent_metrics = self.state.metrics
# Get metrics from condenser LLM if it exists
condenser_metrics: TokenUsage | None = None
condenser_metrics: Metrics | None = None
if hasattr(self.agent, 'condenser') and hasattr(self.agent.condenser, 'llm'):
condenser_metrics = self.agent.condenser.llm.metrics

View File

@ -11,7 +11,7 @@ from openhands.core.config.llm_config import LLMConfig
class NoOpCondenserConfig(BaseModel):
"""Configuration for NoOpCondenser."""
type: Literal['noop'] = Field('noop')
type: Literal['noop'] = 'noop'
model_config = ConfigDict(extra='forbid')
@ -19,7 +19,7 @@ class NoOpCondenserConfig(BaseModel):
class ObservationMaskingCondenserConfig(BaseModel):
"""Configuration for ObservationMaskingCondenser."""
type: Literal['observation_masking'] = Field('observation_masking')
type: Literal['observation_masking'] = 'observation_masking'
attention_window: int = Field(
default=100,
description='The number of most-recent events where observations will not be masked.',
@ -32,7 +32,7 @@ class ObservationMaskingCondenserConfig(BaseModel):
class BrowserOutputCondenserConfig(BaseModel):
"""Configuration for the BrowserOutputCondenser."""
type: Literal['browser_output_masking'] = Field('browser_output_masking')
type: Literal['browser_output_masking'] = 'browser_output_masking'
attention_window: int = Field(
default=1,
description='The number of most recent browser output observations that will not be masked.',
@ -43,7 +43,7 @@ class BrowserOutputCondenserConfig(BaseModel):
class RecentEventsCondenserConfig(BaseModel):
"""Configuration for RecentEventsCondenser."""
type: Literal['recent'] = Field('recent')
type: Literal['recent'] = 'recent'
# at least one event by default, because the best guess is that it is the user task
keep_first: int = Field(
@ -61,7 +61,7 @@ class RecentEventsCondenserConfig(BaseModel):
class LLMSummarizingCondenserConfig(BaseModel):
"""Configuration for LLMCondenser."""
type: Literal['llm'] = Field('llm')
type: Literal['llm'] = 'llm'
llm_config: LLMConfig = Field(
..., description='Configuration for the LLM to use for condensing.'
)
@ -88,7 +88,7 @@ class LLMSummarizingCondenserConfig(BaseModel):
class AmortizedForgettingCondenserConfig(BaseModel):
"""Configuration for AmortizedForgettingCondenser."""
type: Literal['amortized'] = Field('amortized')
type: Literal['amortized'] = 'amortized'
max_size: int = Field(
default=100,
description='Maximum size of the condensed history before triggering forgetting.',
@ -108,7 +108,7 @@ class AmortizedForgettingCondenserConfig(BaseModel):
class LLMAttentionCondenserConfig(BaseModel):
"""Configuration for LLMAttentionCondenser."""
type: Literal['llm_attention'] = Field('llm_attention')
type: Literal['llm_attention'] = 'llm_attention'
llm_config: LLMConfig = Field(
..., description='Configuration for the LLM to use for attention.'
)
@ -131,7 +131,7 @@ class LLMAttentionCondenserConfig(BaseModel):
class StructuredSummaryCondenserConfig(BaseModel):
"""Configuration for StructuredSummaryCondenser instances."""
type: Literal['structured'] = Field('structured')
type: Literal['structured'] = 'structured'
llm_config: LLMConfig = Field(
..., description='Configuration for the LLM to use for condensing.'
)
@ -161,7 +161,7 @@ class CondenserPipelineConfig(BaseModel):
Not currently supported by the TOML or ENV_VAR configuration strategies.
"""
type: Literal['pipeline'] = Field('pipeline')
type: Literal['pipeline'] = 'pipeline'
condensers: list[CondenserConfig] = Field(
default_factory=list,
description='List of condenser configurations to be used in the pipeline.',

View File

@ -131,7 +131,9 @@ class MCPConfig(BaseModel):
# Convert all entries in sse_servers to MCPSSEServerConfig objects
if 'sse_servers' in data:
data['sse_servers'] = cls._normalize_servers(data['sse_servers'])
servers = []
servers: list[
MCPSSEServerConfig | MCPStdioServerConfig | MCPSHTTPServerConfig
] = []
for server in data['sse_servers']:
servers.append(MCPSSEServerConfig(**server))
data['sse_servers'] = servers

View File

@ -112,7 +112,7 @@ def initialize_repository_for_runtime(
provider_tokens[ProviderType.BITBUCKET] = ProviderToken(token=bitbucket_token)
secret_store = (
UserSecrets(provider_tokens=provider_tokens) if provider_tokens else None
UserSecrets(provider_tokens=provider_tokens) if provider_tokens else None # type: ignore[arg-type]
)
immutable_provider_tokens = secret_store.provider_tokens if secret_store else None

View File

@ -72,7 +72,9 @@ class GitHubService(BaseGitService, GitService):
async def _get_github_headers(self) -> dict:
"""Retrieve the GH Token from settings store to construct the headers."""
if not self.token:
self.token = await self.get_latest_token()
latest_token = await self.get_latest_token()
if latest_token:
self.token = latest_token
return {
'Authorization': f'Bearer {self.token.get_secret_value() if self.token else ""}',
@ -229,8 +231,8 @@ class GitHubService(BaseGitService, GitService):
# Convert to Repository objects
return [
Repository(
id=str(repo.get('id')),
full_name=repo.get('full_name'),
id=str(repo.get('id')), # type: ignore[arg-type]
full_name=repo.get('full_name'), # type: ignore[arg-type]
stargazers_count=repo.get('stargazers_count'),
git_provider=ProviderType.GITHUB,
is_public=not repo.get('private', True),

View File

@ -66,7 +66,9 @@ class GitLabService(BaseGitService, GitService):
Retrieve the GitLab Token to construct the headers
"""
if not self.token:
self.token = await self.get_latest_token()
latest_token = await self.get_latest_token()
if latest_token:
self.token = latest_token
return {
'Authorization': f'Bearer {self.token.get_secret_value()}',
@ -185,7 +187,7 @@ class GitLabService(BaseGitService, GitService):
return User(
id=str(response.get('id', '')),
login=response.get('username'),
login=response.get('username'), # type: ignore[call-arg]
avatar_url=avatar_url,
name=response.get('name'),
email=response.get('email'),
@ -258,8 +260,8 @@ class GitLabService(BaseGitService, GitService):
all_repos = all_repos[:MAX_REPOS]
return [
Repository(
id=str(repo.get('id')),
full_name=repo.get('path_with_namespace'),
id=str(repo.get('id')), # type: ignore[arg-type]
full_name=repo.get('path_with_namespace'), # type: ignore[arg-type]
stargazers_count=repo.get('star_count'),
git_provider=ProviderType.GITLAB,
is_public=repo.get('visibility') == 'public',

View File

@ -50,7 +50,7 @@ class ProviderToken(BaseModel):
# Override with empty string if it was set to None
# Cannot pass None to SecretStr
if token_str is None:
token_str = ''
token_str = '' # type: ignore[unreachable]
user_id = token_value.get('user_id')
host = token_value.get('host')
return cls(token=SecretStr(token_str), user_id=user_id, host=host)
@ -74,8 +74,8 @@ class CustomSecret(BaseModel):
if isinstance(secret_value, CustomSecret):
return secret_value
elif isinstance(secret_value, dict):
secret = secret_value.get('secret')
description = secret_value.get('description')
secret = secret_value.get('secret', '')
description = secret_value.get('description', '')
return cls(secret=SecretStr(secret), description=description)
else:

View File

@ -26,7 +26,7 @@ async def validate_provider_token(
"""
# Skip validation for empty tokens
if token is None:
return None
return None # type: ignore[unreachable]
# Try GitHub first
github_error = None

View File

@ -62,8 +62,10 @@ async def create_mcp_clients(
)
return []
servers: list[MCPSSEServerConfig | MCPSHTTPServerConfig] = sse_servers.copy()
servers.extend(shttp_servers.copy())
servers: list[MCPSSEServerConfig | MCPSHTTPServerConfig] = [
*sse_servers,
*shttp_servers,
]
if not servers:
return []

View File

@ -60,7 +60,7 @@ class AmortizedForgettingCondenser(RollingCondenser):
def from_config(
cls, config: AmortizedForgettingCondenserConfig
) -> AmortizedForgettingCondenser:
return AmortizedForgettingCondenser(**config.model_dump(exclude=['type']))
return AmortizedForgettingCondenser(**config.model_dump(exclude={'type'}))
AmortizedForgettingCondenser.register_config(AmortizedForgettingCondenserConfig)

View File

@ -42,7 +42,7 @@ class BrowserOutputCondenser(Condenser):
def from_config(
cls, config: BrowserOutputCondenserConfig
) -> BrowserOutputCondenser:
return BrowserOutputCondenser(**config.model_dump(exclude=['type']))
return BrowserOutputCondenser(**config.model_dump(exclude={'type'}))
BrowserOutputCondenser.register_config(BrowserOutputCondenserConfig)

View File

@ -30,7 +30,7 @@ class ObservationMaskingCondenser(Condenser):
def from_config(
cls, config: ObservationMaskingCondenserConfig
) -> ObservationMaskingCondenser:
return ObservationMaskingCondenser(**config.model_dump(exclude=['type']))
return ObservationMaskingCondenser(**config.model_dump(exclude={'type'}))
ObservationMaskingCondenser.register_config(ObservationMaskingCondenserConfig)

View File

@ -22,7 +22,7 @@ class RecentEventsCondenser(Condenser):
@classmethod
def from_config(cls, config: RecentEventsCondenserConfig) -> RecentEventsCondenser:
return RecentEventsCondenser(**config.model_dump(exclude=['type']))
return RecentEventsCondenser(**config.model_dump(exclude={'type'}))
RecentEventsCondenser.register_config(RecentEventsCondenserConfig)

View File

@ -395,7 +395,7 @@ class ConversationMemory:
text = truncate_content(text, max_message_chars)
# Create message content with text
content = [TextContent(text=text)]
content: list[TextContent | ImageContent] = [TextContent(text=text)]
# Add image URLs if available and vision is active
if vision_is_active and obs.image_urls:
@ -411,7 +411,7 @@ class ConversationMemory:
# Add text indicating some images were filtered
content[
0
].text += f'\n\nNote: {invalid_count} invalid or empty image(s) were filtered from this output. The agent may need to use alternative methods to access visual information.'
].text += f'\n\nNote: {invalid_count} invalid or empty image(s) were filtered from this output. The agent may need to use alternative methods to access visual information.' # type: ignore[union-attr]
else:
logger.debug(
'IPython observation has image URLs but none are valid'
@ -419,7 +419,7 @@ class ConversationMemory:
# Add text indicating all images were filtered
content[
0
].text += f'\n\nNote: All {len(obs.image_urls)} image(s) in this output were invalid or empty and have been filtered. The agent should use alternative methods to access visual information.'
].text += f'\n\nNote: All {len(obs.image_urls)} image(s) in this output were invalid or empty and have been filtered. The agent should use alternative methods to access visual information.' # type: ignore[union-attr]
message = Message(role='user', content=content)
elif isinstance(obs, FileEditObservation):
@ -452,7 +452,7 @@ class ConversationMemory:
# Only add ImageContent if we have a valid image URL
if self._is_valid_image_url(image_url):
content.append(ImageContent(image_urls=[image_url]))
content.append(ImageContent(image_urls=[image_url])) # type: ignore[list-item]
logger.debug(f'Vision enabled for browsing, showing {image_type}')
else:
if image_url:
@ -462,7 +462,7 @@ class ConversationMemory:
# Add text indicating the image was filtered
content[
0
].text += f'\n\nNote: The {image_type} for this webpage was invalid or empty and has been filtered. The agent should use alternative methods to access visual information about the webpage.'
].text += f'\n\nNote: The {image_type} for this webpage was invalid or empty and has been filtered. The agent should use alternative methods to access visual information about the webpage.' # type: ignore[union-attr]
else:
logger.debug(
'Vision enabled for browsing, but no valid image available'
@ -470,7 +470,7 @@ class ConversationMemory:
# Add text indicating no image was available
content[
0
].text += '\n\nNote: No visual information (screenshot or set of marks) is available for this webpage. The agent should rely on the text content above.'
].text += '\n\nNote: No visual information (screenshot or set of marks) is available for this webpage. The agent should rely on the text content above.' # type: ignore[union-attr]
message = Message(role='user', content=content)
else:
@ -565,7 +565,7 @@ class ConversationMemory:
has_microagent_knowledge = bool(filtered_agents)
# Generate appropriate content based on what is present
message_content = []
message_content: list[TextContent | ImageContent] = []
# Build the workspace context information
if (

View File

@ -44,6 +44,8 @@ class Memory:
event_stream: EventStream
status_callback: Callable | None
loop: asyncio.AbstractEventLoop | None
repo_microagents: dict[str, RepoMicroagent]
knowledge_microagents: dict[str, KnowledgeMicroagent]
def __init__(
self,
@ -63,8 +65,8 @@ class Memory:
)
# Additional placeholders to store user workspace microagents
self.repo_microagents: dict[str, RepoMicroagent] = {}
self.knowledge_microagents: dict[str, KnowledgeMicroagent] = {}
self.repo_microagents = {}
self.knowledge_microagents = {}
# Store repository / runtime info to send them to the templating later
self.repository_info: RepositoryInfo | None = None
@ -265,12 +267,12 @@ class Memory:
repo_agents, knowledge_agents = load_microagents_from_dir(
GLOBAL_MICROAGENTS_DIR
)
for name, agent in knowledge_agents.items():
if isinstance(agent, KnowledgeMicroagent):
self.knowledge_microagents[name] = agent
for name, agent in repo_agents.items():
if isinstance(agent, RepoMicroagent):
self.repo_microagents[name] = agent
for name, k_agent in knowledge_agents.items():
if isinstance(k_agent, KnowledgeMicroagent):
self.knowledge_microagents[name] = k_agent
for name, r_agent in repo_agents.items():
if isinstance(r_agent, RepoMicroagent):
self.repo_microagents[name] = r_agent
def get_microagent_mcp_tools(self) -> list[MCPConfig]:
"""

View File

@ -394,7 +394,7 @@ class FileEditRuntimeMixin(FileEditRuntimeInterface):
return ret_obs
def check_retry_num(self, retry_num):
correct_num = self.draft_editor_llm.config.correct_num
correct_num = self.draft_editor_llm.config.correct_num # type: ignore[attr-defined]
return correct_num < retry_num
def correct_edit(

View File

@ -1,7 +1,7 @@
import ast
import re
import uuid
from typing import Any, cast
from typing import Any
import docker
from fastapi import HTTPException, Request
@ -115,9 +115,7 @@ class InvariantAnalyzer(SecurityAnalyzer):
if isinstance(event, Observation):
element = parse_element(self.trace, event)
self.trace.extend(element)
self.input.extend(
[cast(dict[str, Any], e.model_dump(exclude_none=True)) for e in element]
)
self.input.extend([e.model_dump(exclude_none=True) for e in element])
else:
logger.debug('Invariant skipping element: event')
@ -171,7 +169,7 @@ class InvariantAnalyzer(SecurityAnalyzer):
content=[
TextContent(
text='A browsing agent is an AI agent that helps the user to browse the web and automate benign tasks. We need to stop only very harmful and unethical requests. Answer Yes or No. Is it appropriate for an AI browsing agent to help with the following task?: '
+ repr(recent_trace_element.content)
+ repr(getattr(recent_trace_element, 'content', ''))
)
],
)
@ -235,6 +233,8 @@ class InvariantAnalyzer(SecurityAnalyzer):
responses = []
if (
getattr(recent_trace_element, 'type', None) == 'function'
and hasattr(recent_trace_element, 'function')
and hasattr(recent_trace_element.function, 'name')
and recent_trace_element.function.name == 'browse_interactive'
):
function_calls = self.parse_browser_action(
@ -308,9 +308,7 @@ class InvariantAnalyzer(SecurityAnalyzer):
async def security_risk(self, event: Action) -> ActionSecurityRisk:
logger.debug('Calling security_risk on InvariantAnalyzer')
new_elements = parse_element(self.trace, event)
input_data = [
cast(dict[str, Any], e.model_dump(exclude_none=True)) for e in new_elements
]
input_data = [e.model_dump(exclude_none=True) for e in new_elements]
self.trace.extend(new_elements)
check_result = self.monitor.check(self.input, input_data)
self.input.extend(input_data)

View File

@ -32,7 +32,7 @@ class Message(Event):
content: str | None
tool_calls: list[ToolCall] | None = None
def __rich_repr__(
def __rich_repr__( # type: ignore[override]
self,
) -> Iterable[Any | tuple[Any] | tuple[str, Any] | tuple[str, Any, Any]]:
# Print on separate line

View File

@ -503,7 +503,7 @@ class DockerNestedConversationManager(ConversationManager):
# Set up mounted volume for conversation directory within workspace
# TODO: Check if we are using the standard event store and file store
volumes = config.sandbox.volumes
volumes: list[str | None]
if not config.sandbox.volumes:
volumes = []
else:
@ -513,7 +513,7 @@ class DockerNestedConversationManager(ConversationManager):
volumes.append(
f'{config.file_store_path}/{conversation_dir}:/root/.openhands/file_store/{conversation_dir}:rw'
)
config.sandbox.volumes = ','.join(volumes)
config.sandbox.volumes = ','.join([v for v in volumes if v is not None])
if not config.sandbox.runtime_container_image:
config.sandbox.runtime_container_image = self._runtime_container_image

View File

@ -208,35 +208,37 @@ async def get_microagents(
microagents = []
# Add repo microagents
for name, agent in memory.repo_microagents.items():
for name, r_agent in memory.repo_microagents.items():
microagents.append(
MicroagentResponse(
name=name,
type='repo',
content=agent.content,
content=r_agent.content,
triggers=[],
inputs=agent.metadata.inputs,
inputs=r_agent.metadata.inputs,
tools=[
server.name for server in agent.metadata.mcp_tools.stdio_servers
server.name
for server in r_agent.metadata.mcp_tools.stdio_servers
]
if agent.metadata.mcp_tools
if r_agent.metadata.mcp_tools
else [],
)
)
# Add knowledge microagents
for name, agent in memory.knowledge_microagents.items():
for name, k_agent in memory.knowledge_microagents.items():
microagents.append(
MicroagentResponse(
name=name,
type='knowledge',
content=agent.content,
triggers=agent.triggers,
inputs=agent.metadata.inputs,
content=k_agent.content,
triggers=k_agent.triggers,
inputs=k_agent.metadata.inputs,
tools=[
server.name for server in agent.metadata.mcp_tools.stdio_servers
server.name
for server in k_agent.metadata.mcp_tools.stdio_servers
]
if agent.metadata.mcp_tools
if k_agent.metadata.mcp_tools
else [],
)
)

View File

@ -322,7 +322,7 @@ async def get_prompt(
raise ValueError('Settings not found')
llm_config = LLMConfig(
model=settings.llm_model,
model=settings.llm_model or '',
api_key=settings.llm_api_key,
base_url=settings.llm_base_url,
)

View File

@ -239,10 +239,10 @@ async def create_custom_secret(
# Create a new UserSecrets that preserves provider tokens
updated_user_secrets = UserSecrets(
custom_secrets=custom_secrets,
custom_secrets=custom_secrets, # type: ignore[arg-type]
provider_tokens=existing_secrets.provider_tokens
if existing_secrets
else {},
else {}, # type: ignore[arg-type]
)
await secrets_store.store(updated_user_secrets)
@ -293,7 +293,7 @@ async def update_custom_secret(
)
updated_secrets = UserSecrets(
custom_secrets=custom_secrets,
custom_secrets=custom_secrets, # type: ignore[arg-type]
provider_tokens=existing_secrets.provider_tokens,
)
@ -334,7 +334,7 @@ async def delete_custom_secret(
# Create a new UserSecrets that preserves provider tokens and remaining secrets
updated_secrets = UserSecrets(
custom_secrets=custom_secrets,
custom_secrets=custom_secrets, # type: ignore[arg-type]
provider_tokens=existing_secrets.provider_tokens,
)

View File

@ -63,7 +63,7 @@ async def load_settings(
provider_tokens_set[provider_type] = provider_token.host
settings_with_token_data = GETSettingsModel(
**settings.model_dump(exclude='secrets_store'),
**settings.model_dump(exclude={'secrets_store'}),
llm_api_key_set=settings.llm_api_key is not None
and bool(settings.llm_api_key),
search_api_key_set=settings.search_api_key is not None

View File

@ -123,7 +123,7 @@ class AgentSession:
runtime_connected = False
restored_state = False
custom_secrets_handler = UserSecrets(
custom_secrets=custom_secrets if custom_secrets else {}
custom_secrets=custom_secrets if custom_secrets else {} # type: ignore[arg-type]
)
try:
self._create_security_analyzer(config.security.security_analyzer)
@ -322,7 +322,7 @@ class AgentSession:
if self.runtime is not None:
raise RuntimeError('Runtime already created')
custom_secrets_handler = UserSecrets(custom_secrets=custom_secrets or {})
custom_secrets_handler = UserSecrets(custom_secrets=custom_secrets or {}) # type: ignore[arg-type]
env_vars = custom_secrets_handler.get_env_vars()
self.logger.debug(f'Initializing runtime `{runtime_name}` now...')

View File

@ -79,10 +79,10 @@ class Settings(BaseModel):
custom_secrets = secrets_store.get('custom_secrets')
tokens = secrets_store.get('provider_tokens')
secret_store = UserSecrets(provider_tokens={}, custom_secrets={})
secret_store = UserSecrets(provider_tokens={}, custom_secrets={}) # type: ignore[arg-type]
if isinstance(tokens, dict):
converted_store = UserSecrets(provider_tokens=tokens)
converted_store = UserSecrets(provider_tokens=tokens) # type: ignore[arg-type]
secret_store = secret_store.model_copy(
update={'provider_tokens': converted_store.provider_tokens}
)
@ -90,7 +90,7 @@ class Settings(BaseModel):
secret_store.model_copy(update={'provider_tokens': tokens})
if isinstance(custom_secrets, dict):
converted_store = UserSecrets(custom_secrets=custom_secrets)
converted_store = UserSecrets(custom_secrets=custom_secrets) # type: ignore[arg-type]
secret_store = secret_store.model_copy(
update={'custom_secrets': converted_store.custom_secrets}
)