RecallObservations (#7292)

This commit is contained in:
Engel Nyst
2025-03-17 03:18:22 +01:00
committed by GitHub
parent 106b230fea
commit 51fb1fae88
15 changed files with 283 additions and 205 deletions

View File

@@ -9,7 +9,7 @@ from openhands.events.action.agent import RecallAction
from openhands.events.event import Event, EventSource, RecallType
from openhands.events.observation.agent import (
MicroagentKnowledge,
MicroagentObservation,
RecallObservation,
)
from openhands.events.observation.empty import NullObservation
from openhands.events.stream import EventStream, EventStreamSubscriber
@@ -31,7 +31,7 @@ GLOBAL_MICROAGENTS_DIR = os.path.join(
class Memory:
"""
Memory is a component that listens to the EventStream for information retrieval actions
(a RecallAction) and publishes observations with the content (such as MicroagentObservation).
(a RecallAction) and publishes observations with the content (such as RecallObservation).
"""
sid: str
@@ -75,48 +75,59 @@ class Memory:
async def _on_event(self, event: Event):
"""Handle an event from the event stream asynchronously."""
try:
observation: MicroagentObservation | NullObservation | None = None
if isinstance(event, RecallAction):
# if this is a workspace context recall (on first user message)
# create and add a MicroagentObservation
# with info about repo and runtime.
# create and add a RecallObservation
# with info about repo, runtime, instructions, etc. including microagent knowledge if any
if (
event.source == EventSource.USER
and event.recall_type == RecallType.WORKSPACE_CONTEXT
):
observation = self._on_first_microagent_action(event)
logger.debug('Workspace context recall')
workspace_obs: RecallObservation | NullObservation | None = None
# continue with the next handler, to include knowledge microagents if suitable for this query
assert observation is None or isinstance(
observation, MicroagentObservation
), f'Expected a MicroagentObservation, but got {type(observation)}'
observation = self._on_microagent_action(
event, prev_observation=observation
)
workspace_obs = self._on_workspace_context_recall(event)
if workspace_obs is None:
workspace_obs = NullObservation(content='')
if observation is None:
observation = NullObservation(content='')
# important: this will release the execution flow from waiting for the retrieval to complete
workspace_obs._cause = event.id # type: ignore[union-attr]
# important: this will release the execution flow from waiting for the retrieval to complete
observation._cause = event.id # type: ignore[union-attr]
self.event_stream.add_event(workspace_obs, EventSource.ENVIRONMENT)
return
self.event_stream.add_event(observation, EventSource.ENVIRONMENT)
# Handle knowledge recall (triggered microagents)
elif (
event.source == EventSource.USER
and event.recall_type == RecallType.KNOWLEDGE
):
logger.debug('Microagent knowledge recall')
microagent_obs: RecallObservation | NullObservation | None = None
microagent_obs = self._on_microagent_recall(event)
if microagent_obs is None:
microagent_obs = NullObservation(content='')
# important: this will release the execution flow from waiting for the retrieval to complete
microagent_obs._cause = event.id # type: ignore[union-attr]
self.event_stream.add_event(microagent_obs, EventSource.ENVIRONMENT)
return
except Exception as e:
error_str = f'Error: {str(e.__class__.__name__)}'
logger.error(error_str)
self.send_error_message('STATUS$ERROR_MEMORY', error_str)
return
def _on_first_microagent_action(
def _on_workspace_context_recall(
self, event: RecallAction
) -> MicroagentObservation | None:
"""Add repository and runtime information to the stream as a MicroagentObservation."""
) -> RecallObservation | None:
"""Build a RecallObservation with repository info, runtime info, repository instructions and matched microagent knowledge; return None when none of these is available."""
# Create ENVIRONMENT info:
# Create WORKSPACE_CONTEXT info:
# - repository_info
# - runtime_info
# - repository_instructions
# - microagent_knowledge
# Collect raw repository instructions
repo_instructions = ''
@@ -130,9 +141,17 @@ class Memory:
repo_instructions += '\n\n'
repo_instructions += microagent.content
# Find any matched microagents based on the query
microagent_knowledge = self._find_microagent_knowledge(event.query)
# Create observation if we have anything
if self.repository_info or self.runtime_info or repo_instructions:
obs = MicroagentObservation(
if (
self.repository_info
or self.runtime_info
or repo_instructions
or microagent_knowledge
):
obs = RecallObservation(
recall_type=RecallType.WORKSPACE_CONTEXT,
repo_name=self.repository_info.repo_name
if self.repository_info and self.repository_info.repo_name is not None
@@ -149,29 +168,47 @@ class Memory:
if self.runtime_info
and self.runtime_info.additional_agent_instructions is not None
else '',
microagent_knowledge=[],
content='Retrieved environment info',
microagent_knowledge=microagent_knowledge,
content='Added workspace context',
)
return obs
return None
def _on_microagent_action(
def _on_microagent_recall(
self,
event: RecallAction,
prev_observation: MicroagentObservation | None = None,
) -> MicroagentObservation | None:
"""When a microagent action triggers microagents, create a MicroagentObservation with structured data."""
# If there's no query, do nothing
query = event.query.strip()
if not query:
return prev_observation
) -> RecallObservation | None:
"""When the query of a RecallAction triggers microagents, create a RecallObservation carrying the matched knowledge; otherwise return None."""
assert prev_observation is None or isinstance(
prev_observation, MicroagentObservation
), f'Expected a MicroagentObservation, but got {type(prev_observation)}'
# Find any matched microagents based on the query
microagent_knowledge = self._find_microagent_knowledge(event.query)
# Process text to find suitable microagents and create a MicroagentObservation.
# Create observation if we have anything
if microagent_knowledge:
obs = RecallObservation(
recall_type=RecallType.KNOWLEDGE,
microagent_knowledge=microagent_knowledge,
content='Retrieved knowledge from microagents',
)
return obs
return None
def _find_microagent_knowledge(self, query: str) -> list[MicroagentKnowledge]:
"""Find microagent knowledge based on a query.
Args:
query: The query to search for microagent triggers
Returns:
A list of MicroagentKnowledge objects for matched triggers
"""
recalled_content: list[MicroagentKnowledge] = []
# skip empty queries
if not query:
return recalled_content
# Search for microagent triggers in the query
for name, microagent in self.knowledge_microagents.items():
trigger = microagent.match_trigger(query)
if trigger:
@@ -183,22 +220,7 @@ class Memory:
content=microagent.content,
)
)
if recalled_content:
if prev_observation is not None:
# it may be on the first user message that already found some repo info etc
prev_observation.microagent_knowledge.extend(recalled_content)
else:
# if it's not the first user message, we may not have found any information this step
obs = MicroagentObservation(
recall_type=RecallType.KNOWLEDGE,
microagent_knowledge=recalled_content,
content='Retrieved knowledge from microagents',
)
return obs
return prev_observation
return recalled_content
def load_user_workspace_microagents(
self, user_microagents: list[BaseMicroAgent]