A2A协议:智能代理协作
A2A协议旨在解决智能代理系统间的互操作性问题,其核心思想是通过标准化的通信协议,使不同代理能在不共享内存、思想或工具的情况下完成协作。这种去中心化的设计特别适用于云计算、物联网和微服务架构场景。
核心架构解析
三层角色模型
- 用户(User):发起任务请求的人类或自动化系统
- 客户端(Client):解析用户需求并构造A2A请求的代理
- 远程代理(Server):执行具体任务并返回结果的智能服务
HTTP+SSE传输层
import requests
from sseclient import SSEClient as EventSource# 标准HTTP请求示例
response = requests.post('https://agent.example/rpc', json={"jsonrpc": "2.0","method": "tasks/send","params": {"task_id": "task123", "instructions": "处理图像数据"},"id": 1
})# SSE流式更新监听
messages = EventSource('https://agent.example/stream?task_id=task123')
for msg in messages:print(f"实时更新: {msg.data}")
JSON-RPC 2.0数据格式
{"jsonrpc": "2.0","method": "tasks/get","params": {"task_id": "task123"},"id": 2
}
任务全生命周期管理
A2A协议通过标准化的任务对象实现状态跟踪和协作,我们来看完整的任务处理流程:
任务创建与初始化
def create_task(client, task_id, instructions):response = client.post('/tasks/send', json={"task_id": task_id,"instructions": instructions,"context": {"user_id": "alice", "priority": "high"}})return response.json()# 使用示例
new_task = create_task(client=requests.Session(),task_id="img_process_001",instructions="识别图片中的交通标志"
)
状态轮询与流式更新
def monitor_task(task_id):# 轮询模式while True:response = requests.post('https://agent.example/rpc', json={"method": "tasks/get","params": {"task_id": task_id}})status = response.json().get('result', {}).get('status')if status == 'completed':breaktime.sleep(5)# SSE流式模式with EventSource(f'https://agent.example/stream?task_id={task_id}') as events:for event in events:if event.event == 'progress':print(f"处理进度: {event.data}%")
错误处理机制
A2A协议定义了标准化的错误代码体系:
错误代码 | 描述 | 处理策略 |
---|---|---|
-32600 | 无效请求 | 检查JSON格式和必填字段 |
-32601 | 方法不存在 | 验证API端点和方法名称 |
-32602 | 参数无效 | 添加参数校验逻辑 |
-32001 | 任务不存在 | 检查任务ID有效性 |
def safe_request(method, params):try:response = requests.post('https://agent.example/rpc', json={"jsonrpc": "2.0","method": method,"params": params,"id": uuid.uuid4()})return response.json()except requests.exceptions.RequestException as e:print(f"网络错误: {str(e)}")except json.JSONDecodeError:print("响应格式错误")# 错误处理示例
try:result = safe_request("tasks/invalid_method", {})if 'error' in result:error = result['error']print(f"错误 {error['code']}: {error['message']}")
except Exception as e:print(f"未捕获异常: {str(e)}")
高级功能
流式处理与实时更新
class TaskMonitor:def __init__(self, task_id):self.task_id = task_idself.event_source = SSEClient(f'https://agent.example/stream?task_id={task_id}')def start(self):for event in self.event_source:if event.event == 'artifact':self.handle_artifact(event.data)elif event.event == 'status':self.update_status(event.data)def handle_artifact(self, artifact_data):with open(f'result_{self.task_id}.jpg', 'wb') as f:f.write(base64.b64decode(artifact_data['content']))# 使用示例
monitor = TaskMonitor("img_process_001")
monitor.start()
多轮对话与上下文传递
def multi_turn_dialog(task_id):context = {}while True:response = requests.post('https://agent.example/rpc', json={"method": "tasks/get","params": {"task_id": task_id}})status = response.json()['result']['status']if status == 'input-required':required_input = response.json()['result']['required_input']user_input = input(f"需要补充信息: {required_input['prompt']}\n")context.update({"user_input": user_input,"timestamp": datetime.now().isoformat()})requests.post('https://agent.example/rpc', json={"method": "tasks/update","params": {"task_id": task_id, "context": context}})elif status == 'completed':break
非文本媒体处理
def process_image(image_path):with open(image_path, 'rb') as f:image_data = base64.b64encode(f.read()).decode('utf-8')response = requests.post('https://agent.example/rpc', json={"method": "tasks/send","params": {"task_id": "image_task_001","instructions": "识别图片内容","artifacts": [{"type": "image/jpeg", "content": image_data}]}})return response.json()# 使用示例
result = process_image("traffic_sign.jpg")
print(f"识别结果: {result['result']['artifacts'][0]['content']}")
实践
Agent Card标准化
每个代理应提供标准化的能力描述文件(Agent Card),示例:
{"name": "Image Recognition Agent","version": "1.0","methods": ["image/process", "image/analyze"],"authentication": {"type": "api_key", "endpoint": "/auth"}
}
幂等性设计
def idempotent_request(task_id, action):idempotency_key = uuid.uuid4().hexresponse = requests.post('https://agent.example/rpc', json={"method": action,"params": {"task_id": task_id},"idempotency_key": idempotency_key})return response.json()