simplify ocr code

yuruo 2025-03-19 22:02:08 +08:00
parent b7435e9b17
commit a48ff2d37d
7 changed files with 49 additions and 515 deletions

View File

@@ -7,7 +7,6 @@ from gradio_ui.tools.computer import Action
class TaskPlanAgent(BaseAgent):
def __call__(self, messages, parsed_screen_result):
screen_info = str([{"box_id": i.element_id, "caption": i.caption, "text": i.text} for i in parsed_screen_result['parsed_content_list']])
messages[-1] = {"role": "user",
"content": [
{"type": "text", "text": messages[-1]["content"]},
@@ -17,7 +16,7 @@ class TaskPlanAgent(BaseAgent):
}
]
}
response = run(messages, user_prompt=system_prompt.format(screen_info=screen_info, action_list=str(Action)), response_format=TaskPlanResponse)
response = run(messages, user_prompt=system_prompt.format(action_list=str(Action)), response_format=TaskPlanResponse)
print("task_plan_agent response: ", response)
return json.loads(response)
@@ -31,8 +30,6 @@ system_prompt = """
### Goal ###
You are an automation operation planning expert. Based on the screen content and the user's requirements, plan a precise, executable sequence of operations.
The current screen content is as follows:
{screen_info}
### Input ###
1. User requirement: the task objective, described in text form

View File

@@ -25,8 +25,7 @@ class TaskRunAgent(BaseAgent):
task_list = json.loads(messages[1]['content'])['task_list']
# Convert task_list to a numbered format
formatted_task_list = "\n".join([f"{i}.{task}" for i, task in enumerate(task_list)])
screen_info = str([{"box_id": i.element_id, "caption": i.caption, "text": i.text} for i in parsed_screen_result['parsed_content_list']])
system_prompt = prompt.format(screen_info=screen_info, task_list=formatted_task_list)
system_prompt = prompt.format(task_list=formatted_task_list)
vlm_response = run(
messages,
user_prompt=system_prompt,
@@ -34,14 +33,35 @@ class TaskRunAgent(BaseAgent):
)
vlm_response_json = json.loads(vlm_response)
response_content = [BetaTextBlock(text=vlm_response_json["reasoning"], type='text')]
if "box_id" in vlm_response_json and vlm_response_json["next_action"] not in ["None", "key", "type", "scroll_down", "scroll_up","cursor_position", "wait"]:
bbox = self.find_element_by_id(parsed_screen_result, vlm_response_json["box_id"]).coordinates
box_centroid_coordinate = [int((bbox[0] + bbox[2]) / 2 ), int((bbox[1] + bbox[3]) / 2 )]
move_cursor_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
input={'action': 'mouse_move', 'coordinate': box_centroid_coordinate},
name='computer', type='tool_use')
response_content.append(move_cursor_block)
# Handle cursor movement based on box_id
if "box_id" in vlm_response_json:
action_types_without_cursor = ["None", "key", "type", "scroll_down", "scroll_up", "cursor_position", "wait"]
if vlm_response_json["box_id"] != -1 and vlm_response_json["next_action"] not in action_types_without_cursor:
# Move cursor to the center of the identified element
element = self.find_element_by_id(parsed_screen_result, vlm_response_json["box_id"])
bbox = element.coordinates
box_centroid_coordinate = [
int((bbox[0] + bbox[2]) / 2),
int((bbox[1] + bbox[3]) / 2)
]
move_cursor_block = BetaToolUseBlock(
id=f'toolu_{uuid.uuid4()}',
input={'action': 'mouse_move', 'coordinate': box_centroid_coordinate},
name='computer',
type='tool_use'
)
response_content.append(move_cursor_block)
elif vlm_response_json["box_id"] == -1 and len(vlm_response_json["coordinates"]) == 2:
# Move cursor to specified coordinates
move_cursor_block = BetaToolUseBlock(
id=f'toolu_{uuid.uuid4()}',
input={'action': 'mouse_move', 'coordinate': vlm_response_json["coordinates"]},
name='computer',
type='tool_use'
)
response_content.append(move_cursor_block)
if vlm_response_json["next_action"] == "None":
print("Task paused/completed.")
elif vlm_response_json["next_action"] == "type":
@ -66,6 +86,7 @@ class TaskRunAgent(BaseAgent):
def create_dynamic_response_model(parsed_screen_result):
available_box_ids = [item.element_id for item in parsed_screen_result['parsed_content_list']]
available_box_ids.append(-1)
task_run_agent_response = create_model(
'TaskRunAgentResponse',
reasoning = (str, Field(
@@ -78,11 +99,14 @@ def create_dynamic_response_model(parsed_screen_result):
}
)),
box_id = (int, Field(
description="要操作的框ID",
description="要操作的框ID如果框ID不存在就返回-1",
json_schema_extra={
"enum": available_box_ids
}
)),
coordinates = (list[int], Field(
description="当 box_id 为-1时直接返回要操作对象的坐标只返回x,y这2个整数"
)),
value = (str, Field(
description="仅当next_action为type时提供否则为None"
)),
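For context, create_model builds the pydantic response model at call time, so the schema describing the expected reply only admits the box_ids actually detected on the current screen (plus -1 for "no suitable box"). A minimal standalone sketch of the same pattern, with hypothetical element ids in place of parsed_screen_result:

```python
# Standalone illustration of the dynamic-schema pattern (pydantic v2).
# The element ids are hypothetical; in the agent they come from parsed_screen_result.
from pydantic import Field, create_model

available_box_ids = [0, 1, 2, -1]  # detected boxes plus -1 for "no suitable box"

TaskRunAgentResponse = create_model(
    'TaskRunAgentResponse',
    box_id=(int, Field(
        description="The box ID to operate on; -1 if none fits",
        json_schema_extra={"enum": available_box_ids},
    )),
    coordinates=(list[int], Field(
        description="x, y to act on when box_id is -1",
    )),
)

# The generated schema carries the enum constraint, which structured-output APIs can enforce.
print(TaskRunAgentResponse.model_json_schema()["properties"]["box_id"]["enum"])
```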
@@ -97,15 +121,12 @@ prompt = """
### Goal ###
You are a task executor. Based on the screenshot and all the elements, decide what to do next; if the task is complete, set next_action to None.
Below are all the elements on the current screen. caption and text are there to help you understand the screen content, and your decision should rely mainly on these two pieces of information; the screenshot is for reference only. The number at the top-left corner of each icon is its box_id.
{screen_info}
Based on the task list below, determine which task you are currently executing (current_task_id); the first task is 0. The task list is as follows:
{task_list}
##########
### Notes ###
- box_id must be taken strictly from the box_ids listed among all the elements
- Look at the screenshot provided by the user and use the box_id boxes and labels drawn on it to decide which box_id to operate on; if none is suitable, return -1 and give the target's coordinates via coordinates
- Give only one action at a time: which box_id to operate on, what to type, whether to scroll, or another action
- Analyze the current screen and reflect on what has already been done by reviewing the history, then describe your step-by-step thinking about how to accomplish the task
- Avoid choosing the same action/element many times in a row; if that happens, reflect on what may have gone wrong and predict a different action
@@ -122,7 +143,8 @@ prompt = """
"next_action": str, # The action to perform.
"box_id": int, # The box ID to operate on; provided when next_action is left_click, right_click, double_click, or hover, otherwise None
"value": "xxx" # Provide the value field only when the action is type; otherwise omit the value key
"current_task_id": int # Index of the task currently being executed; the first task is 0
"current_task_id": int # Index of the task currently being executed; the first task is 0,
"coordinates": list[int] # Provided only when box_id is -1; the coordinates of the target object, only the two integers x, y
}}
```
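For illustration only, replies that follow the format above might look like the following; every value here is hypothetical:

```python
# Hypothetical example replies matching the format above (values invented for illustration).
example_response = {
    "reasoning": "The search box (box_id 12) is visible; the next step is to click it.",
    "next_action": "left_click",
    "box_id": 12,
    "current_task_id": 0,
}

# When no detected box fits, box_id is -1 and coordinates supply the click target:
fallback_response = {
    "reasoning": "No labelled box covers the close button, so click by raw coordinates.",
    "next_action": "left_click",
    "box_id": -1,
    "coordinates": [1024, 12],
    "current_task_id": 1,
}
```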

View File

@@ -1,72 +1,27 @@
import os
from typing import List, Optional
from typing import List
import cv2
import torch
from ultralytics import YOLO
from transformers import AutoModelForCausalLM, AutoProcessor
import easyocr
import supervision as sv
import numpy as np
import time
from pydantic import BaseModel
import base64
from PIL import Image
from transformers import AutoConfig
import os
class UIElement(BaseModel):
element_id: int
coordinates: list[float]
caption: Optional[str] = None
text: Optional[str] = None
class VisionAgent:
def __init__(self, yolo_model_path: str, florence_model_path: str):
def __init__(self, yolo_model_path: str):
"""
Initialize the vision agent
Parameters:
yolo_model_path: Path to YOLO model
caption_model_path: Path to image caption model
"""
# determine the available device and the best dtype
self.device, self.dtype = self._get_optimal_device_and_dtype()
# load the YOLO model
self.yolo_model = YOLO(yolo_model_path)
# load the image caption model and processor
self.caption_processor = AutoProcessor.from_pretrained(
florence_model_path,
trust_remote_code=True,
local_files_only=True
)
try:
self.caption_model = AutoModelForCausalLM.from_pretrained(
florence_model_path, # use the complete directory that contains both the code and the weights
torch_dtype=self.dtype,
trust_remote_code=True,
local_files_only=True
).to(self.device)
# No need to load the weights separately; they are already included in florence_base_path
except Exception as e:
print(f"Model loading failed: {e}")
raise e
self.prompt = "<CAPTION>"
# set the batch size
if self.device.type == 'cuda':
self.batch_size = 128
elif self.device.type == 'mps':
self.batch_size = 128
else:
self.batch_size = 16
self.elements: List[UIElement] = []
self.ocr_reader = easyocr.Reader(['en', 'ch_sim'])
def __call__(self, image_path: str) -> List[UIElement]:
"""Process an image from file path."""
@@ -76,26 +31,6 @@ class VisionAgent:
raise FileNotFoundError(f"Vision agent: Failed to read image")
return self.analyze_image(image)
def _get_optimal_device_and_dtype(self):
"""determine the optimal device and dtype"""
if torch.cuda.is_available():
device = torch.device("cuda")
# check if the GPU is suitable for using float16
capability = torch.cuda.get_device_capability()
# only use float16 on newer GPUs
if capability[0] >= 7:
dtype = torch.float16
else:
dtype = torch.float32
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
device = torch.device("mps")
dtype = torch.float32
else:
device = torch.device("cpu")
dtype = torch.float32
return device, dtype
def _reset_state(self):
"""Clear previous analysis results"""
self.elements = []
@@ -112,114 +47,15 @@ class VisionAgent:
"""
self._reset_state()
element_crops, boxes = self._detect_objects(image)
start = time.time()
element_texts = self._extract_text(element_crops)
end = time.time()
ocr_time = (end-start) * 10 ** 3
print(f"Speed: {ocr_time:.2f} ms OCR of {len(element_texts)} icons.")
start = time.time()
element_captions = self._get_caption(element_crops, 5)
end = time.time()
caption_time = (end-start) * 10 ** 3
print(f"Speed: {caption_time:.2f} ms captioning of {len(element_captions)} icons.")
for idx in range(len(element_crops)):
boxes = self._detect_objects(image)
for idx in range(len(boxes)):
new_element = UIElement(element_id=idx,
coordinates=boxes[idx],
text=element_texts[idx][0] if len(element_texts[idx]) > 0 else '',
caption=element_captions[idx]
)
coordinates=boxes[idx])
self.elements.append(new_element)
return self.elements
def _extract_text(self, images: np.ndarray) -> list[str]:
"""
Run OCR in sequential mode
TODO: It is possible to run in batch mode for a speedup, but the result quality needs testing.
https://github.com/JaidedAI/EasyOCR/pull/458
"""
texts = []
for image in images:
text = self.ocr_reader.readtext(image, detail=0, paragraph=True, text_threshold=0.85)
texts.append(text)
# print(texts)
return texts
def _get_caption(self, element_crops, batch_size=None):
"""get the caption of the element crops"""
if not element_crops:
return []
# if batch_size is not specified, use the instance's default value
if batch_size is None:
batch_size = self.batch_size
# resize the image to 64x64
resized_crops = []
for img in element_crops:
# convert to numpy array, resize, then convert back to PIL
img_np = np.array(img)
resized_np = cv2.resize(img_np, (64, 64))
resized_crops.append(Image.fromarray(resized_np))
generated_texts = []
device = self.device
# process in batches
for i in range(0, len(resized_crops), batch_size):
batch = resized_crops[i:i+batch_size]
try:
# select the dtype according to the device type
if device.type == 'cuda':
inputs = self.caption_processor(
images=batch,
text=[self.prompt] * len(batch),
return_tensors="pt",
do_resize=False
).to(device=device, dtype=torch.float16)
else:
# MPS and CPU use float32
inputs = self.caption_processor(
images=batch,
text=[self.prompt] * len(batch),
return_tensors="pt"
).to(device=device)
# special treatment for Florence-2
with torch.no_grad():
if 'florence' in self.caption_model.config.model_type:
generated_ids = self.caption_model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
max_new_tokens=20,
num_beams=5,
do_sample=False
)
else:
generated_ids = self.caption_model.generate(
**inputs,
max_length=50,
num_beams=3,
early_stopping=True
)
# decode the generated IDs
texts = self.caption_processor.batch_decode(
generated_ids,
skip_special_tokens=True
)
texts = [text.strip() for text in texts]
generated_texts.extend(texts)
# clean the cache
if device.type == 'cuda' and torch.cuda.is_available():
torch.cuda.empty_cache()
except RuntimeError as e:
raise e
return generated_texts
def _detect_objects(self, image: np.ndarray) -> tuple[list[np.ndarray], list]:
"""Run object detection pipeline"""
results = self.yolo_model(image)[0]
@@ -250,42 +86,7 @@ class VisionAgent:
# Map back to original indices
keep_indices = sorted_indices[keep_sorted]
filtered_boxes = boxes[keep_indices]
# Extract element crops
element_crops = []
for box in filtered_boxes:
x1, y1, x2, y2 = map(int, map(round, box))
element = image[y1:y2, x1:x2]
element_crops.append(np.array(element))
return element_crops, filtered_boxes
def load_image(self, image_source: str) -> np.ndarray:
try:
# Handle potential Data URL prefix (like "data:image/png;base64,")
if ',' in image_source:
_, payload = image_source.split(',', 1)
else:
payload = image_source
# Base64 decode -> bytes -> numpy array
image_bytes = base64.b64decode(payload)
np_array = np.frombuffer(image_bytes, dtype=np.uint8)
# OpenCV decode image
image = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
if image is None:
raise ValueError("Failed to decode image: Invalid image data")
return self.analyze_image(image)
except (base64.binascii.Error, ValueError) as e:
# Generate clearer error message
error_msg = f"Input is neither a valid file path nor valid Base64 image data"
raise ValueError(error_msg) from e
return filtered_boxes
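After this change the vision pipeline is detection-only: YOLO boxes in, UIElement ids and coordinates out, with no OCR or captioning. A minimal usage sketch, assuming the same OmniParser icon_detect weights that the Gradio app wires up below; the module path, weights directory value, and screenshot filename are assumptions:

```python
# Hypothetical usage of the simplified, detection-only VisionAgent.
import os

from gradio_ui.agent.vision_agent import VisionAgent  # module path is an assumption

OMNI_PARSER_DIR = "weights/OmniParser"  # assumed layout, mirroring the path used in the Gradio app

agent = VisionAgent(yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt"))
elements = agent("screenshot.png")  # __call__ reads the file and delegates to analyze_image

for element in elements:
    # Each UIElement now carries only an id and a bounding box.
    print(element.element_id, element.coordinates)
```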

View File

@@ -324,8 +324,7 @@ def run():
model.change(fn=update_model, inputs=[model, state], outputs=None)
api_key.change(fn=update_api_key, inputs=[api_key, state], outputs=None)
chatbot.clear(fn=clear_chat, inputs=[state], outputs=[chatbot])
vision_agent = VisionAgent(yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt"),
florence_model_path=FLORENCE_DIR)
vision_agent = VisionAgent(yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt"))
vision_agent_state = gr.State({"agent": vision_agent})
submit_button.click(process_input, [chat_input, state, vision_agent_state], [chatbot, task_list])
stop_button.click(stop_app, [state], None)

View File

@@ -1,64 +1,18 @@
import subprocess
import os
import sys
from util import download_weights
def check_cuda_version():
try:
# try to get cuda version from nvidia-smi
result = subprocess.run(['nvidia-smi'], capture_output=True, text=True)
for line in result.stdout.split('\n'):
if 'CUDA Version:' in line:
cuda_version = line.split('CUDA Version:')[1].strip()
return cuda_version
# try to get cuda version from nvcc
result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True)
for line in result.stdout.split('\n'):
if 'release' in line:
version = line.split('V')[-1].split('.')[0:2]
return '.'.join(version)
return None
except:
return None
def install_pytorch():
cuda_version = check_cuda_version()
if cuda_version is None:
print("CUDA not found. Installing CPU version of PyTorch")
cmd = "pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu --timeout 3000"
elif cuda_version.startswith("11."):
print(f"CUDA {cuda_version} found. Installing PyTorch for CUDA 11.8")
cmd = "pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118 --timeout 3000"
elif cuda_version.startswith("12.4"):
print(f"CUDA {cuda_version} found. Installing PyTorch for CUDA 12.4")
cmd = "pip install torch torchvision --index-url https://download.pytorch.org/whl/cu124 --timeout 3000"
elif cuda_version.startswith("12.6"):
print(f"CUDA {cuda_version} found. Installing PyTorch for CUDA 12.6")
cmd = "pip install torch torchvision --index-url https://download.pytorch.org/whl/cu126 --timeout 3000"
else:
print(f"CUDA {cuda_version} found, but not in 11.8, 12.4, 12.6, please reinstall cuda and try again")
exit(1)
print(f"Running: {cmd}")
subprocess.run(cmd, shell=True)
def install_requirements():
subprocess.run([sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt'])
def adjust_python_env():
# check if python is 3.12
if sys.version_info.major != 3 or sys.version_info.minor != 12:
print("Python version is not 3.12, please install python 3.12")
exit(1)
def install():
adjust_python_env()
install_pytorch()
install_requirements()
# download the weight files
download_weights.download()

View File

@@ -1,8 +1,8 @@
# torch
# torchvision
easyocr
# easyocr
supervision==0.18.0
transformers
# transformers
ultralytics==8.3.70
numpy==1.26.4
gradio

View File

@@ -1,239 +0,0 @@
{
"_name_or_path": "./Florence-2-base-ft",
"architectures": [
"Florence2ForConditionalGeneration"
],
"auto_map": {
"AutoConfig": "configuration_florence2.Florence2Config",
"AutoModelForCausalLM": "modeling_florence2.Florence2ForConditionalGeneration"
},
"bos_token_id": 2,
"eos_token_id": 1,
"ignore_index": -100,
"is_encoder_decoder": true,
"model_type": "florence2",
"pad_token_id": 0,
"projection_dim": 768,
"text_config": {
"_attn_implementation_autoset": true,
"_name_or_path": "",
"activation_dropout": 0.1,
"activation_function": "gelu",
"add_bias_logits": false,
"add_cross_attention": false,
"add_final_layer_norm": false,
"architectures": null,
"attention_dropout": 0.1,
"bad_words_ids": null,
"begin_suppress_tokens": null,
"bos_token_id": 0,
"chunk_size_feed_forward": 0,
"classif_dropout": 0.1,
"classifier_dropout": 0.0,
"cross_attention_hidden_size": null,
"d_model": 768,
"decoder_attention_heads": 12,
"decoder_ffn_dim": 3072,
"decoder_layerdrop": 0.0,
"decoder_layers": 6,
"decoder_start_token_id": 2,
"diversity_penalty": 0.0,
"do_sample": false,
"dropout": 0.1,
"early_stopping": true,
"encoder_attention_heads": 12,
"encoder_ffn_dim": 3072,
"encoder_layerdrop": 0.0,
"encoder_layers": 6,
"encoder_no_repeat_ngram_size": 0,
"eos_token_id": 2,
"exponential_decay_length_penalty": null,
"finetuning_task": null,
"forced_bos_token_id": 0,
"forced_eos_token_id": 2,
"gradient_checkpointing": false,
"id2label": {
"0": "LABEL_0",
"1": "LABEL_1",
"2": "LABEL_2"
},
"init_std": 0.02,
"is_decoder": false,
"is_encoder_decoder": true,
"label2id": {
"LABEL_0": 0,
"LABEL_1": 1,
"LABEL_2": 2
},
"length_penalty": 1.0,
"max_length": 20,
"max_position_embeddings": 1024,
"min_length": 0,
"model_type": "florence2_language",
"no_repeat_ngram_size": 3,
"normalize_before": false,
"num_beam_groups": 1,
"num_beams": 3,
"num_hidden_layers": 6,
"num_return_sequences": 1,
"output_attentions": false,
"output_hidden_states": false,
"output_scores": false,
"pad_token_id": 1,
"prefix": null,
"problem_type": null,
"pruned_heads": {},
"remove_invalid_values": false,
"repetition_penalty": 1.0,
"return_dict": true,
"return_dict_in_generate": false,
"scale_embedding": false,
"sep_token_id": null,
"suppress_tokens": null,
"task_specific_params": null,
"temperature": 1.0,
"tf_legacy_loss": false,
"tie_encoder_decoder": false,
"tie_word_embeddings": true,
"tokenizer_class": null,
"top_k": 50,
"top_p": 1.0,
"torch_dtype": null,
"torchscript": false,
"typical_p": 1.0,
"use_bfloat16": false,
"use_cache": true,
"vocab_size": 51289
},
"torch_dtype": "float32",
"transformers_version": "4.46.1",
"vision_config": {
"_attn_implementation_autoset": false,
"_name_or_path": "",
"add_cross_attention": false,
"architectures": null,
"bad_words_ids": null,
"begin_suppress_tokens": null,
"bos_token_id": null,
"chunk_size_feed_forward": 0,
"cross_attention_hidden_size": null,
"decoder_start_token_id": null,
"depths": [
1,
1,
9,
1
],
"dim_embed": [
128,
256,
512,
1024
],
"diversity_penalty": 0.0,
"do_sample": false,
"drop_path_rate": 0.1,
"early_stopping": false,
"enable_checkpoint": false,
"encoder_no_repeat_ngram_size": 0,
"eos_token_id": null,
"exponential_decay_length_penalty": null,
"finetuning_task": null,
"forced_bos_token_id": null,
"forced_eos_token_id": null,
"id2label": {
"0": "LABEL_0",
"1": "LABEL_1"
},
"image_feature_source": [
"spatial_avg_pool",
"temporal_avg_pool"
],
"image_pos_embed": {
"max_pos_embeddings": 50,
"type": "learned_abs_2d"
},
"is_decoder": false,
"is_encoder_decoder": false,
"label2id": {
"LABEL_0": 0,
"LABEL_1": 1
},
"length_penalty": 1.0,
"max_length": 20,
"min_length": 0,
"model_type": "davit",
"no_repeat_ngram_size": 0,
"num_beam_groups": 1,
"num_beams": 1,
"num_groups": [
4,
8,
16,
32
],
"num_heads": [
4,
8,
16,
32
],
"num_return_sequences": 1,
"output_attentions": false,
"output_hidden_states": false,
"output_scores": false,
"pad_token_id": null,
"patch_padding": [
3,
1,
1,
1
],
"patch_prenorm": [
false,
true,
true,
true
],
"patch_size": [
7,
3,
3,
3
],
"patch_stride": [
4,
2,
2,
2
],
"prefix": null,
"problem_type": null,
"projection_dim": 768,
"pruned_heads": {},
"remove_invalid_values": false,
"repetition_penalty": 1.0,
"return_dict": true,
"return_dict_in_generate": false,
"sep_token_id": null,
"suppress_tokens": null,
"task_specific_params": null,
"temperature": 1.0,
"tf_legacy_loss": false,
"tie_encoder_decoder": false,
"tie_word_embeddings": true,
"tokenizer_class": null,
"top_k": 50,
"top_p": 1.0,
"torch_dtype": null,
"torchscript": false,
"typical_p": 1.0,
"use_bfloat16": false,
"visual_temporal_embedding": {
"max_temporal_embeddings": 100,
"type": "COSINE"
},
"window_size": 12
},
"vocab_size": 51289
}