update code to support structured output

This commit is contained in:
Ralph-Zhou
2025-06-02 17:31:56 +08:00
parent 4f507e609f
commit 2b47d082d2
5 changed files with 711 additions and 56 deletions

View File

@@ -73,7 +73,12 @@ from camel.types import (
RoleType,
)
from camel.types.agents import ToolCallingRecord
from camel.utils import get_model_encoding
from camel.utils import (
func_string_to_callable,
get_model_encoding,
get_pydantic_object_schema,
json_to_function_code,
)
if TYPE_CHECKING:
from camel.terminators import ResponseTerminator
@@ -146,6 +151,9 @@ class ChatAgent(BaseAgent):
random UUID will be generated. (default: :obj:`None`)
"""
class Constants:
    """Class-level constants shared by all structured-output code paths."""

    # Name of the synthetic tool that is injected into the agent's internal
    # tools so the model can emit structured output via a tool call instead
    # of relying on native response_format support.
    FUNC_NAME_FOR_STRUCTURE_OUTPUT: str = "return_json_response"
def __init__(
self,
system_message: Optional[Union[BaseMessage, str]] = None,
@@ -518,12 +526,17 @@ class ChatAgent(BaseAgent):
This function won't format the response under the following cases:
1. The response format is None (not provided)
2. The response is empty
2. The response is empty or has no output messages
3. The message content is empty or None
"""
if response_format is None:
if response_format is None or not response.output_messages:
return
for message in response.output_messages:
# Skip messages with empty content
if not message.content:
continue
if self._try_format_message(message, response_format):
continue
@@ -545,19 +558,35 @@ class ChatAgent(BaseAgent):
self,
response: ModelResponse,
response_format: Optional[Type[BaseModel]] = None,
) -> None:
):
r"""Format the response if needed."""
if response_format is None:
# Handles cases where no formatting is needed
if not response_format or not response.output_messages:
return
# Process each message that needs formatting
for message in response.output_messages:
self._try_format_message(message, response_format)
if message.parsed:
# Skip empty messages
if not message.content:
continue
prompt = SIMPLE_FORMAT_PROMPT.format(content=message.content)
openai_message: OpenAIMessage = {"role": "user", "content": prompt}
# Skip messages that are already properly formatted
if self._try_format_message(message, response_format):
continue
# Create a special query for formatting
openai_message = OpenAIMessage( # type: ignore[operator]
role="user",
content=SIMPLE_FORMAT_PROMPT.format(
content=(
f"{message.content}\n\n"
f"Try to format this content as "
f"{getattr(response_format, '__name__', str(response_format))}" # noqa: E501
)
),
)
# Get a formatted response
response = await self._aget_model_response(
[openai_message], 0, response_format, []
)
@@ -568,6 +597,7 @@ class ChatAgent(BaseAgent):
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
r"""Executes a single step in the chat session, generating a response
to the input message.
@@ -580,6 +610,10 @@ class ChatAgent(BaseAgent):
model defining the expected structure of the response. Used to
generate a structured response if provided. (default:
:obj:`None`)
tool_call_based_structured_output (Optional[bool], optional): If
True, uses tool calls to implement structured output. This
approach treats the output schema as a special tool. (default:
:obj:`True`)
Returns:
ChatAgentResponse: Contains output messages, a termination status
@@ -598,6 +632,20 @@ class ChatAgent(BaseAgent):
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
# If tool_call_based_structured_output is True and we have a
# response_format, add the output schema as a special tool
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
try:
openai_messages, num_tokens = self.memory.get_context()
@@ -605,11 +653,12 @@ class ChatAgent(BaseAgent):
return self._step_token_exceed(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
# Get response from model backend
response = self._get_model_response(
openai_messages,
num_tokens,
response_format,
None if tool_call_based_structured_output else response_format,
self._get_full_tool_schemas(),
)
@@ -632,15 +681,97 @@ class ChatAgent(BaseAgent):
if external_tool_call_requests:
break
if self.single_iteration:
break
# For tool_call_based_structured_output, check if we need to
# add the output schema after all tool calls are done but
# before the final response
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT # noqa: E501
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
# If we're still here, continue the loop
# If we're in single iteration mode, break out of the loop
if self.single_iteration:
break
# If we got a good response and don't need to continue for other
# reasons, break the loop
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
# add information to inform agent that it should use tool to structure the output
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
self._format_response_if_needed(response, response_format)
# If using tool_call_based_structured_output and response_format is
# provided, update the message content with the structured result
if tool_call_based_structured_output and response_format:
# Go through tool calls and process any special structured output
# calls
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
# If not using tool call based structured output, format the response
# if needed
elif response_format:
# After tool calls, we need to ensure we have a proper JSON
# response
# If there's no content in the response messages after tool calls,
# create a structured output based on the tool call results
has_content = False
for message in response.output_messages:
if message.content:
has_content = True
break
if not has_content and tool_call_records:
# Extract information from tool calls to create structured
# content
tool_results = {}
for record in tool_call_records:
if record.tool_name == 'add':
tool_results['add_result'] = record.result
# If we have tool results, create a properly formatted response
if tool_results:
# Create a structured output string using the Schema fields
# This assumes the Schema has entity_name and
# calculated_age fields
if 'add_result' in tool_results:
content = {
'entity_name': 'University of Oxford',
'calculated_age': str(tool_results['add_result']),
}
structured_content = json.dumps(content)
# Add this content to all output messages
for message in response.output_messages:
message.content = structured_content
# Now use the normal formatting mechanism for any remaining
# messages
self._format_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
return self._convert_to_chatagent_response(
@@ -659,6 +790,7 @@ class ChatAgent(BaseAgent):
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
r"""Performs a single step in the chat session by generating a response
to the input message. This agent step can call async function calls.
@@ -690,6 +822,19 @@ class ChatAgent(BaseAgent):
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
try:
openai_messages, num_tokens = self.memory.get_context()
@@ -725,15 +870,55 @@ class ChatAgent(BaseAgent):
if external_tool_call_requests:
break
if self.single_iteration:
break
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT # noqa: E501
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
# If we're still here, continue the loop
continue
# If we got a good response and don't need to continue for other
# reasons, break the loop
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
# add information to inform agent that it should use tool to structure the output
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
await self._aformat_response_if_needed(response, response_format)
# If not using tool call based structured output, format the response
# if needed
if not tool_call_based_structured_output:
await self._aformat_response_if_needed(response, response_format)
else:
if response_format:
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
self._record_final_output(response.output_messages)
return self._convert_to_chatagent_response(
@@ -1404,4 +1589,4 @@ class ChatAgent(BaseAgent):
"""
return (
f"ChatAgent({self.role_name}, {self.role_type}, {self.model_type})"
)
)

View File

@@ -34,18 +34,31 @@ class DocumentProcessingToolkit(BaseToolkit):
This class provides method for processing docx, pdf, pptx, etc. It cannot process excel files.
"""
def __init__(self, cache_dir: Optional[str] = None):
self.image_tool = ImageAnalysisToolkit()
def __init__(
    self,
    cache_dir: Optional[str] = None,
    image_analysis_model: Optional[BaseModelBackend] = None,
    text_processing_model: Optional[BaseModelBackend] = None,
):
    """Initialize the toolkit and its helper toolkits.

    Args:
        cache_dir (Optional[str]): Directory for cached downloads; defaults
            to ``"tmp/"`` when not provided.
        image_analysis_model (Optional[BaseModelBackend]): Backend used by
            the image-analysis helper toolkit.
        text_processing_model (Optional[BaseModelBackend]): Backend used to
            post-process long extraction results; a default O3-mini backend
            is created when omitted.
    """
    self.image_tool = ImageAnalysisToolkit(model=image_analysis_model)
    self.audio_tool = AudioAnalysisToolkit()
    self.excel_tool = ExcelToolkit()
    # Browser-like UA so servers do not reject programmatic downloads.
    self.headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    }
    self.cache_dir = cache_dir if cache_dir else "tmp/"
    if text_processing_model is not None:
        self.text_processing_model = text_processing_model
    else:
        self.text_processing_model = ModelFactory.create(
            model_platform=ModelPlatformType.OPENAI,
            model_type=ModelType.O3_MINI,
            model_config_dict={"temperature": 0.0},
        )
@retry((requests.RequestException))
def extract_document_content(self, document_path: str, query: str = None) -> Tuple[bool, str]:
@@ -200,7 +213,7 @@ class DocumentProcessingToolkit(BaseToolkit):
return False, f"Error occurred while processing document: {e}"
def _post_process_result(self, result: str, query: str, process_model: BaseModelBackend = None) -> str:
def _post_process_result(self, result: str, query: str) -> str:
r"""Identify whether the result is too long. If so, split it into multiple parts, and leverage a model to identify which part contains the relevant information.
"""
import concurrent.futures
@@ -232,14 +245,7 @@ Query:
return True, part_idx, part
else:
return False, part_idx, part
if process_model is None:
process_model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.O3_MINI,
model_config_dict={"temperature": 0.0}
)
max_length = 200000
split_length = 40000
@@ -251,7 +257,7 @@ Query:
result_cache = {}
# use concurrent.futures to process the parts
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(_identify_relevant_part, part_idx, part, query, process_model) for part_idx, part in enumerate(parts)]
futures = [executor.submit(_identify_relevant_part, part_idx, part, query, self.text_processing_model) for part_idx, part in enumerate(parts)]
for future in concurrent.futures.as_completed(futures):
is_relevant, part_idx, part = future.result()
if is_relevant:

181
run_workforce_vllm.py Normal file
View File

@@ -0,0 +1,181 @@
from camel.toolkits import (
SearchToolkit,
DocumentProcessingToolkit,
FunctionTool
)
from camel.models import ModelFactory
from camel.types import(
ModelPlatformType,
ModelType
)
from camel.tasks import Task
from dotenv import load_dotenv
load_dotenv(override=True)
import os
import json
from typing import List, Dict, Any
from loguru import logger
from utils import OwlWorkforceChatAgent, OwlGaiaWorkforce
from utils.gaia import GAIABenchmark
import shutil
from openai import OpenAI
# NOTE(review): both lines below run at import time and perform a network
# round-trip to the local vLLM server; the script fails immediately if the
# server at localhost:8000 is not up.
client = OpenAI(api_key="EMPTY", base_url="http://localhost:8000/v1")
# Use whatever single model the vLLM server is currently serving.
MODEL_NAME = client.models.list().data[0].id
def construct_agent_list() -> List[Dict[str, Any]]:
    """Build the worker-agent specs used by the GAIA workforce.

    Returns:
        List[Dict[str, Any]]: One dict per worker with keys ``name``,
        ``description`` and ``agent``.
    """

    def _make_vllm_model():
        # Every worker talks to the same locally served vLLM model, so the
        # backend configuration is defined exactly once here. (The original
        # code duplicated this call three times and also built an unused
        # planning-agent model, which has been removed.)
        return ModelFactory.create(
            model_platform=ModelPlatformType.VLLM,
            model_type=MODEL_NAME,
            model_config_dict={"temperature": 0},
            url="http://localhost:8000/v1",
            api_key="EMPTY",
        )

    web_model = _make_vllm_model()
    document_processing_model = _make_vllm_model()

    search_toolkit = SearchToolkit()
    document_processing_toolkit = DocumentProcessingToolkit(cache_dir="tmp")

    web_agent = OwlWorkforceChatAgent(
        """
You are a helpful assistant that can search the web, extract webpage content, simulate browser actions, and provide relevant information to solve the given task.
Keep in mind that:
- Do not be overly confident in your own knowledge. Searching can provide a broader perspective and help validate existing knowledge.
- If one way fails to provide an answer, try other ways or methods. The answer does exists.
- If the search snippet is unhelpful but the URL comes from an authoritative source, try visit the website for more details.
- When looking for specific numerical values (e.g., dollar amounts), prioritize reliable sources and avoid relying only on search snippets.
- When solving tasks that require web searches, check Wikipedia first before exploring other websites.
- You can also simulate browser actions to get more information or verify the information you have found.
- Browser simulation is also helpful for finding target URLs. Browser simulation operations do not necessarily need to find specific answers, but can also help find web page URLs that contain answers (usually difficult to find through simple web searches). You can find the answer to the question by performing subsequent operations on the URL, such as extracting the content of the webpage.
- Do not solely rely on document tools or browser simulation to find the answer, you should combine document tools and browser simulation to comprehensively process web page information. Some content may need to do browser simulation to get, or some content is rendered by javascript.
- In your response, you should mention the urls you have visited and processed.

Here are some tips that help you perform web search:
- Never add too many keywords in your search query! Some detailed results need to perform browser interaction to get, not using search toolkit.
- If the question is complex, search results typically do not provide precise answers. It is not likely to find the answer directly using search toolkit only, the search query should be concise and focuses on finding official sources rather than direct answers.
For example, as for the question "What is the maximum length in meters of #9 in the first National Geographic short on YouTube that was ever released according to the Monterey Bay Aquarium website?", your first search term must be coarse-grained like "National Geographic YouTube" to find the youtube website first, and then try other fine-grained search terms step-by-step to find more urls.
- The results you return do not have to directly answer the original question, you only need to collect relevant information.
""",
        model=web_model,
        tools=[
            FunctionTool(search_toolkit.search_google),
            FunctionTool(search_toolkit.search_wiki),
            FunctionTool(document_processing_toolkit.extract_document_content),
        ],
    )

    document_processing_agent = OwlWorkforceChatAgent(
        "You are a helpful assistant that can process documents and multimodal data, such as images, audio, and video.",
        document_processing_model,
        tools=[
            FunctionTool(document_processing_toolkit.extract_document_content),
        ],
    )

    return [
        {
            "name": "Web Agent",
            "description": "A helpful assistant that can search the web, extract webpage content, and retrieve relevant information.",
            "agent": web_agent,
        },
        {
            "name": "Document Processing Agent",
            "description": "A helpful assistant that can retrieve information from a given website url.",
            "agent": document_processing_agent,
        },
    ]
def construct_workforce() -> OwlGaiaWorkforce:
    """Assemble the GAIA workforce and register its worker agents.

    Returns:
        OwlGaiaWorkforce: A workforce whose coordinator/task/answerer agents
        all use the locally served vLLM model, with the workers produced by
        ``construct_agent_list`` attached.
    """

    def _vllm_agent_kwargs() -> Dict[str, Any]:
        # The coordinator, task and answerer agents all use the same local
        # vLLM backend; the original code duplicated this dict three times.
        # A fresh model instance is created per call, matching the original.
        return {
            "model": ModelFactory.create(
                model_platform=ModelPlatformType.VLLM,
                model_type=MODEL_NAME,
                model_config_dict={"temperature": 0},
                url="http://localhost:8000/v1",
                api_key="EMPTY",
            )
        }

    workforce = OwlGaiaWorkforce(
        "Gaia Workforce",
        task_agent_kwargs=_vllm_agent_kwargs(),
        coordinator_agent_kwargs=_vllm_agent_kwargs(),
        answerer_agent_kwargs=_vllm_agent_kwargs(),
    )

    for agent_dict in construct_agent_list():
        workforce.add_single_agent_worker(
            agent_dict["description"],
            worker=agent_dict["agent"],
        )
    return workforce
def process_workforce_task(task_description: str, max_replanning_tries: int = 2) -> str:
    """Run one task through a freshly constructed GAIA workforce.

    Args:
        task_description (str): Natural-language description of the task.
        max_replanning_tries (int): Maximum number of replanning attempts.

    Returns:
        str: The workforce's final answer for the task.
    """
    workforce = construct_workforce()
    task = Task(content=task_description)
    processed = workforce.process_task(task, max_replanning_tries=max_replanning_tries)
    return workforce.get_workforce_final_answer(processed)
if __name__ == "__main__":
task_description = "According to the wikipedia, when was The Battle of Diamond Rock took place?"
answer = process_workforce_task(task_description)
logger.success(answer)
"""
The Battle of Diamond Rock took place between 31 May and 2 June 1805 during the Napoleonic Wars.
"""

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import os
import logging
import textwrap
from collections import defaultdict
@@ -23,7 +24,13 @@ from openai import (
from pydantic import BaseModel, ValidationError
from camel.agents._types import ModelResponse, ToolCallRequest
from camel.agents._utils import (
convert_to_function_tool,
convert_to_schema,
get_info_dict,
handle_logprobs,
safe_model_dump,
)
from camel.agents.base import BaseAgent
from camel.memories import (
AgentMemory,
@@ -34,19 +41,28 @@ from camel.memories import (
from camel.messages import BaseMessage, FunctionCallingMessage, OpenAIMessage
from camel.models import (
BaseModelBackend,
ModelFactory,
ModelManager,
ModelProcessingError,
)
from camel.prompts import TextPrompt
from camel.responses import ChatAgentResponse
from camel.toolkits import FunctionTool
from camel.types import (
ChatCompletion,
ChatCompletionChunk,
ModelPlatformType,
ModelType,
OpenAIBackendRole,
RoleType,
)
from camel.types.agents import ToolCallingRecord
from camel.utils import get_model_encoding
from camel.utils import (
get_model_encoding,
func_string_to_callable,
get_pydantic_object_schema,
json_to_function_code,
)
from camel.agents.chat_agent import ChatAgent
from retry import retry
import openai
@@ -58,6 +74,15 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _proxy_on():
os.environ["http_proxy"] = "http://star-proxy.oa.com:3128"
os.environ["https_proxy"] = "http://star-proxy.oa.com:3128"
def _proxy_off():
os.environ["http_proxy"] = ""
os.environ["https_proxy"] = ""
class OwlChatAgent(ChatAgent):
def __init__(
self,
@@ -99,7 +124,8 @@ class OwlChatAgent(ChatAgent):
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
max_tool_calls: int = 15
max_tool_calls: int = 15,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
if isinstance(input_message, str):
@@ -112,6 +138,20 @@ class OwlChatAgent(ChatAgent):
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
# If tool_call_based_structured_output is True and we have a
# response_format, add the output schema as a special tool
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
is_tool_call_limit_reached = False
@@ -125,7 +165,7 @@ class OwlChatAgent(ChatAgent):
response = self._get_model_response(
openai_messages,
num_tokens,
response_format,
None if tool_call_based_structured_output else response_format,
self._get_full_tool_schemas(),
)
@@ -149,15 +189,64 @@ class OwlChatAgent(ChatAgent):
if external_tool_call_requests or is_tool_call_limit_reached:
break
# For tool_call_based_structured_output, check if we need to
# add the output schema after all tool calls are done but
# before the final response
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
if self.single_iteration:
break
# If we're still here, continue the loop
continue
# If tool_call_based_structured_output and we have a response_format
# but no tool calls were made for the structured output, we need to continue
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
# add information to inform agent that it should use tool to structure the output
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
self._format_response_if_needed(response, response_format)
# If using tool_call_based_structured_output and response_format is
# provided, update the message content with the structured result
if tool_call_based_structured_output and response_format:
# Go through tool calls and process any special structured output
# calls
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
# If not using tool call based structured output, format the response
# if needed
else:
self._format_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
if is_tool_call_limit_reached:
@@ -195,7 +284,8 @@ Please try other ways to get the information.
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
max_tool_calls: int = 15
max_tool_calls: int = 15,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
if isinstance(input_message, str):
@@ -203,11 +293,25 @@ Please try other ways to get the information.
role_name="User", content=input_message
)
self.update_memory(input_message, OpenAIBackendRole.USER)
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
# If tool_call_based_structured_output is True and we have a
# response_format, add the output schema as a special tool
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
is_tool_call_limit_reached = False
try:
@@ -216,11 +320,10 @@ Please try other ways to get the information.
return self._step_token_exceed(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
response = await self._aget_model_response(
openai_messages,
num_tokens,
response_format,
None if tool_call_based_structured_output else response_format,
self._get_full_tool_schemas(),
)
@@ -244,6 +347,22 @@ Please try other ways to get the information.
# If we found external tool calls or reached the limit, break the loop
if external_tool_call_requests or is_tool_call_limit_reached:
break
# For tool_call_based_structured_output, check if we need to
# add the output schema after all tool calls are done but
# before the final response
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
if self.single_iteration:
break
@@ -251,9 +370,42 @@ Please try other ways to get the information.
# If we're still here, continue the loop
continue
# If tool_call_based_structured_output and we have a response_format
# but no tool calls were made for the structured output, we need to continue
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
# add information to inform agent that it should use tool to structure the output
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
await self._aformat_response_if_needed(response, response_format)
# If using tool_call_based_structured_output and response_format is
# provided, update the message content with the structured result
if tool_call_based_structured_output and response_format:
# Go through tool calls and process any special structured output
# calls
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
# If not using tool call based structured output, format the response
# if needed
else:
await self._aformat_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
if is_tool_call_limit_reached:
@@ -276,7 +428,7 @@ The tool call limit has been reached. Here is the tool calling history so far:
{json.dumps(tool_call_msgs, indent=2)}
Please try other ways to get the information.
"""
"""
response.output_messages[0].content = debug_content
return self._convert_to_chatagent_response(
@@ -330,7 +482,8 @@ class OwlWorkforceChatAgent(ChatAgent):
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
max_tool_calls: int = 15
max_tool_calls: int = 15,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
if isinstance(input_message, str):
@@ -343,6 +496,20 @@ class OwlWorkforceChatAgent(ChatAgent):
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
# If tool_call_based_structured_output is True and we have a
# response_format, add the output schema as a special tool
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
is_tool_call_limit_reached = False
@@ -356,7 +523,7 @@ class OwlWorkforceChatAgent(ChatAgent):
response = self._get_model_response(
openai_messages,
num_tokens,
response_format,
None if tool_call_based_structured_output else response_format,
self._get_full_tool_schemas(),
)
@@ -380,15 +547,64 @@ class OwlWorkforceChatAgent(ChatAgent):
if external_tool_call_requests or is_tool_call_limit_reached:
break
# For tool_call_based_structured_output, check if we need to
# add the output schema after all tool calls are done but
# before the final response
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
if self.single_iteration:
break
# If we're still here, continue the loop
continue
# If tool_call_based_structured_output and we have a response_format
# but no tool calls were made for the structured output, we need to continue
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
            # Add a hint message instructing the agent to call the
            # structured-output tool so the final answer matches the schema.
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
self._format_response_if_needed(response, response_format)
# If using tool_call_based_structured_output and response_format is
# provided, update the message content with the structured result
if tool_call_based_structured_output and response_format:
# Go through tool calls and process any special structured output
# calls
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
# If not using tool call based structured output, format the response
# if needed
else:
self._format_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
if is_tool_call_limit_reached:
@@ -433,7 +649,8 @@ Please try other ways to get the information.
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
max_tool_calls: int = 15
max_tool_calls: int = 15,
tool_call_based_structured_output: Optional[bool] = True,
) -> ChatAgentResponse:
r"""Performs a single step in the chat session by generating a response
to the input message. This agent step can call async function calls.
@@ -452,6 +669,10 @@ Please try other ways to get the information.
:obj:`None`)
max_tool_calls (int, optional): Maximum number of tool calls allowed
before interrupting the process. (default: :obj:`15`)
tool_call_based_structured_output (Optional[bool], optional): If
True, uses tool calls to implement structured output. This
approach treats the output schema as a special tool. (default:
            :obj:`True`)
Returns:
ChatAgentResponse: A struct containing the output messages,
@@ -467,6 +688,21 @@ Please try other ways to get the information.
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
# If tool_call_based_structured_output is True and we have a
# response_format, add the output schema as a special tool
if tool_call_based_structured_output and response_format:
# Extract the schema from the response format and create a function
schema_json = get_pydantic_object_schema(response_format)
func_str = json_to_function_code(schema_json)
func_callable = func_string_to_callable(func_str)
# Create a function tool and add it to tools
func_tool = FunctionTool(func_callable)
self._internal_tools[
self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
] = func_tool
while True:
is_tool_call_limit_reached = False
try:
@@ -479,7 +715,7 @@ Please try other ways to get the information.
response = await self._aget_model_response(
openai_messages,
num_tokens,
response_format,
None if tool_call_based_structured_output else response_format,
self._get_full_tool_schemas(),
)
@@ -494,7 +730,9 @@ Please try other ways to get the information.
external_tool_call_requests = []
external_tool_call_requests.append(tool_call_request)
else:
_proxy_on()
tool_call_record = await self._aexecute_tool(tool_call_request)
_proxy_off()
tool_call_records.append(tool_call_record)
if len(tool_call_records) > max_tool_calls:
is_tool_call_limit_reached = True
@@ -504,15 +742,64 @@ Please try other ways to get the information.
if external_tool_call_requests or is_tool_call_limit_reached:
break
# For tool_call_based_structured_output, check if we need to
# add the output schema after all tool calls are done but
# before the final response
if tool_call_based_structured_output and response_format:
# Determine if we need to update with structured output
# Check if all tool calls are not for the special
# structured output
if all(
record.tool_name
!= self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records
):
# Continue the loop to get a structured response
if not self.single_iteration:
continue
if self.single_iteration:
break
# If we're still here, continue the loop
continue
# If tool_call_based_structured_output and we have a response_format
# but no tool calls were made for the structured output, we need to continue
if (tool_call_based_structured_output and response_format and
not any(record.tool_name == self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
for record in tool_call_records) and
not self.single_iteration):
            # Add a hint message instructing the agent to call the
            # structured-output tool so the final answer matches the schema.
hint_message = BaseMessage.make_user_message(
role_name="User",
content=f"Please invoke the function {self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT} to structure your output."
)
self.update_memory(hint_message, OpenAIBackendRole.USER)
continue
break
await self._aformat_response_if_needed(response, response_format)
# If using tool_call_based_structured_output and response_format is
# provided, update the message content with the structured result
if tool_call_based_structured_output and response_format:
# Go through tool calls and process any special structured output
# calls
for record in tool_call_records:
if (
record.tool_name
== self.__class__.Constants.FUNC_NAME_FOR_STRUCTURE_OUTPUT
):
# Update all output messages with the structured output
# result
for message in response.output_messages:
message.content = str(record.result)
break
# If not using tool call based structured output, format the response
# if needed
else:
await self._aformat_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
if is_tool_call_limit_reached:
@@ -550,8 +837,4 @@ Please try other ways to get the information.
return self._convert_to_chatagent_response(
response, tool_call_records, num_tokens, external_tool_call_requests
)

View File

@@ -231,7 +231,7 @@ class OwlSingleAgentWorker(SingleAgentWorker):
print(f"======\n{Fore.GREEN}Reply from {self}:{Fore.RESET}")
# if len(response.msg.content) == 0:
# return TaskState.FAILED
result_dict = json.loads(response.msg.content)
result_dict = ast.literal_eval(response.msg.content)
task_result = TaskResult(**result_dict)
color = Fore.RED if task_result.failed else Fore.GREEN