diff --git a/gradio_ui/app.py b/gradio_ui/app.py
index 8c03fa1..8a8beb7 100644
--- a/gradio_ui/app.py
+++ b/gradio_ui/app.py
@@ -90,74 +90,87 @@ def save_to_storage(filename: str, data: str) -> None:
except Exception as e:
print(f"Debug: Error saving {filename}: {e}")
+def format_json_content(json_content):
+ """Format JSON content with reasoning and details"""
+ content_json = json.loads(json_content)
+ reasoning = f'{content_json["reasoning"]}'
+ details = f'<details><summary>Detail</summary><pre>{json.dumps(content_json, indent=4, ensure_ascii=False)}</pre></details>'
+ return reasoning, details
+
+def format_message_content(content):
+ """Format message content for gradio chatbox display"""
+ # Handle list-type content (multimodal)
+ if isinstance(content, list):
+ formatted_content = ""
+ json_reasoning = None
+
+ for item in content:
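+ # Embed images directly in the chat content; JSON text is split into reasoning and a collapsible detail block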
+ if item["type"] == "image_url":
+ formatted_content += f'<img src="{item["image_url"]["url"]}">'
+ elif item["type"] == "text":
+ if is_json_format(item["text"]):
+ reasoning, details = format_json_content(item["text"])
+ json_reasoning = reasoning
+ formatted_content += details
+ else:
+ formatted_content += item["text"]
+
+ return formatted_content, json_reasoning
+
+ # Handle string content
+ if is_json_format(content):
+ reasoning, _ = format_json_content(content)
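+ # Show the full JSON pretty-printed; the extracted reasoning is returned so the caller can display it as its own message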
+ formatted_content = json.dumps(json.loads(content), indent=4, ensure_ascii=False)
+ return formatted_content, reasoning
+
+ return content, None
+
def process_input(user_input, state, vision_agent_state):
# Reset the stop flag
if state["stop"]:
state["stop"] = False
+
+ # Configure API
config = Config()
config.set_openai_config(base_url=state["base_url"], api_key=state["api_key"], model=state["model"])
- state["messages"].append(
- {
- "role": "user",
- "content": user_input
- }
- )
- state["chatbox_messages"].append(
- {
- "role": "user",
- "content": user_input
- }
- )
- yield state['chatbox_messages']
+
+ # Add user message
+ state["messages"].append({"role": "user", "content": user_input})
+ state["chatbox_messages"].append({"role": "user", "content": user_input})
+ yield state["chatbox_messages"]
+
+ # Process with agent
agent = vision_agent_state["agent"]
for _ in sampling_loop_sync(
model=state["model"],
messages=state["messages"],
- vision_agent = agent,
+ vision_agent=agent,
screen_region=state.get("screen_region", None)
):
if state["stop"]:
return
- state['chatbox_messages'] = []
- for message in state['messages']:
- # convert message["content"] to gradio chatbox format
- if type(message["content"]) is list:
- gradio_chatbox_content = ""
- for content in message["content"]:
- # convert image_url to gradio image format
- if content["type"] == "image_url":
- gradio_chatbox_content += f'<img src="{content["image_url"]["url"]}">'
- # convert text to gradio text format
- elif content["type"] == "text":
- # agent response is json format and must contains reasoning
- if is_json_format(content["text"]):
- content_json = json.loads(content["text"])
- state['chatbox_messages'].append({
- "role": message["role"],
- "content": f'{content_json["reasoning"]}
'
- })
- gradio_chatbox_content += f'<details><summary>Detail</summary><pre>{json.dumps(content_json, indent=4, ensure_ascii=False)}</pre></details>'
- else:
- gradio_chatbox_content += content["text"]
-
- state['chatbox_messages'].append({
+
+ # Rebuild chatbox messages from the original messages
+ state["chatbox_messages"] = []
+
+ for message in state["messages"]:
+ formatted_content, json_reasoning = format_message_content(message["content"])
+
+ # Add json reasoning as a separate message if exists
+ if json_reasoning:
+ state["chatbox_messages"].append({
"role": message["role"],
- "content": gradio_chatbox_content
+ "content": json_reasoning
})
- else:
- if is_json_format(message["content"]):
- content_json = json.loads(message["content"])
- state['chatbox_messages'].append({
- "role": message["role"],
- "content": f'{content_json["reasoning"]}
'
- })
+
+ # Add the formatted content
+ state["chatbox_messages"].append({
+ "role": message["role"],
+ "content": formatted_content
+ })
+
+ yield state["chatbox_messages"]
- state['chatbox_messages'].append({
- "role": message["role"],
- "content": message["content"] if not is_json_format(message["content"]) else json.dumps(json.loads(message["content"]), indent=4, ensure_ascii=False)
- })
- yield state['chatbox_messages']
-
def is_json_format(text):
try:
json.loads(text)
@@ -182,7 +195,14 @@ def get_header_image_base64():
print(f"Failed to load header image: {e}")
return None
-
+def update_task_list(state):
+ """Update task list with completed tasks marked"""
+ tasks = state.get("tasks", [])
+ task_status = []
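+ # One row per task, status icon first to match the Dataframe headers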
+ for task in tasks:
+ status = "✅" if task.get("completed", False) else "⬜"
+ task_status.append([status, task.get("description", "")])
+ return task_status
def run():
with gr.Blocks(theme=gr.themes.Default()) as demo:
gr.HTML("""
@@ -259,7 +279,15 @@ def run():
stop_button = gr.Button(value="Stop", variant="secondary")
with gr.Row():
- with gr.Column(scale=1):
+ with gr.Column(scale=2):
+ task_list = gr.Dataframe(
+ headers=["Status", "Task"],
+ datatype=["str", "str"],
+ value=[],
+ label="Task List",
+ interactive=False)
+
+ with gr.Column(scale=8):
chatbot = gr.Chatbot(
label="Chatbot History",
autoscroll=True,
@@ -281,6 +309,7 @@ def run():
state["chatbox_messages"] = []
state["responses"] = {}
state["tools"] = {}
+ state["tasks"] = []
return state["chatbox_messages"]
model.change(fn=update_model, inputs=[model, state], outputs=None)
@@ -292,4 +321,6 @@ def run():
submit_button.click(process_input, [chat_input, state, vision_agent_state], chatbot)
stop_button.click(stop_app, [state], None)
base_url.change(fn=update_base_url, inputs=[base_url, state], outputs=None)
+
+
demo.launch(server_name="0.0.0.0", server_port=7888)
diff --git a/gradio_ui/loop.py b/gradio_ui/loop.py
index ad852bf..4fa3fa8 100644
--- a/gradio_ui/loop.py
+++ b/gradio_ui/loop.py
@@ -3,9 +3,7 @@ Agentic sampling loop that calls the Anthropic API and local implementation of
"""
import base64
from io import BytesIO
-from time import sleep
import cv2
-from gradio_ui.agent.verification_agent import VerificationAgent
from gradio_ui.agent.vision_agent import VisionAgent
from gradio_ui.tools.screen_capture import get_screenshot
from anthropic.types.beta import (BetaMessageParam)
@@ -31,16 +29,16 @@ def sampling_loop_sync(
task_plan_agent = TaskPlanAgent()
executor = AnthropicExecutor()
task_run_agent = TaskRunAgent()
- parsed_screen_result = parsed_screen(vision_agent)
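+ # Capture and parse the current screen; anything outside screen_region is masked out when a region is set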
+ parsed_screen_result = parsed_screen(vision_agent, screen_region)
plan_list = task_plan_agent(messages=messages, parsed_screen_result=parsed_screen_result)
yield
for plan in plan_list:
- execute_task_plan(plan, vision_agent, task_run_agent, executor, messages)
+ execute_task_plan(plan, vision_agent, task_run_agent, executor, messages, screen_region)
yield
-def execute_task_plan(plan, vision_agent, task_run_agent, executor, messages):
- parsed_screen_result = parsed_screen(vision_agent)
+def execute_task_plan(plan, vision_agent, task_run_agent, executor, messages, screen_region):
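+ # Re-capture and parse the screen before each plan step so the agent acts on the current state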
+ parsed_screen_result = parsed_screen(vision_agent, screen_region)
tools_use_needed, __ = task_run_agent(task_plan=plan, parsed_screen_result=parsed_screen_result, messages=messages)
executor(tools_use_needed, messages)
diff --git a/gradio_ui/tools/screen_capture.py b/gradio_ui/tools/screen_capture.py
index 68bb277..df61757 100644
--- a/gradio_ui/tools/screen_capture.py
+++ b/gradio_ui/tools/screen_capture.py
@@ -6,13 +6,24 @@ from util import tool
OUTPUT_DIR = "./tmp/outputs"
-def get_screenshot():
+def get_screenshot(screen_region):
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / f"screenshot_{uuid4().hex}.png"
try:
img_io = tool.capture_screen_with_cursor()
- screenshot = Image.open(img_io)
+ screenshot = Image.open(img_io)
+ # Create a black mask of the same size
+ black_mask = Image.new("RGBA", screenshot.size, (0, 0, 0, 255))
+ # If screen_region is provided and valid, copy only that region
+ if screen_region and len(screen_region) == 4:
+ x1, y1, x2, y2 = screen_region
+ region = screenshot.crop((x1, y1, x2, y2))
+ # Paste the region onto the black mask
+ black_mask.paste(region, (x1, y1, x2, y2))
+ # Use the modified image as screenshot
+ screenshot = black_mask
+
screenshot.save(path)
return screenshot, path
except Exception as e: